drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [07/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 01:52:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
new file mode 100644
index 0000000..fc03a23
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -0,0 +1,163 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class SimpleParallelizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
+
+  private final Materializer materializer = new Materializer();
+
+  /**
+   * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
+   * beyond the global max width.
+   * 
+   * @param context
+   *          The current QueryContext.
+   * @param planningSet
+   *          The set of queries with collected statistics that we'll work with.
+   * @param globalMaxWidth
+   *          The maximum level or paralellization any stage of the query can do. Note that while this might be the
+   *          number of active Drillbits, realistically, this could be well beyond that number of we want to do things
+   *          like speed results return.
+   * @return The list of generatoe PlanFragment protobuf objects to be assigned out to the individual nodes.
+   * @throws FragmentSetupException
+   */
+  public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+      int globalMaxWidth) throws ExecutionSetupException {
+    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth);
+    return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet);
+  }
+
+  private QueryWorkUnit generateWorkUnit(DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
+      PlanningSet planningSet) throws ExecutionSetupException {
+
+    List<PlanFragment> fragments = Lists.newArrayList();
+
+    PlanFragment rootFragment = null;
+    FragmentRoot rootOperator = null;
+
+    // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
+    // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
+    for (Wrapper wrapper : planningSet) {
+      Fragment node = wrapper.getNode();
+      Stats stats = node.getStats();
+      final PhysicalOperator physicalOperatorRoot = node.getRoot();
+      boolean isRootNode = rootNode == node;
+
+      if (isRootNode && wrapper.getWidth() != 1)
+        throw new FragmentSetupException(
+            String.format(
+                    "Failure while trying to setup fragment.  The root fragment must always have parallelization one.  In the current case, the width was set to %d.",
+                    wrapper.getWidth()));
+      // a fragment is self driven if it doesn't rely on any other exchanges.
+      boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;
+
+      // Create a minorFragment for each major fragment.
+      for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
+        IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+        PhysicalOperator op = physicalOperatorRoot.accept(materializer, iNode);
+        Preconditions.checkArgument(op instanceof FragmentRoot);
+        FragmentRoot root = (FragmentRoot) op; 
+
+        // get plan as JSON
+        String plan;
+        try {
+          plan = reader.writeJson(root);
+        } catch (JsonProcessingException e) {
+          throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
+        }
+        
+        FragmentHandle handle = FragmentHandle //
+            .newBuilder() //
+            .setMajorFragmentId(wrapper.getMajorFragmentId()) //
+            .setMinorFragmentId(minorFragmentId) //
+            .setQueryId(queryId) //
+            .build();
+        PlanFragment fragment = PlanFragment.newBuilder() //
+            .setCpuCost(stats.getCpuCost()) //
+            .setDiskCost(stats.getDiskCost()) //
+            .setForeman(foremanNode) //
+            .setMemoryCost(stats.getMemoryCost()) //
+            .setNetworkCost(stats.getNetworkCost()) //
+            .setFragmentJson(plan) //
+            .setHandle(handle) //
+            .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
+            .setLeafFragment(isLeafFragment) //
+            .build();
+
+        if (isRootNode) {
+          rootFragment = fragment;
+          rootOperator = root;
+        } else {
+          fragments.add(fragment);
+        }
+      }
+    }
+
+    return new QueryWorkUnit(rootOperator, rootFragment, fragments);
+
+  }
+
+  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet,
+      int globalMaxWidth) throws PhysicalOperatorSetupException {
+    // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
+    // cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this
+    // could be based on endpoint load)
+    for (Wrapper wrapper : planningSet) {
+
+      Stats stats = wrapper.getStats();
+
+      // figure out width.
+      int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
+      float diskCost = stats.getDiskCost();
+      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+
+      // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
+      // of tasks or the maximum width of the fragment.
+      if (diskCost < width) {
+        width = (int) diskCost;
+      }
+
+      if (width < 1) width = 1;
+      logger.debug("Setting width {} on fragment {}", width, wrapper);
+      wrapper.setWidth(width);
+      // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
+      wrapper.assignEndpoints(allNodes);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
new file mode 100644
index 0000000..729b2f1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.OperatorCost;
+
+public class Stats {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Stats.class);
+  
+  private int maxWidth = Integer.MAX_VALUE;
+  private float networkCost; 
+  private float diskCost;
+  private float memoryCost;
+  private float cpuCost;
+  
+  public void addMaxWidth(int maxWidth){
+    this.maxWidth = Math.min(this.maxWidth, maxWidth);
+  }
+  
+  public void addCost(OperatorCost cost){
+    networkCost += cost.getNetwork();
+    diskCost += cost.getDisk();
+    memoryCost += cost.getMemory();
+    cpuCost += cost.getCpu();
+  }
+
+  public int getMaxWidth() {
+    return maxWidth;
+  }
+
+  public float getNetworkCost() {
+    return networkCost;
+  }
+
+  public float getDiskCost() {
+    return diskCost;
+  }
+
+  public float getMemoryCost() {
+    return memoryCost;
+  }
+
+  public float getCpuCost() {
+    return cpuCost;
+  }
+
+  @Override
+  public String toString() {
+    return "FragmentStats [maxWidth=" + maxWidth + ", networkCost=" + networkCost + ", diskCost=" + diskCost
+        + ", memoryCost=" + memoryCost + ", cpuCost=" + cpuCost + "]";
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
new file mode 100644
index 0000000..d53a78c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.HasAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+
+public class StatsCollector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatsCollector.class);
+
+  private final static OpStatsCollector opStatCollector = new OpStatsCollector();
+
+  private StatsCollector() {
+  };
+
+  private static void visit(PlanningSet planningSet, Fragment n) {
+    Preconditions.checkNotNull(planningSet);
+    Preconditions.checkNotNull(n);
+
+    Wrapper wrapper = planningSet.get(n);
+    n.getRoot().accept(opStatCollector, wrapper);
+    logger.debug("Set stats to {}", wrapper.getStats());
+    // receivers...
+    for (ExchangeFragmentPair child : n) {
+      // get the fragment node that feeds this node.
+      Fragment childNode = child.getNode();
+      visit(planningSet, childNode);
+    }
+
+  }
+
+  public static PlanningSet collectStats(Fragment rootFragment) {
+    PlanningSet fps = new PlanningSet();
+    visit(fps, rootFragment);
+    return fps;
+  }
+
+  private static class OpStatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+
+    @Override
+    public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+      Stats stats = wrapper.getStats();
+      stats.addCost(exchange.getAggregateSendCost());
+      stats.addMaxWidth(exchange.getMaxSendWidth());
+      return super.visitSendingExchange(exchange, wrapper);
+    }
+
+    @Override
+    public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+      wrapper.getStats().addCost(exchange.getAggregateReceiveCost());
+      // no traversal since it would cross fragment boundary.
+      return null;
+    }
+
+    @Override
+    public Void visitScan(Scan<?> scan, Wrapper wrapper) {
+      Stats stats = wrapper.getStats();      
+      stats.addMaxWidth(scan.getReadEntries().size());
+      return super.visitScan(scan, wrapper);
+    }
+
+    @Override
+    public Void visitStore(Store store, Wrapper wrapper) {
+      Stats stats = wrapper.getStats();
+      stats.addMaxWidth(store.getMaxWidth());
+      return super.visitStore(store, wrapper);
+    }
+
+    @Override
+    public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
+      if(op instanceof HasAffinity){
+        wrapper.addEndpointAffinity(((HasAffinity)op).getOperatorAffinity());
+      }
+      Stats stats = wrapper.getStats();
+      stats.addCost(op.getCost());
+      for (PhysicalOperator child : op) {
+        child.accept(this, wrapper);
+      }
+      return null;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
new file mode 100644
index 0000000..0dfcb62
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -0,0 +1,186 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A wrapping class that allows us to add additional information to each fragment node for planning purposes.
+ */
+public class Wrapper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Wrapper.class);
+
+  private final Fragment node;
+  private final int majorFragmentId;
+  private int width = -1;
+  private final Stats stats;
+  private boolean endpointsAssigned;
+  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+
+  // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the
+  // same fragment multiple times to the same endpoint.
+  private List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
+
+  public Wrapper(Fragment node, int majorFragmentId) {
+    this.majorFragmentId = majorFragmentId;
+    this.node = node;
+    this.stats = new Stats();
+  }
+
+  public Stats getStats() {
+    return stats;
+  }
+
+  public void addEndpointAffinity(List<EndpointAffinity> affinities){
+    Preconditions.checkState(!endpointsAssigned);
+    for(EndpointAffinity ea : affinities){
+      addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
+    }
+  }
+  
+  public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    Preconditions.checkState(!endpointsAssigned);
+    EndpointAffinity ea = endpointAffinity.get(endpoint);
+    if (ea == null) {
+      ea = new EndpointAffinity(endpoint);
+      endpointAffinity.put(endpoint, ea);
+    }
+
+    ea.addAffinity(affinity);
+  }
+
+  public int getMajorFragmentId() {
+    return majorFragmentId;
+  }
+
+  public int getWidth() {
+    return width;
+  }
+
+  public void setWidth(int width) {
+    Preconditions.checkState(this.width == -1);
+    this.width = width;
+  }
+
+  public Fragment getNode() {
+    return node;
+  }
+
+  private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
+
+    
+    @Override
+    public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      if(exchange == node.getSendingExchange()){
+        return visitOp(exchange, value);
+      }
+      // stop on receiver exchange.
+      return null;
+    }
+
+    @Override
+    public Void visitScan(Scan<?> scan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      scan.applyAssignments(value);
+      return super.visitScan(scan, value);
+    }
+
+    @Override
+    public Void visitStore(Store store, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      store.applyAssignments(value);
+      return super.visitStore(store, value);
+    }
+
+    @Override
+    public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      return visitChildren(op, value);
+    }
+    
+  }
+  
+  public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws PhysicalOperatorSetupException {
+    Preconditions.checkState(!endpointsAssigned);
+
+    endpointsAssigned = true;
+
+    List<EndpointAffinity> values = Lists.newArrayList();
+    values.addAll(endpointAffinity.values());
+
+    if (values.size() == 0) {
+      List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
+      final int div = allPossible.size();
+      int start = ThreadLocalRandom.current().nextInt(div);
+      // round robin with random start.
+      for (int i = start; i < start + width; i++) {
+        endpoints.add(all.get(i % div));
+      }
+    } else if (values.size() < width) {
+      throw new NotImplementedException(
+          "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
+    } else {
+      // get nodes with highest affinity.
+      Collections.sort(values);
+      values = Lists.reverse(values);
+      for (int i = 0; i < width; i++) {
+        endpoints.add(values.get(i).getEndpoint());
+      }
+    }
+
+    // Set scan and store endpoints.
+    AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
+    node.getRoot().accept(visitor, endpoints);
+    
+    // Set the endpoints for this (one at most) sending exchange.
+    if (node.getSendingExchange() != null) {
+      node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
+    }
+
+    // Set the endpoints for each incoming exchange within this fragment.
+    for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) {
+      e.getExchange().setupReceivers(majorFragmentId, endpoints);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "FragmentWrapper [majorFragmentId=" + majorFragmentId + ", width=" + width + ", stats=" + stats + "]";
+  }
+
+  public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
+    Preconditions.checkState(endpointsAssigned);
+    return this.endpoints.get(minorFragmentId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
deleted file mode 100644
index 562d109..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.receiver;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractReceiver;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("nway-ordering-receiver")
-public class NWayOrderingReceiver extends AbstractReceiver{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NWayOrderingReceiver.class);
-
-  @Override
-  public List<DrillbitEndpoint> getProvidingEndpoints() {
-    return null;
-  }
-
-  @Override
-  public boolean supportsOutOfOrderExchange() {
-    return false;
-  }
-
-  @Override
-  public int getSenderCount() {
-    return 0;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
deleted file mode 100644
index 487c645..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.receiver;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractReceiver;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("receiver-random")
-public class RandomReceiver extends AbstractReceiver{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
-
-  @Override
-  public List<DrillbitEndpoint> getProvidingEndpoints() {
-    return null;
-  }
-
-  @Override
-  public boolean supportsOutOfOrderExchange() {
-    return false;
-  }
-
-  @Override
-  public int getSenderCount() {
-    return 0;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return null;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
deleted file mode 100644
index b0fb51c..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.sender;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractSender;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("hash-partition-sender")
-public class HashPartitionSender extends AbstractSender {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
-
-  public HashPartitionSender(PhysicalOperator child) {
-    super(child);
-  }
-
-
-  @Override
-  public List<DrillbitEndpoint> getDestinations() {
-    return null;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return null;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index d3e4b23..05b1cc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -17,25 +17,19 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.vector.ValueVector;
 
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.google.common.collect.Lists;
 
-public class BatchSchema implements Iterable<MaterializedField>{
+public class BatchSchema implements Iterable<MaterializedField> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-  
+
   private final List<MaterializedField> fields;
-  private final boolean hasSelectionVector;
-  
-  private BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
+  final boolean hasSelectionVector;
+
+  BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
     this.fields = fields;
     this.hasSelectionVector = hasSelectionVector;
   }
@@ -45,88 +39,16 @@ public class BatchSchema implements Iterable<MaterializedField>{
     return fields.iterator();
   }
 
-  public void addAnyField(short fieldId, boolean nullable, ValueMode mode){
-    addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+  public static SchemaBuilder newBuilder() {
+    return new SchemaBuilder();
   }
-  
-  public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass){
-    fields.add(new MaterializedField(fieldId, type, nullable, mode, valueClass));
+
+  @Override
+  public String toString() {
+    return "BatchSchema [fields=" + fields + ", hasSelectionVector=" + hasSelectionVector + "]";
   }
+
   
   
-  /**
-   * Builder to build BatchSchema.  Can have a supporting expected object.  If the expected Schema object is defined, the builder will always check that this schema is a equal or more materialized version of the current schema.
-   */
-  public class BatchSchemaBuilder{
-    private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
-    private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
-    
-    private boolean hasSelectionVector;
-    
-    public BatchSchemaBuilder(BatchSchema expected){
-      for(MaterializedField f: expected){
-        expectedFields.put(f.getFieldId(), f);
-      }
-      hasSelectionVector = expected.hasSelectionVector;
-    }
-    
-    public BatchSchemaBuilder(){
-    }
-    
-    
-    /**
-     * Add a field where we don't have type information.  In this case, DataType will be set to LATEBIND and valueClass will be set to null.
-     * @param fieldId The desired fieldId.  Should be unique for this BatchSchema.
-     * @param nullable Whether this field supports nullability.
-     * @param mode
-     * @throws SchemaChangeException
-     */
-    public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException{
-      addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
-    }
-    
-    public void setSelectionVector(boolean hasSelectionVector){
-      this.hasSelectionVector = hasSelectionVector;
-    }
-    
-    private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
-      MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
-      if(expectedFields != null){
-        if(!expectedFields.containsKey(f.getFieldId())) throw new SchemaChangeException(String.format("You attempted to add a field for Id An attempt was made to add a duplicate fieldId to the schema.  The offending fieldId was %d", fieldId));
-        f.checkMaterialization(expectedFields.lget());
-      }
-      fields.put(f.getFieldId(), f);
-    }
-    
-    public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
-      if(fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to add a duplicate fieldId to the schema.  The offending fieldId was %d", fieldId));
-      setTypedField(fieldId, type, nullable, mode, valueClass);
-    }
-    
-    public void replaceTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
-      if(!fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to replace a field in the schema, however the schema does not currently contain that field id.  The offending fieldId was %d", fieldId));
-      setTypedField(fieldId, type, nullable, mode, valueClass);
-    }
-    
-//    public void addVector(ValueVector<?> v){
-//      
-//    }
-//    
-//    public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
-//      
-//    }
-    
-    
-    public BatchSchema buildAndClear() throws SchemaChangeException{
-      // check if any fields are unaccounted for.
-      
-      List<MaterializedField> fieldList = Lists.newArrayList();
-      for(MaterializedField f : fields.values){
-        if(f != null) fieldList.add(f);
-      }
-      Collections.sort(fieldList);
-      return new BatchSchema(this.hasSelectionVector, fieldList);
-    }
-  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
new file mode 100644
index 0000000..c19065d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+public class FragmentWritableBatch{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
+  
+  private final ByteBuf[] buffers;
+  private final FragmentRecordBatch header;
+  
+  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
+    this.buffers = batch.getBuffers();
+    FragmentHandle handle = FragmentHandle //
+        .newBuilder() //
+        .setMajorFragmentId(receiveMajorFragmentId) //
+        .setMinorFragmentId(receiveMinorFragmentId) //
+        .setQueryId(queryId) //
+        .build();
+    this.header = FragmentRecordBatch //
+        .newBuilder() //
+        .setIsLastBatch(isLast) //
+        .setDef(batch.getDef()) //
+        .setHandle(handle) //
+        .setSendingMajorFragmentId(sendMajorFragmentId) //
+        .setSendingMinorFragmentId(sendMinorFragmentId) //
+        .build();
+  }
+
+  public ByteBuf[] getBuffers(){
+    return buffers;
+  }
+
+  public FragmentRecordBatch getHeader() {
+    return header;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
index 403c7a3..d820e0e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
@@ -17,7 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
-import org.apache.drill.exec.exception.ExecutionSetupException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 
 public class InvalidValueAccessor extends ExecutionSetupException{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
new file mode 100644
index 0000000..718396e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class MajorTypeSerDe {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MajorTypeSerDe.class);
+  
+  
+  public static class De extends StdDeserializer<MajorType> {
+
+    public De() {
+      super(MajorType.class);
+    }
+
+    @Override
+    public MajorType deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      return jp.readValueAs(MajorTypeHolder.class).getMajorType();
+    }
+    
+    
+  }
+  
+  
+  public static class Se extends StdSerializer<MajorType> {
+
+    public Se() {
+      super(MajorType.class);
+    }
+
+    @Override
+    public void serialize(MajorType value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      MajorTypeHolder holder = MajorTypeHolder.get(value);
+      jgen.writeObject(holder);
+    }
+    
+  }
+  
+  @JsonInclude(Include.NON_NULL)
+  public static class MajorTypeHolder{
+    @JsonProperty("type") public MinorType minorType;
+    public DataMode mode;
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+    
+    @JsonCreator
+    public MajorTypeHolder(@JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      super();
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    private MajorTypeHolder(){}
+    
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+    
+    public static MajorTypeHolder get(MajorType mt){
+      MajorTypeHolder h = new MajorTypeHolder();
+      h.minorType = mt.getMinorType();
+      h.mode = mt.getMode();
+      if(mt.hasPrecision()) h.precision = mt.getPrecision();
+      if(mt.hasScale()) h.scale = mt.getScale();
+      if(mt.hasWidth()) h.width = mt.getWidth();
+      return h;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 2e941a2..09427ef 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -17,67 +17,152 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.exception.SchemaChangeException;
-
-public class MaterializedField implements Comparable<MaterializedField>{
-  private int fieldId;
-  private DataType type;
-  private boolean nullable;
-  private ValueMode mode;
-  private Class<?> valueClass;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.vector.TypeHelper;
+
+public class MaterializedField implements Comparable<MaterializedField> {
+  private final FieldDef def;
+
+  public MaterializedField(FieldDef def) {
+    this.def = def;
+  }
+  
+  public static MaterializedField create(FieldDef def){
+    return new MaterializedField(def);
+  }
+  
+  public static MaterializedField create(SchemaPath path, int fieldId, int parentId, MajorType type) {
+    FieldDef.Builder b = FieldDef.newBuilder();
+    b.setFieldId(fieldId);
+    b.setMajorType(type);
+    addSchemaPathToFieldDef(path, b);
+    b.setParentId(parentId);
+    return create(b.build());
+  }
+
+  private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) {
+    for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
+      NamePart.Builder b = NamePart.newBuilder();
+      if (p.isArray()) {
+        b.setType(Type.ARRAY);
+      } else {
+        b.setName(p.getNameSegment().getPath().toString());
+        b.setType(Type.NAME);
+      }
+      builder.addName(b.build());
+      if(p.isLastPath()) break;
+    }
+  }
+
+  public FieldDef getDef() {
+    return def;
+  }
   
-  public MaterializedField(int fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) {
-    super();
-    this.fieldId = fieldId;
-    this.type = type;
-    this.nullable = nullable;
-    this.mode = mode;
-    this.valueClass = valueClass;
+  public String getName(){
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for(NamePart np : def.getNameList()){
+      if(np.getType() == Type.ARRAY){
+        sb.append("[]");
+      }else{
+        if(first){
+          first = false;
+        }else{
+          sb.append(".");
+        }
+        sb.append(np.getName());
+        
+      }
+    }
+    return sb.toString();
+  }
+
+  public int getWidth() {
+    return def.getMajorType().getWidth();
   }
 
   public int getFieldId() {
-    return fieldId;
+    return def.getFieldId();
   }
 
-  public DataType getType() {
-    return type;
+  public MajorType getType() {
+    return def.getMajorType();
   }
 
   public boolean isNullable() {
-    return nullable;
+    return def.getMajorType().getMode() == DataMode.OPTIONAL;
   }
 
-  public ValueMode getMode() {
-    return mode;
+  public DataMode getDataMode() {
+    return def.getMajorType().getMode();
   }
 
   public Class<?> getValueClass() {
-    return valueClass;
+    return TypeHelper.getValueVectorClass(getType().getMinorType(), getDataMode());
   }
 
-  private void check(String name, Object val1, Object expected) throws SchemaChangeException{
-    if(expected.equals(val1)) return;
-    throw new SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name, val1, name, expected);
-  }
-  
-  public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
-    if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s", this.type, expected.type);
-    if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
-    check("fieldId", this.fieldId, expected.fieldId);
-    check("nullability", this.nullable, expected.nullable);
-    check("valueMode", this.mode, expected.mode);
-  }
+  public boolean matches(SchemaPath path) {
+    Iterator<NamePart> iter = def.getNameList().iterator();
+    
+    for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
+      if(p == null) break;
+      if (!iter.hasNext()) return false;
+      NamePart n = iter.next();
+
+      if (p.isArray()) {
+        if (n.getType() == Type.ARRAY) continue;
+        return false;
+      } else {
+        if (p.getNameSegment().getPath().equals(n.getName())) continue;
+        return false;
+      }
+      
+    }
+    // we've reviewed all path segments. confirm that we don't have any extra name parts.
+    return !iter.hasNext();
 
-  public MaterializedField getNullableVersion(Class<?> valueClass){
-    return new MaterializedField(fieldId, type, true, mode, valueClass);
   }
-  
+
+  // private void check(String name, Object val1, Object expected) throws SchemaChangeException{
+  // if(expected.equals(val1)) return;
+  // throw new
+  // SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name,
+  // val1, name, expected);
+  // }
+
+  // public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
+  // if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new
+  // SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s",
+  // this.type, expected.type);
+  // if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
+  // check("fieldId", this.fieldId, expected.fieldId);
+  // check("nullability", this.nullable, expected.nullable);
+  // check("valueMode", this.mode, expected.mode);
+  // }
+  //
+  // public MaterializedField getNullableVersion(Class<?> valueClass){
+  // return new MaterializedField(path, fieldId, type, true, mode, valueClass);
+  // }
+
   @Override
   public int compareTo(MaterializedField o) {
-    return Integer.compare(this.fieldId, o.fieldId);
+    return Integer.compare(this.getFieldId(), o.getFieldId());
   }
-  
+
+  @Override
+  public String toString() {
+    return "MaterializedField [" + def.toString() + "]";
+  }
+
   
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
new file mode 100644
index 0000000..c244cea
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+
+public class RawFragmentBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
+
+  final FragmentRecordBatch header;
+  final ByteBuf body;
+
+  public RawFragmentBatch(FragmentRecordBatch header, ByteBuf body) {
+    super();
+    this.header = header;
+    this.body = body;
+  }
+
+  public FragmentRecordBatch getHeader() {
+    return header;
+  }
+
+  public ByteBuf getBody() {
+    return body;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
new file mode 100644
index 0000000..08b0e11
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.ops.FragmentContext;
+
+public interface RawFragmentBatchProvider {
+  
+  public RawFragmentBatch getNext();
+  public void kill(FragmentContext context);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index eca62bb..3e4ded2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -34,8 +34,9 @@ public interface RecordBatch {
     NONE, // No more records were found.
     OK, // A new range of records have been provided.
     OK_NEW_SCHEMA, // A full collection of records
-    STOP // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
+    STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
          // to understand the current state of things.
+    NOT_YET // used by batches that haven't received incoming data yet.
   }
 
   /**
@@ -81,5 +82,11 @@ public interface RecordBatch {
    * @return An IterOutcome describing the result of the iteration.
    */
   public IterOutcome next();
+  
+  /**
+   * Get a writable version of this batch.  Takes over owernship of existing buffers.
+   * @return
+   */
+  public WritableBatch getWritableBatch();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
new file mode 100644
index 0000000..d990198
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -0,0 +1,143 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+
+public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector<?>>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
+
+  private IntObjectOpenHashMap<ValueVector<?>> vectors = new IntObjectOpenHashMap<ValueVector<?>>();
+  private final BufferAllocator allocator;
+  private int recordCount; 
+  private BatchSchema schema;
+  
+  public RecordBatchLoader(BufferAllocator allocator) {
+    super();
+    this.allocator = allocator;
+  }
+
+  /**
+   * Load a record batch from a single buffer.
+   * 
+   * @param def
+   *          The definition for the record batch.
+   * @param buf
+   *          The buffer that holds the data ssociated with the record batch
+   * @return Whether or not the schema changed since the previous load.
+   * @throws SchemaChangeException 
+   */
+  public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException {
+//    logger.debug("Loading record batch with def {} and data {}", def, buf);
+    this.recordCount = def.getRecordCount();
+    boolean schemaChanged = false;
+    
+    IntObjectOpenHashMap<ValueVector<?>> newVectors = new IntObjectOpenHashMap<ValueVector<?>>();
+
+    List<FieldMetadata> fields = def.getFieldList();
+    
+    int bufOffset = 0;
+    for (FieldMetadata fmd : fields) {
+      FieldDef fieldDef = fmd.getDef();
+      ValueVector<?> v = vectors.remove(fieldDef.getFieldId());
+      if (v != null) {
+        if (v.getField().getDef().equals(fieldDef)) {
+          v.setTo(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+          newVectors.put(fieldDef.getFieldId(), v);
+          continue;
+        } else {
+          v.close();
+          v = null;
+        }
+      }
+      // if we arrive here, either the metadata didn't match, or we didn't find a vector.
+      schemaChanged = true;
+      MaterializedField m = new MaterializedField(fieldDef);
+      v = TypeHelper.getNewVector(m, allocator);
+      v.setTo(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      newVectors.put(fieldDef.getFieldId(), v);
+    }
+    
+    if(!vectors.isEmpty()){
+      schemaChanged = true;
+      for(IntObjectCursor<ValueVector<?>> cursor : newVectors){
+        cursor.value.close();
+      }
+      
+    }
+    
+    if(schemaChanged){
+      // rebuild the schema.
+      SchemaBuilder b = BatchSchema.newBuilder();
+      for(IntObjectCursor<ValueVector<?>> cursor : newVectors){
+        b.addField(cursor.value.getField());
+      }
+      b.setSelectionVector(false);
+      this.schema = b.build();
+    }
+    vectors = newVectors;
+    return schemaChanged;
+
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    ValueVector<?> v = vectors.get(fieldId);
+    assert v != null;
+    if (v.getClass() != clazz)
+      throw new InvalidValueAccessor(String.format(
+          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+    return (T) v;
+  }
+
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+
+  public WritableBatch getWritableBatch(){
+    return WritableBatch.get(recordCount, vectors);
+  }
+
+  @Override
+  public Iterator<IntObjectCursor<ValueVector<?>>> iterator() {
+    return this.vectors.iterator();
+  }
+
+  public BatchSchema getSchema(){
+    return schema;
+  }
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
new file mode 100644
index 0000000..1e25b1a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.google.common.collect.Lists;
+
+/**
+ * A reusable builder that supports the creation of BatchSchemas. Can have a supporting expected object. If the expected Schema object is defined, the
+ * builder will always check that this schema is a equal or more materialized version of the current schema.
+ */
+public class SchemaBuilder {
+  private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
+  private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
+
+  private boolean hasSelectionVector;
+
+  public SchemaBuilder(BatchSchema expected) {
+    for (MaterializedField f : expected) {
+      expectedFields.put(f.getFieldId(), f);
+    }
+    hasSelectionVector = expected.hasSelectionVector;
+  }
+
+  SchemaBuilder() {
+  }
+
+  /**
+   * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
+   * will be set to null.
+   * 
+   * @param fieldId
+   *          The desired fieldId. Should be unique for this BatchSchema.
+   * @param nullable
+   *          Whether this field supports nullability.
+   * @param mode
+   * @throws SchemaChangeException
+   */
+//  public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException {
+//    addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+//  }
+
+  public void setSelectionVector(boolean hasSelectionVector) {
+    this.hasSelectionVector = hasSelectionVector;
+  }
+
+  
+//  private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+//      throws SchemaChangeException {
+//    MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
+//    if (expectedFields != null) {
+//      if (!expectedFields.containsKey(f.getFieldId()))
+//        throw new SchemaChangeException(
+//            String
+//                .format(
+//                    "You attempted to add a field for Id An attempt was made to add a duplicate fieldId to the schema.  The offending fieldId was %d",
+//                    fieldId));
+//      f.checkMaterialization(expectedFields.lget());
+//    }
+//    fields.put(f.getFieldId(), f);
+//  }
+  
+  public void addField(MaterializedField f){
+    fields.put(f.getFieldId(), f);
+  }
+
+//  public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+//      throws SchemaChangeException {
+//    if (fields.containsKey(fieldId))
+//      throw new SchemaChangeException(String.format(
+//          "An attempt was made to add a duplicate fieldId to the schema.  The offending fieldId was %d", fieldId));
+//    setTypedField(fieldId, type, nullable, mode, valueClass);
+//  }
+//
+//  public void replaceTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+//      throws SchemaChangeException {
+//    if (!fields.containsKey(fieldId))
+//      throw new SchemaChangeException(
+//          String.format("An attempt was made to replace a field in the schema, however the schema does " +
+//          		"not currently contain that field id.  The offending fieldId was %d", fieldId));
+//    setTypedField(fieldId, type, nullable, mode, valueClass);
+//  }
+  
+  public void removeField(short fieldId) throws SchemaChangeException{
+    MaterializedField f = fields.remove(fieldId);
+    if(f == null) throw new SchemaChangeException("You attempted to remove an nonexistent field.");
+  }
+
+  /**
+   * Generate a new BatchSchema object based on the current state of the builder.
+   * @return
+   * @throws SchemaChangeException
+   */
+  public BatchSchema build() throws SchemaChangeException {
+    // check if any fields are unaccounted for.
+
+    List<MaterializedField> fieldList = Lists.newArrayList();
+    for (ObjectCursor<MaterializedField> f : fields.values()) {
+      if (f != null) fieldList.add(f.value);
+    }
+    Collections.sort(fieldList);
+    return new BatchSchema(this.hasSelectionVector, fieldList);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
new file mode 100644
index 0000000..788c731
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+
+/**
+ * A specialized version of record batch that can moves out buffers and preps them for writing. 
+ */
+public class WritableBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);
+  
+  private final RecordBatchDef def;
+  private final ByteBuf[] buffers;
+  
+  public WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
+    logger.debug("Created new writable batch with def {} and buffers {}", def, buffers);
+    this.def = def;
+    this.buffers = buffers.toArray(new ByteBuf[buffers.size()]);
+  }
+  
+  public WritableBatch(RecordBatchDef def, ByteBuf[] buffers) {
+    super();
+    this.def = def;
+    this.buffers = buffers;
+  }
+  
+  
+  public RecordBatchDef getDef(){
+    return def;
+  }
+  public ByteBuf[] getBuffers(){
+    return buffers;
+  }
+  
+//  public static WritableBatch get(ValueVector<?>[] vectors){
+//    WritableCreator c = new WritableCreator();
+//    for(int i =0; i < vectors.length; i++){
+//      c.apply(i, vectors[i]);
+//    }
+//    return c.get();
+//  }
+//  
+  
+  public static WritableBatch get(int recordCount, IntObjectOpenHashMap<ValueVector<?>> fields){
+    WritableCreator creator = new WritableCreator(recordCount);
+    fields.forEach(creator);
+    return creator.get();
+    
+  }
+  
+  private static class WritableCreator implements IntObjectProcedure<ValueVector<?>>{
+    
+    List<ByteBuf> buffers = Lists.newArrayList();
+    List<FieldMetadata> metadata = Lists.newArrayList();
+    private int recordCount;
+    
+
+    public WritableCreator(int recordCount) {
+      super();
+      this.recordCount = recordCount;
+    }
+    
+    @Override
+    public void apply(int key, ValueVector<?> value) {
+      metadata.add(value.getMetadata());
+      for(ByteBuf b : value.getBuffers()){
+        buffers.add(b);
+        b.retain();
+      }
+      // allocate new buffer to release hold on old buffer.
+      value.allocateNew(value.capacity());
+    }
+
+
+    public WritableBatch get(){
+      RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+      WritableBatch b = new WritableBatch(batchDef, buffers);
+      return b;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
index 912e02d..b32f067 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.record.vector;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.record.MaterializedField;
 
 /**
  * Abstract class that fixed value vectors are derived from.
@@ -27,12 +29,12 @@ import org.apache.drill.exec.memory.BufferAllocator;
 abstract class AbstractFixedValueVector<T extends AbstractFixedValueVector<T>> extends BaseValueVector<T> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFixedValueVector.class);
 
-  private final int widthInBits;
+  protected final int widthInBits;
 
   protected int longWords = 0;
-
-  public AbstractFixedValueVector(int fieldId, BufferAllocator allocator, int widthInBits) {
-    super(fieldId, allocator);
+  
+  public AbstractFixedValueVector(MaterializedField field, BufferAllocator allocator, int widthInBits) {
+    super(field, allocator);
     this.widthInBits = widthInBits;
   }
   
@@ -56,5 +58,16 @@ abstract class AbstractFixedValueVector<T extends AbstractFixedValueVector<T>> e
     longWords = 0;
   }
 
+  @Override
+  public void setRecordCount(int recordCount) {
+    this.data.writerIndex(recordCount*(widthInBits/8));
+    super.setRecordCount(recordCount);
+  }
+
+
+
+
+
+  
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
index 8d524b2..b001add 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
@@ -19,20 +19,25 @@ package org.apache.drill.exec.record.vector;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.Random;
+
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.record.MaterializedField;
 
 public abstract class BaseValueVector<T extends BaseValueVector<T>> implements ValueVector<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
   
   protected final BufferAllocator allocator;
   protected ByteBuf data = DeadBuf.DEAD_BUFFER;
-  protected int valueCount = 0;
-  protected final int fieldId;
+  protected int maxValueCount = 0;
+  protected final MaterializedField field;
+  private int recordCount;
   
-  public BaseValueVector(int fieldId, BufferAllocator allocator) {
+  public BaseValueVector(MaterializedField field, BufferAllocator allocator) {
     this.allocator = allocator;
-    this.fieldId = fieldId;
+    this.field = field;
   }
 
   public final void allocateNew(int valueCount){
@@ -42,35 +47,42 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
     resetAllocation(valueCount, newBuf);
   }
 
-  protected abstract int getAllocationSize(int valueCount);
+  protected abstract int getAllocationSize(int maxValueCount);
   protected abstract void childResetAllocation(int valueCount, ByteBuf buf);
   protected abstract void childCloneMetadata(T other);
   protected abstract void childClear();
   
-  protected final void resetAllocation(int valueCount, ByteBuf buf){
+  /**
+   * Update the current buffer allocation utilize the provided allocation.
+   * @param valueCount
+   * @param buf
+   */
+  protected final void resetAllocation(int maxValueCount, ByteBuf buf){
     clear();
-    this.valueCount = valueCount;
+    buf.retain();
+    this.maxValueCount = maxValueCount;
     this.data = buf;
-    childResetAllocation(valueCount, buf);
+    childResetAllocation(maxValueCount, buf);
   }
   
   public final void cloneMetadata(T other){
-    other.valueCount = this.valueCount;
+    other.maxValueCount = this.maxValueCount;
   }
   
+  
   @Override
   public final void cloneInto(T vector) {
-    vector.allocateNew(valueCount);
+    vector.allocateNew(maxValueCount);
     data.writeBytes(vector.data);
     cloneMetadata(vector);
-    childResetAllocation(valueCount, vector.data);
+    childResetAllocation(maxValueCount, vector.data);
   }
   
   @Override
   public final void transferTo(T vector) {
     vector.data = this.data;
     cloneMetadata(vector);
-    childResetAllocation(valueCount, data);
+    childResetAllocation(maxValueCount, data);
     clear();
   }
 
@@ -78,7 +90,7 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
     if(this.data != DeadBuf.DEAD_BUFFER){
       this.data.release();
       this.data = DeadBuf.DEAD_BUFFER;
-      this.valueCount = 0;
+      this.maxValueCount = 0;
     }
     childClear();
   }
@@ -88,8 +100,8 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
    * 
    * @return
    */
-  public int size() {
-    return valueCount;
+  public int capacity() {
+    return maxValueCount;
   }
   
   @Override
@@ -98,8 +110,48 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
   }
 
   @Override
-  public ByteBuf getBuffer() {
-    return data;
+  public ByteBuf[] getBuffers() {
+    return new ByteBuf[]{data};
+  }
+  
+  public MaterializedField getField(){
+    return field;
+  }
+  
+  
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  public void setRecordCount(int recordCount) {
+    this.recordCount = recordCount;
+  }
+
+  @Override
+  public FieldMetadata getMetadata() {
+    int len = 0;
+    for(ByteBuf b : getBuffers()){
+      len += b.writerIndex();
+    }
+    return FieldMetadata.newBuilder().setDef(getField().getDef()).setValueCount(getRecordCount()).setBufferLength(len).build();
+  }
+  
+  @Override
+  public void setTo(FieldMetadata metadata, ByteBuf data) {
+//    logger.debug("Updating value vector to {}, {}", metadata, data);
+    clear();
+    resetAllocation(metadata.getValueCount(), data);
+  }
+
+  @Override
+  public void randomizeData() {
+    if(this.data != DeadBuf.DEAD_BUFFER){
+      Random r = new Random();
+      for(int i =0; i < data.capacity()-8; i+=8){
+        data.setLong(i, r.nextLong());
+      }
+    }
+    
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
new file mode 100644
index 0000000..533e3bd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Describes a vector which holds a number of true/false values.
+ */
+public class Bit extends AbstractFixedValueVector<Bit> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Bit.class);
+
+  public Bit(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, 1);
+  }
+
+  
+//  /** Returns true or false for the specified bit index.
+//   * The index should be less than the OpenBitSet size
+//   */
+//  public boolean get(int index) {
+//    assert index >= 0 && index < this.valueCount;
+//    int i = index >> 3;               // div 8
+//    // signed shift will keep a negative index and force an
+//    // array-index-out-of-bounds-exception, removing the need for an explicit check.
+//    int bit = index & 0x3f;           // mod 64
+//    long bitmask = 1L << bit;
+//    return (data.getLong(i) & bitmask) != 0;
+//  }
+  
+  public int getBit(int index) {
+    
+    assert index >= 0 && index < this.maxValueCount;
+    int i = 8*(index >> 6); // div 8
+    int bit = index & 0x3f; // mod 64
+    return ((int) (data.getLong(i) >>> bit)) & 0x01;
+  }
+  
+  /** Sets the bit at the specified index.
+   * The index should be less than the OpenBitSet size.
+   */
+   public void set(int index) {
+     assert index >= 0 && index < this.maxValueCount;
+     int wordNum = index >> 3;   
+     int bit = index & 0x3f;
+     long bitmask = 1L << bit;
+     data.setLong(wordNum, data.getLong(wordNum) | bitmask);
+   }
+   
+   public void clear(int index) {
+     assert index >= 0 && index < this.maxValueCount;
+     int wordNum = index >> 3;
+     int bit = index & 0x03f;
+     long bitmask = 1L << bit;
+     data.setLong(wordNum, data.getLong(wordNum) & ~bitmask);
+   }
+   
+   
+   
+   /** Clears a range of bits.  Clearing past the end does not change the size of the set.
+   *
+   * @param startBitIndex lower index
+   * @param lastBitIndex one-past the last bit to clear
+   */
+  private void clear2(int startBitIndex, int lastBitIndex) {
+    if (lastBitIndex <= startBitIndex) return;
+
+    int firstWordStart = (startBitIndex>>3);
+    if (firstWordStart >= this.longWords) return;
+
+    // since endIndex is one past the end, this is index of the last
+    // word to be changed.
+    int lastWordStart   = ((lastBitIndex-1)>>3);
+
+    long startmask = -1L << startBitIndex;
+    long endmask = -1L >>> -lastBitIndex;  // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
+
+    // invert masks since we are clearing
+    startmask = ~startmask;
+    endmask = ~endmask;
+
+    if (firstWordStart == lastWordStart) {
+      data.setLong(firstWordStart,  data.getLong(firstWordStart) & (startmask | endmask));
+      return;
+    }
+    data.setLong(firstWordStart,  data.getLong(firstWordStart) & startmask);
+
+    int middle = Math.min(this.longWords, lastWordStart);
+    
+    for(int i =firstWordStart+8; i < middle; i += 8){
+      data.setLong(i, 0L);
+    }
+    if (lastWordStart < this.longWords) {
+      data.setLong(lastWordStart,  data.getLong(lastWordStart) & endmask);
+    }
+  }
+  
+  public void setAllFalse(){
+    clear(0, maxValueCount);
+  }
+
+  
+  public void clear(int startIndex, int endIndex) {
+    if (endIndex <= startIndex) return;
+
+    int startWord = (startIndex >> 6);
+    if (startWord >= longWords) return;
+
+    // since endIndex is one past the end, this is index of the last
+    // word to be changed.
+    int endWord = ((endIndex - 1) >> 6);
+
+    long startmask = -1L << startIndex;
+    long endmask = -1L >>> -endIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
+
+    // invert masks since we are clearing
+    startmask = ~startmask;
+    endmask = ~endmask;
+    
+    int startWordPos = startWord * 8;
+    if (startWord == endWord) {
+      data.setLong(startWordPos, data.getLong(startWordPos) & (startmask | endmask));
+      return;
+    }
+
+    int endWordPos = endWord * 8;
+
+    data.setLong(startWordPos, data.getLong(startWordPos) & startmask);
+
+    int middle = Math.min(longWords, endWord)*8;
+    
+    
+    for(int i =startWordPos+8; i < middle; i += 8){
+      data.setLong(i, 0L);
+    }
+    
+    if (endWordPos < startWordPos) {
+      data.setLong(endWordPos, data.getLong(endWordPos) & endmask);
+    }
+  }
+
+
+  @Override
+  public Object getObject(int index) {
+    return this.getBit(index);
+  }
+  
+  
+}


Mime
View raw message