drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [10/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 01:52:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
new file mode 100644
index 0000000..42a15ae
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public abstract class AbstractExchange extends AbstractSingle implements Exchange {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
+
+  protected int senderMajorFragmentId;
+  protected int receiverMajorFragmentId;
+
+  public AbstractExchange(PhysicalOperator child) {
+    super(child);
+  }
+
+  /**
+   * Exchanges are not executable. The Execution layer first has to set their parallelization and convert them into
+   * something executable.
+   */
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  protected abstract void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
+  protected abstract void setupReceivers(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
+  
+  @Override
+  public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException {
+    this.senderMajorFragmentId = majorFragmentId;
+    setupSenders(senderLocations);
+  }
+  
+
+  @Override
+  public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
+    this.receiverMajorFragmentId = majorFragmentId;
+    setupReceivers(receiverLocations);
+  }
+  
+  @Override
+  public OperatorCost getAggregateSendCost() {
+    return getExchangeCost().getSendCost();
+  }
+
+  @Override
+  public OperatorCost getAggregateReceiveCost() {
+    return getExchangeCost().getReceiveCost();
+  }
+
+  @Override
+  public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitExchange(this, value);
+  }
+
+  @Override
+  public ExchangeCost getExchangeCost(){
+    return ExchangeCost.getSimpleEstimate(getSize());
+  }
+
+  @JsonIgnore
+  @Override
+  public OperatorCost getCost() {
+    return getExchangeCost().getCombinedCost();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
new file mode 100644
index 0000000..f782325
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
+
+public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
+
+  @Override
+  public T visitExchange(Exchange exchange, X value) throws E{
+    return visitOp(exchange, value);
+  }
+
+  @Override
+  public T visitFilter(Filter filter, X value) throws E{
+    return visitOp(filter, value);
+  }
+
+  @Override
+  public T visitProject(Project project, X value) throws E{
+    return visitOp(project, value);
+  }
+
+  @Override
+  public T visitSort(Sort sort, X value) throws E{
+    return visitOp(sort, value);
+  }
+
+  @Override
+  public T visitSender(Sender sender, X value) throws E {
+    return visitOp(sender, value);
+  }
+
+  @Override
+  public T visitReceiver(Receiver receiver, X value) throws E {
+    return visitOp(receiver, value);
+  }
+
+  @Override
+  public T visitScan(Scan<?> scan, X value) throws E{
+    return visitOp(scan, value);
+  }
+
+  @Override
+  public T visitStore(Store store, X value) throws E{
+    return visitOp(store, value);
+  }
+
+  
+  public T visitChildren(PhysicalOperator op, X value) throws E{
+    for(PhysicalOperator child : op){
+      child.accept(this, value);
+    }
+    return null;
+  }
+  
+  @Override
+  public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
+    return visitSender(op, value);
+  }
+
+  @Override
+  public T visitRandomReceiver(RandomReceiver op, X value) throws E {
+    return visitReceiver(op, value);
+  }
+
+  @Override
+  public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E {
+    return visitExchange(op, value);
+  }
+
+  @Override
+  public T visitRangeSender(RangeSender op, X value) throws E {
+    return visitSender(op, value);
+  }
+
+  @Override
+  public T visitScreen(Screen op, X value) throws E {
+    return visitStore(op, value);
+  }
+
+  @Override
+  public T visitSingleSender(SingleSender op, X value) throws E {
+    return visitSender(op, value);
+  }
+
+  @Override
+  public T visitUnionExchange(UnionExchange op, X value) throws E {
+    return visitExchange(op, value);
+  }
+
+  @Override
+  public T visitOp(PhysicalOperator op, X value) throws E{
+    throw new UnsupportedOperationException(String.format(
+        "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.", this
+            .getClass().getCanonicalName(), op.getClass().getCanonicalName()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
new file mode 100644
index 0000000..e8ba19c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+public abstract class AbstractReceiver extends AbstractBase implements Receiver{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractReceiver.class);
+
+  private final int oppositeMajorFragmentId; 
+  
+  public AbstractReceiver(int oppositeMajorFragmentId){
+    this.oppositeMajorFragmentId = oppositeMajorFragmentId;
+  }
+  
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitReceiver(this, value);
+  }
+
+  @Override
+  public final PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    //rewriting is unnecessary since the inputs haven't changed.
+    return this;
+  }
+
+  @JsonProperty("sender-major-fragment")
+  public int getOppositeMajorFragmentId() {
+    return oppositeMajorFragmentId;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
new file mode 100644
index 0000000..dbde9c5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+
+public abstract class AbstractScan<R extends ReadEntry> extends AbstractBase implements Scan<R>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractScan.class);
+  
+  protected final List<R> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  
+  public AbstractScan(List<R> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(R r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+  }
+
+  @Override
+  @JsonProperty("entries")
+  public List<R> getReadEntries() {
+    return readEntries;
+  }
+  
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitScan(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+  
+  
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
new file mode 100644
index 0000000..f8c22b3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.OperatorCost;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+
+public abstract class AbstractSender extends AbstractSingle implements Sender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSender.class);
+
+  protected final int oppositeMajorFragmentId;
+  
+  public AbstractSender(int oppositeMajorFragmentId, PhysicalOperator child) {
+    super(child);
+    this.oppositeMajorFragmentId = oppositeMajorFragmentId;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSender(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    float network = child.getSize().getAggSize();
+    return new OperatorCost(network, 0, 1000, child.getSize().getRecordCount());
+  }
+
+  @Override
+  public int getOppositeMajorFragmentId() {
+    return oppositeMajorFragmentId;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
new file mode 100644
index 0000000..264ee91
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Describes an operator that expects a single child operator as its input.
+ * The child is supplied at construction and is available through {@link #getChild()}.
+ */
+public abstract class AbstractSingle extends AbstractBase{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class);
+  
+  protected final PhysicalOperator child;
+
+  public AbstractSingle(PhysicalOperator child) {
+    super();
+    this.child = child;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.singletonIterator(child);
+  }
+
+  public PhysicalOperator getChild(){
+    return child;
+  }
+  
+  @Override
+  public Size getSize() {
+    return child.getSize();
+  }
+  
+  @Override
+  @JsonIgnore
+  public final PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.size() == 1);
+    return getNewWithChild(children.iterator().next());
+  }
+  
+  protected abstract PhysicalOperator getNewWithChild(PhysicalOperator child);
+    
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
new file mode 100644
index 0000000..a833a4e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+
+
+
+public abstract class AbstractStore extends AbstractSingle implements Store{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
+
+  public AbstractStore(PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitStore(this, value);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
new file mode 100644
index 0000000..c8bc3a8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public interface Exchange extends PhysicalOperator {
+
+  @JsonIgnore
+  public abstract OperatorCost getAggregateSendCost();
+
+  @JsonIgnore
+  public abstract OperatorCost getAggregateReceiveCost();
+
+  @JsonIgnore
+  public abstract ExchangeCost getExchangeCost();
+
+  /**
+   * Inform this Exchange node about its sender locations. This list should be index-ordered the same as the expected
+   * minorFragmentIds for each sender.
+   * 
+   * @param senderLocations
+   */
+  public abstract void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException;
+
+  /**
+   * Inform this Exchange node about its receiver locations. This list should be index-ordered the same as the expected
+   * minorFragmentIds for each receiver.
+   * 
+   * @param receiverLocations
+   */
+  public abstract void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException;
+
+  /**
+   * Get the Sender associated with the given minorFragmentId. Cannot be called until after setupSenders() and
+   * setupReceivers() have been called.
+   * 
+   * @param minorFragmentId
+   *          The minor fragment id, must be in the range [0, fragment.width).
+   * @param child
+   *          The feeding node for the requested sender.
+   * @return The materialized sender for the given arguments.
+   */
+  public abstract Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException;
+
+  /**
+   * Get the Receiver associated with the given minorFragmentId. Cannot be called until after setupSenders() and
+   * setupReceivers() have been called.
+   * 
+   * @param minorFragmentId
+   *          The minor fragment id, must be in the range [0, fragment.width).
+   * @return The materialized receiver for the given arguments.
+   */
+  public abstract Receiver getReceiver(int minorFragmentId);
+
+  /**
+   * The widest width this sender can send (max sending parallelization). Typically Integer.MAX_VALUE.
+   * 
+   * @return
+   */
+  @JsonIgnore
+  public abstract int getMaxSendWidth();
+
+  /**
+   * Return the feeding child of this operator node.
+   * 
+   * @return
+   */
+  public PhysicalOperator getChild();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ExchangeCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ExchangeCost.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ExchangeCost.java
new file mode 100644
index 0000000..9e94f06
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ExchangeCost.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.OperatorCost;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A container class that holds both send and receive costs for an exchange node.
+ */
+public class ExchangeCost {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeCost.class);
+  
+  private final OperatorCost send;
+  private final OperatorCost receive;
+  private final OperatorCost combined;
+  
+  @JsonCreator
+  public ExchangeCost(@JsonProperty("send") OperatorCost send, @JsonProperty("receive") OperatorCost receive) {
+    this.send = send;
+    this.receive = receive;
+    this.combined =  OperatorCost.combine(send,  receive);
+  }
+
+
+  
+  @JsonIgnore
+  public OperatorCost getCombinedCost(){
+    return combined;
+  }
+
+  @JsonProperty("send")
+  public OperatorCost getSendCost() {
+    return send;
+  }
+
+  @JsonProperty("receive")
+  public OperatorCost getReceiveCost() {
+    return receive;
+  }
+  
+  public static ExchangeCost getSimpleEstimate(Size s){
+    long cnt = s.getRecordCount();
+    int sz = s.getRecordSize();
+    OperatorCost send = new OperatorCost(cnt*sz, 0, 0, cnt);
+    OperatorCost receive = new OperatorCost(cnt*sz, 0, 0, cnt);
+    return new ExchangeCost(send, receive);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentLeaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentLeaf.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentLeaf.java
new file mode 100644
index 0000000..522ef7b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentLeaf.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+/**
+ * A Physical Operator that can be the leaf node of one particular execution fragment. Typically includes Receivers and
+ * Scans. Declares no methods of its own beyond those of PhysicalOperator.
+ */
+public interface FragmentLeaf extends PhysicalOperator {
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
new file mode 100644
index 0000000..66147cc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+
+/**
+ * Describes the root operation within a particular Fragment. This includes things like Sender nodes. Declares no methods of its own. NOTE(review): it extends FragmentLeaf, so every root is also usable as a leaf - confirm this is intended.
+ */
+public interface FragmentRoot extends FragmentLeaf{
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
new file mode 100644
index 0000000..1a9a3a9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
+ */
+public interface HasAffinity extends PhysicalOperator {
+  
+  /**
+   * Get the list of endpoints, with their associated affinities, that this operator has a preference for.
+   * @return List of EndpointAffinity objects. Annotated @JsonIgnore, so it is not a JSON property.
+   */
+  @JsonIgnore
+  public List<EndpointAffinity> getOperatorAffinity();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
new file mode 100644
index 0000000..d4ed456
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+/**
+ * An operator which specifically is a lowest level leaf node of a query plan across all possible fragments. Currently, the only operators that are Leaf
+ * nodes are Scan nodes. Ultimately this could include use of Cache scans and other types of atypical data production systems.
+ */
+public interface Leaf extends FragmentLeaf {
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
new file mode 100644
index 0000000..d412c2d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.graph.GraphValue;
+import org.apache.drill.exec.physical.OperatorCost;
+
+import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
+/** Base contract for the nodes of a physical plan. Jackson settings: null properties are omitted, object identity is written as "@id", and the concrete type name is written in the "pop" property. */
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder({ "@id" })
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
+public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
+
+  /**
+   * Get the cost of execution of this particular operator.
+   * 
+   * @return The cost of this operator. Annotated @JsonIgnore, so it is not a JSON property.
+   */
+  @JsonIgnore
+  public OperatorCost getCost();
+  
+  /**
+   * Get the estimated size of this particular operator.
+   * @return The estimated size (a record count and a record size). Annotated @JsonIgnore, so it is not a JSON property.
+   */
+  @JsonIgnore
+  public Size getSize();
+  
+  /**
+   * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be
+   * executed. However, Exchange nodes cannot be executed. In order to be executed, they must be converted into their
+   * Exec sub components.
+   * 
+   * @return True if this operator can be executed as-is; false if it must first be converted (e.g. an Exchange node).
+   */
+  @JsonIgnore
+  public boolean isExecutable();
+
+  /**
+   * Provides capability to build a set of output based on traversing a query graph tree.
+   * @param physicalVisitor The visitor to apply; its type parameters fix the return (T), extra value (X) and exception (E) types.
+   * @param value Extra value of type X; presumably handed on to the visitor's visit method - confirm against implementations.
+   * @return The result of type T. May throw E, as declared by the visitor.
+   */
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
+
+  /**
+   * Regenerate this node with a new set of children.  This is used in the case of materialization or optimization.
+   * @param children The children that the regenerated node should have.
+   * @return The operator produced for the given children.
+   */
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
new file mode 100644
index 0000000..d6e2fc4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.PathScanner;
+/** Static helper for discovering PhysicalOperator implementations; the private constructor prevents instantiation. */
+public class PhysicalOperatorUtil {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class);
+  
+  private PhysicalOperatorUtil(){}
+  /** Scans the packages listed in config under CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES and returns the PhysicalOperator implementations found. Each call rescans; nothing is cached here. */
+  public synchronized static Class<?>[] getSubTypes(DrillConfig config){
+    Class<?>[] ops = PathScanner.scanForImplementationsArr(PhysicalOperator.class, config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
+    logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) ); // cast so the array is passed as one argument rather than spread as varargs
+    return ops;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
new file mode 100644
index 0000000..f36633f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
+
+/**
+ * Visitor interface designed for traversal of an operator tree.  Basis for a number of operator manipulations including fragmentation and materialization.
+ * @param <RETURN> The class associated with the return of each visit method.
+ * @param <EXTRA> The class object associated with additional data required for a particular operator modification.
+ * @param <EXCEP> An optional exception class that can be thrown when a portion of a modification or traversal fails.  Must extend Throwable.  In the case where the visitor does not throw any checked exception, this can be set as RuntimeException.
+ */
+public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalVisitor.class);
+  
+  
+  public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
+  public RETURN visitScan(Scan<?> scan, EXTRA value) throws EXCEP;
+  public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
+
+  public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
+  public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
+  public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
+  public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
+  public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
+  
+  public RETURN visitOp(PhysicalOperator op, EXTRA value) throws EXCEP;
+  
+  // NOTE(review): the HashToRandomExchange overload below reuses the name visitHashPartitionSender; visitHashToRandomExchange was presumably intended - confirm before renaming, since implementers depend on the current name.
+  public RETURN visitHashPartitionSender(HashPartitionSender op, EXTRA value) throws EXCEP;
+  public RETURN visitRandomReceiver(RandomReceiver op, EXTRA value) throws EXCEP;
+  public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP;
+  public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP;
+  public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
+  public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
+  public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
new file mode 100644
index 0000000..cd4cb4a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A receiver is one half of an exchange operator. The receiver is responsible for taking in one or more streams from
+ * corresponding Senders.  Receivers are a special type of Physical Operator that are typically only expressed within the execution plan.
+ */
+public interface Receiver extends FragmentLeaf {
+  
+  /**
+   * A receiver is expecting streams from one or more providing endpoints.  This method should return a list of the expected sending endpoints.
+   * @return List of counterpart sending DrillbitEndpoints.
+   */
+  public abstract List<DrillbitEndpoint> getProvidingEndpoints();
+
+  /**
+   * Whether or not this receiver supports out of order exchange. This provides a hint for the scheduling node on whether
+   * the receiver can start work if only a subset of all sending endpoints are currently providing data. A random
+   * receiver would support this form of operation. A NWAY receiver would not.
+   * 
+   * @return True if this receiver supports working on a streaming/out of order input.
+   */
+  @JsonIgnore
+  public abstract boolean supportsOutOfOrderExchange();
+  /** Get the sender major fragment id that is opposite this receiver; exposed as the JSON property "sender-major-fragment". */
+  @JsonProperty("sender-major-fragment")
+  public int getOppositeMajorFragmentId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
new file mode 100644
index 0000000..7adef63
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+/**
+ * Marker interface describing the root of a query plan.  Currently, this is constrained to Screen.
+ */
+public interface Root extends FragmentRoot{
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
new file mode 100644
index 0000000..2207f79
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+/** A Leaf operator with endpoint affinity that exposes a list of read entries of type R. */
+public interface Scan<R extends ReadEntry> extends Leaf, HasAffinity{
+  /** Get the read entries of this scan; exposed as the JSON property "entries". */
+  @JsonProperty("entries")
+  public abstract List<R> getReadEntries();
+  /** Hand this scan a list of endpoints. NOTE(review): presumably the decided assignments, applied before getSpecificScan() is called (mirrors the Store documentation) - confirm. */
+  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
+  /** Get the Scan for the given minor fragment id. NOTE(review): presumably the portion of this scan assigned to that minor fragment - confirm. */
+  public abstract Scan<?> getSpecificScan(int minorFragmentId);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
new file mode 100644
index 0000000..71513c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A sender is one half of an exchange node operation. It is responsible for subdividing/cloning and sending a local
+ * record set to a set of destination locations. This is typically only utilized at the level of the execution plan.
+ */
+public interface Sender extends FragmentRoot {
+  
+  /**
+   * Get the list of destination endpoints that this Sender will be communicating with.
+   * @return List of DrillbitEndpoints.
+   */
+  public abstract List<DrillbitEndpoint> getDestinations();
+  
+  /**
+   * Get the receiver major fragment id that is opposite this sender.
+   * @return The receiver major fragment id; exposed as the JSON property "receiver-major-fragment".
+   */
+  @JsonProperty("receiver-major-fragment")
+  public int getOppositeMajorFragmentId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Size.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Size.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Size.java
new file mode 100644
index 0000000..7bc6f93
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Size.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+/** Immutable size estimate: a record (row) count paired with a per-record size. NOTE(review): the unit of the record size is not stated here (presumably bytes) - confirm. */
+public class Size {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Size.class);
+
+  private final long rowCount;
+  private final int rowSize;
+  /** Creates a size from a record count and a per-record size; neither value is validated. */
+  public Size(long rowCount, int rowSize) {
+    super();
+    this.rowCount = rowCount;
+    this.rowSize = rowSize;
+  }
+  /** Returns the record count given at construction. */
+  public long getRecordCount() {
+    return rowCount;
+  }
+  /** Returns the per-record size given at construction. */
+  public int getRecordSize() {
+    return rowSize;
+  }
+  /** Returns a new Size whose record count is the sum of both counts and whose record size is the larger of the two; neither operand is modified. */
+  public Size add(Size s){
+    return new Size(rowCount + s.rowCount, Math.max(rowSize, s.rowSize));
+  }
+  /** Returns record count times record size, computed as a long; the product is not checked for overflow. */
+  public long getAggSize(){
+    return rowCount * rowSize;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java
new file mode 100644
index 0000000..9b3a812
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * An interface which supports storing a record stream. In contrast to the logical layer, in the physical/execution
+ * layers, a Store node is actually an outputting node (rather than a root node) that returns one or more records
+ * regarding the completion of the query.
+ */
+public interface Store extends HasAffinity {
+
+  /**
+   * Inform the Store node about the actual decided DrillbitEndpoint assignments desired for storage purposes. This is a
+   * precursor to the execution planner running a set of getSpecificStore() method calls for full Store node
+   * materialization.
+   *
+   * @param endpoints
+   *          The list of endpoints that this Store node is going to be executed on.
+   * @throws PhysicalOperatorSetupException
+   *           If the implementation cannot apply the given assignments.
+   */
+  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
+
+  /**
+   * Provides full materialized Store operators for execution purposes.
+   *
+   * @param child
+   *          The child operator that this operator will consume from.
+   * @param minorFragmentId
+   *          The particular minor fragment id associated with this particular fragment materialization.
+   * @return A materialized Store Operator.
+   * @throws PhysicalOperatorSetupException
+   *           If the implementation cannot materialize a Store for the given fragment.
+   */
+  public abstract Store getSpecificStore(PhysicalOperator child, int minorFragmentId)
+      throws PhysicalOperatorSetupException;
+
+  /**
+   * The maximum allowable width for the Store operation. In some cases, a store operation has a limited number of
+   * parallelizations that it can support. For example, a Screen return cannot be parallelized at all. In this case, a
+   * maxWidth value of 1 will be returned. In the case that there is no limit for parallelization, this method should
+   * return Integer.MAX_VALUE.
+   *
+   * @return The maximum number of parallel instances supported, or Integer.MAX_VALUE when there is no limit.
+   */
+  @JsonIgnore
+  public abstract int getMaxWidth();
+
+  /**
+   * Get the child of this store operator as this will be needed for parallelization materialization purposes.
+   *
+   * @return The child operator of this Store.
+   */
+  public abstract PhysicalOperator getChild();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
new file mode 100644
index 0000000..4af4243
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("filter")
+public class Filter extends AbstractSingle {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filter.class);
+
+  private final LogicalExpression expr;
+  private final float selectivity;
+  
+  @JsonCreator
+  public Filter(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("selectivity") float selectivity) {
+    super(child);
+    this.expr = expr;
+    this.selectivity = selectivity;
+  }
+
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitFilter(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return child.getCost();
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Filter(child, expr, selectivity);
+  }
+
+  @Override
+  public Size getSize() {
+    return new Size( (long) (child.getSize().getRecordCount()*selectivity), child.getSize().getRecordSize());
+  }
+  
+  
+
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
new file mode 100644
index 0000000..84994f6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Lists;
+
+/**
+ * Sender configuration that carries a partitioning expression and the list of destination endpoints.
+ */
+@JsonTypeName("hash-partition-sender")
+public class HashPartitionSender extends AbstractSender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
+
+  private final List<DrillbitEndpoint> endpoints;
+  private final LogicalExpression expr;
+
+  /**
+   * @param oppositeMajorFragmentId
+   *          The major fragment id of the receiving side.
+   * @param child
+   *          The operator this sender consumes from.
+   * @param expr
+   *          The expression associated with this sender.
+   * @param endpoints
+   *          The destination endpoints.
+   */
+  @JsonCreator
+  public HashPartitionSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("destinations") List<DrillbitEndpoint> endpoints) {
+    super(oppositeMajorFragmentId, child);
+    this.expr = expr;
+    this.endpoints = endpoints;
+  }
+
+  /**
+   * @return The destination endpoints supplied at construction (the same list instance, not a copy).
+   */
+  @Override
+  public List<DrillbitEndpoint> getDestinations() {
+    return endpoints;
+  }
+
+  /**
+   * Accessor for the "expr" creator property so the expression accepted by the constructor can be read back (for
+   * example by a bean-style serializer) instead of being write-only.
+   *
+   * @return The expression supplied at construction.
+   */
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new HashPartitionSender(oppositeMajorFragmentId, child, expr, endpoints);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
new file mode 100644
index 0000000..1f158ce
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.beans.Transient;
+import java.util.List;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Exchange configuration that pairs a {@link HashPartitionSender} on the sending side with a RandomReceiver on the
+ * receiving side.
+ */
+@JsonTypeName("hash-to-random-exchange")
+public class HashToRandomExchange extends AbstractExchange{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class);
+
+  private final LogicalExpression expr;
+
+  //ephemeral for setup tasks.
+  private List<DrillbitEndpoint> senderLocations;
+  private List<DrillbitEndpoint> receiverLocations;
+
+  /**
+   * @param child
+   *          The operator feeding this exchange.
+   * @param expr
+   *          The expression handed to each sender created by this exchange.
+   */
+  @JsonCreator
+  public HashToRandomExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) {
+    super(child);
+    this.expr = expr;
+  }
+
+  /**
+   * Accessor for the "expr" creator property so the expression accepted by the constructor can be read back (for
+   * example by a bean-style serializer) instead of being write-only.
+   *
+   * @return The expression supplied at construction.
+   */
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  /**
+   * @return Integer.MAX_VALUE; this exchange places no limit on the sending width.
+   */
+  @Override
+  public int getMaxSendWidth() {
+    return Integer.MAX_VALUE;
+  }
+
+  /** Remembers the sender endpoints; they are handed to receivers created by {@link #getReceiver(int)}. */
+  @Override
+  protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
+    this.senderLocations = senderLocations;
+  }
+
+  /** Remembers the receiver endpoints; they are handed to senders created by {@link #getSender(int, PhysicalOperator)}. */
+  @Override
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
+    this.receiverLocations = receiverLocations;
+  }
+
+  /**
+   * @return A new HashPartitionSender targeting the receiver locations recorded by setupReceivers(). The
+   *         minorFragmentId argument is not used.
+   */
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    return new HashPartitionSender(receiverMajorFragmentId, child, expr, receiverLocations);
+  }
+
+  /**
+   * @return A new RandomReceiver reading from the sender locations recorded by setupSenders(). The minorFragmentId
+   *         argument is not used.
+   */
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return new RandomReceiver(senderMajorFragmentId, senderLocations);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new HashToRandomExchange(child, expr);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
new file mode 100644
index 0000000..eaaeaa3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -0,0 +1,113 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MockScanPOP.MockColumn;
+import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.store.RecordReader;
+
+public class MockRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+
+  private OutputMutator output;
+  private MockScanEntry config;
+  private FragmentContext context;
+  private ValueVector<?>[] valueVectors;
+  private int recordsRead;
+
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
+    this.context = context;
+    this.config = config;
+  }
+
+  private int getEstimatedRecordSize(MockColumn[] types) {
+    int x = 0;
+    for (int i = 0; i < types.length; i++) {
+      x += TypeHelper.getSize(types[i].getMajorType());
+    }
+    return x;
+  }
+
+  private ValueVector<?> getVector(int fieldId, String name, MajorType type, int length) {
+    assert context != null : "Context shouldn't be null.";
+    
+    if(type.getMode() != DataMode.REQUIRED) throw new UnsupportedOperationException();
+    
+    MaterializedField f = MaterializedField.create(new SchemaPath(name), fieldId, 0, type);
+    ValueVector<?> v;
+    v = TypeHelper.getNewVector(f, context.getAllocator());
+    v.allocateNew(length);
+    
+    return v;
+
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      this.output = output;
+      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+      valueVectors = new ValueVector<?>[config.getTypes().length];
+      int batchRecordCount = 250000 / estimateRowSize;
+
+      for (int i = 0; i < config.getTypes().length; i++) {
+        logger.debug("Adding field {} of type {}", i, config.getTypes()[i]);
+        valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
+        output.addField(i, valueVectors[i]);
+      }
+      output.setNewSchema();
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up fields", e);
+    }
+
+  }
+
+  @Override
+  public int next() {
+    int recordSetSize = Math.min(valueVectors[0].capacity(), this.config.getRecords()- recordsRead);
+    recordsRead += recordSetSize;
+    for(ValueVector<?> v : valueVectors){
+      v.randomizeData();
+      v.setRecordCount(recordSetSize);
+    }
+    return recordSetSize;
+  }
+
+  @Override
+  public void cleanup() {
+    for (int i = 0; i < valueVectors.length; i++) {
+      try {
+        output.removeField(valueVectors[i].getField().getFieldId());
+      } catch (SchemaChangeException e) {
+        logger.warn("Failure while trying tremove field.", e);
+      }
+      valueVectors[i].close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
new file mode 100644
index 0000000..b821d6e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class MockScanBatchCreator implements BatchCreator<MockScanPOP>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MockScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<MockScanEntry> entries = config.getReadEntries();
+    List<RecordReader> readers = Lists.newArrayList();
+    for(MockScanEntry e : entries){
+      readers.add(new MockRecordReader(context, e));
+    }
+    return new ScanBatch(context, readers.iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
new file mode 100644
index 0000000..4a3a606
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
@@ -0,0 +1,193 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.exec.record.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Scan configuration that describes fabricated data: each {@link MockScanEntry} states how many records to produce
+ * and which columns they contain.
+ */
+@JsonTypeName("mock-scan")
+public class MockScanPOP extends AbstractScan<MockScanPOP.MockScanEntry> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanPOP.class);
+
+  private final String url;
+  // Per-endpoint read entry assignments; populated by applyAssignments() and null before that call.
+  private LinkedList<MockScanEntry>[] mappings;
+
+  @JsonCreator
+  public MockScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    super(readEntries);
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  /**
+   * A single unit of mock work: a record count plus the columns each record is made of.
+   */
+  public static class MockScanEntry implements ReadEntry {
+
+    private final int records;
+    private final MockColumn[] types;
+    private final int recordSize;
+
+    /**
+     * @param records
+     *          The number of records this entry produces.
+     * @param types
+     *          The columns of each record; the record size is the sum of TypeHelper.getSize() over their major types.
+     */
+    @JsonCreator
+    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+      this.records = records;
+      this.types = types;
+      int size = 0;
+      for(MockColumn dt : types){
+        size += TypeHelper.getSize(dt.getMajorType());
+      }
+      this.recordSize = size;
+    }
+
+    /**
+     * @return A constant cost; it does not depend on the record count or the columns.
+     */
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    public int getRecords() {
+      return records;
+    }
+
+    public MockColumn[] getTypes() {
+      return types;
+    }
+
+    /**
+     * @return A Size built from the record count and the record size computed in the constructor.
+     */
+    @Override
+    public Size getSize() {
+      return new Size(records, recordSize);
+    }
+  }
+
+  /**
+   * Description of one mock column. Width, precision and scale are optional and may be null.
+   */
+  @JsonInclude(Include.NON_NULL)
+  public static class MockColumn{
+    @JsonProperty("type") public MinorType minorType;
+    public String name;
+    public DataMode mode;
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+
+    @JsonCreator
+    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      this.name = name;
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @JsonProperty("type")
+    public MinorType getMinorType() {
+      return minorType;
+    }
+    public String getName() {
+      return name;
+    }
+    public DataMode getMode() {
+      return mode;
+    }
+    public Integer getWidth() {
+      return width;
+    }
+    public Integer getPrecision() {
+      return precision;
+    }
+    public Integer getScale() {
+      return scale;
+    }
+
+    /**
+     * @return A MajorType built from mode and minor type, plus precision, width and scale when they are non-null.
+     */
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+
+  }
+
+  /**
+   * @return An empty list; the mock scan expresses no endpoint affinity.
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Distributes the read entries over the given endpoints in round-robin order.
+   *
+   * @param endpoints
+   *          The endpoints to assign work to. There must not be more endpoints than read entries, and the list may
+   *          only be empty when there are no read entries.
+   * @throws IllegalArgumentException
+   *           If either of the above conditions is violated.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    int width = endpoints.size();
+    Preconditions.checkArgument(width <= getReadEntries().size());
+    // Without this check an empty endpoint list with pending entries would index into a zero-length array.
+    Preconditions.checkArgument(width > 0 || getReadEntries().size() == 0,
+        "Cannot assign %s read entries to an empty list of endpoints.", getReadEntries().size());
+
+    mappings = new LinkedList[width];
+
+    int slot = 0;
+    for(MockScanEntry e : this.getReadEntries()){
+      LinkedList<MockScanEntry> entries = mappings[slot];
+      if(entries == null){
+        entries = new LinkedList<MockScanEntry>();
+        mappings[slot] = entries;
+      }
+      entries.add(e);
+      slot = (slot + 1) % width;
+    }
+  }
+
+  /**
+   * @param minorFragmentId
+   *          Index of the assignment to materialize; must be in the range [0, number of assigned endpoints).
+   * @return A new MockScanPOP restricted to the read entries assigned to the given minor fragment.
+   * @throws IllegalStateException
+   *           If applyAssignments() has not been called yet.
+   * @throws IllegalArgumentException
+   *           If the minor fragment id is out of range.
+   */
+  @Override
+  public Scan<?> getSpecificScan(int minorFragmentId) {
+    // Real checks instead of assert: assertions are disabled by default at runtime.
+    Preconditions.checkState(mappings != null, "applyAssignments() must be called before getSpecificScan().");
+    Preconditions.checkArgument(minorFragmentId >= 0 && minorFragmentId < mappings.length,
+        "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
+    return new MockScanPOP(url, mappings[minorFragmentId]);
+  }
+
+  /**
+   * @param children
+   *          Must be empty; a scan has no children.
+   * @return A new MockScanPOP with the same url and read entries.
+   */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockScanPOP(url, readEntries);
+
+  }
+
+}


Mime
View raw message