drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [12/38] DRILL-257: Move SQL parsing to server side. Switch to Avatica based JDBC driver. Update QuerySubmitter to support SQL queries. Update SqlAccessors to support getObject(). Remove ref, clean up SQL packages some. Various performance fixes. Updating
Date Tue, 04 Mar 2014 08:07:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e4f8e7b..329815d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,43 +17,58 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.AtomicState;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.work.ErrorHelper;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 
 /**
@@ -151,6 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       
       case LOGICAL:
         parseAndRunLogicalPlan(queryRequest.getPlan());
+        
         break;
       case PHYSICAL:
         parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -170,8 +186,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     
     try {
       LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      
+      if(logicalPlan.getProperties().resultMode == ResultMode.LOGICAL){
+        fail("Failure running plan.  You requested a result mode of LOGICAL and submitted a logical plan.  In this case you're output mode must be PHYSICAL or EXEC.", new Exception());
+      }      
       if(logger.isDebugEnabled()) logger.debug("Logical {}", logicalPlan.unparse(context.getConfig()));
       PhysicalPlan physicalPlan = convert(logicalPlan);
+      
+      if(logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL){
+        returnPhysical(physicalPlan);
+        return;
+      }
+      
       if(logger.isDebugEnabled()) logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan));
       runPhysicalPlan(physicalPlan);
     } catch (IOException e) {
@@ -181,6 +207,74 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
+  
+  private void returnLogical(LogicalPlan plan){
+    String jsonPlan = plan.toJsonStringSafe(context.getConfig());
+    sendSingleString("logical", jsonPlan);
+  }
+  
+  private void returnPhysical(PhysicalPlan plan){
+    String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
+    sendSingleString("physical", jsonPlan);
+  }
+  
+  private void sendSingleString(String columnName, String value){
+    MaterializedField f = MaterializedField.create(new SchemaPath(columnName, ExpressionPosition.UNKNOWN), Types.required(MinorType.VARCHAR));
+    VarCharVector vector = new VarCharVector(f, bee.getContext().getAllocator());
+    byte[] bytes = value.getBytes(Charsets.UTF_8);
+    vector.allocateNew(bytes.length, 1);
+    vector.getMutator().set(0, bytes);
+    vector.getMutator().setValueCount(1);
+    QueryResult header = QueryResult.newBuilder() //
+        .setQueryId(context.getQueryId()) //
+        .setRowCount(1) //
+        .setDef(RecordBatchDef.newBuilder().addField(vector.getMetadata()).build()) //
+        .setIsLastChunk(false) //
+        .build();
+    QueryWritableBatch b1 = new QueryWritableBatch(header, vector.getBuffers());
+    vector.close();
+    
+    QueryResult header2 = QueryResult.newBuilder() //
+        .setQueryId(context.getQueryId()) //
+        .setRowCount(0) //
+        .setDef(RecordBatchDef.getDefaultInstance()) //
+        .setIsLastChunk(true) //
+        .build();
+    QueryWritableBatch b2 = new QueryWritableBatch(header2);
+    
+    SingleListener l = new SingleListener();
+    this.initiatingClient.sendResult(l, b1);
+    this.initiatingClient.sendResult(l, b2);
+    l.acct.waitForSendComplete();
+    
+  }
+  
+  
+  class SingleListener implements RpcOutcomeListener<Ack>{
+
+    final SendingAccountor acct;
+    
+    public SingleListener(){
+      acct  = new SendingAccountor();
+      acct.increment();
+      acct.increment();
+    }
+    
+    @Override
+    public void failed(RpcException ex) {
+      acct.decrement();
+      fail("Failure while sending single result.", ex);
+    }
+
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      acct.decrement();
+    }
+    
+  }
+  
+
+  
   private void parseAndRunPhysicalPlan(String json) {
     try {
       PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -192,7 +286,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   private void runPhysicalPlan(PhysicalPlan plan) {
 
+    if(plan.getProperties().resultMode != ResultMode.EXEC){
+      fail(String.format("Failure running plan.  You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception());
+    }
     PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    
     MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
     Fragment rootFragment;
     try {
@@ -218,18 +316,26 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       List<PlanFragment> intermediateFragments = Lists.newArrayList();
 
       // store fragments in distributed grid.
+      logger.debug("Storing fragments");
       for (PlanFragment f : work.getFragments()) {
         
         // store all fragments in grid since they are part of handshake.
+        
         context.getCache().storeFragment(f);
         if (f.getLeafFragment()) {
           leafFragments.add(f);
         } else {
           intermediateFragments.add(f);
         }
+        
+        
       }
 
+      logger.debug("Fragments stored.");
+      
+      logger.debug("Submitting fragments to run.");
       fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
+      logger.debug("Fragments running.");
 
     
     } catch (ExecutionSetupException | RpcException e) {
@@ -238,11 +344,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   }
 
-  private void runSQL(String json) {
-    throw new UnsupportedOperationException();
+  private void runSQL(String sql) {
+    try{
+      DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry());
+      LogicalPlan plan = sqlWorker.getPlan(sql);
+      
+
+      if(plan.getProperties().resultMode == ResultMode.LOGICAL){
+        returnLogical(plan);
+        return;
+      }
+
+      PhysicalPlan physical = convert(plan);
+      
+      if(plan.getProperties().resultMode == ResultMode.PHYSICAL){
+        returnPhysical(physical);
+        return;
+      }
+      
+      runPhysicalPlan(physical);
+    }catch(Exception e){
+      fail("Failure while parsing sql.", e);
+    }
   }
 
   private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+    if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
     return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index e9302e1..eaf921d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -76,18 +76,25 @@ class QueryManager implements FragmentStatusListener{
   }
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
+    logger.debug("Setting up fragment runs.");
     remainingFragmentCount.set(leafFragments.size()+1);
     queryId = rootFragment.getHandle().getQueryId();
     workBus = bee.getContext().getWorkBus();
 
     // set up the root fragment first so we'll have incoming buffers available.
     {
-      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+      logger.debug("Setting up root context.");
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry());
+      logger.debug("Setting up incoming buffers");
       IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+      logger.debug("Setting buffers on root context.");
       rootContext.setBuffers(buffers);
+      logger.debug("Generating Exec tree");
       RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+      logger.debug("Exec tree generated.");
       // add fragment to local node.
       map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      logger.debug("Fragment added to local node.");
       rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment));
       RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
       
@@ -99,6 +106,7 @@ class QueryManager implements FragmentStatusListener{
         workBus.setRootFragmentManager(fragmentManager);  
       }
       
+      
     }
 
     // keep track of intermediate fragments (not root or leaf)
@@ -112,6 +120,7 @@ class QueryManager implements FragmentStatusListener{
       sendRemoteFragment(f);
     }
     
+    logger.debug("Fragment runs setup is complete.");
   }
     
   private void sendRemoteFragment(PlanFragment fragment){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/storage-engines.json b/exec/java-exec/src/main/resources/storage-engines.json
new file mode 100644
index 0000000..d1d0413
--- /dev/null
+++ b/exec/java-exec/src/main/resources/storage-engines.json
@@ -0,0 +1,29 @@
+{
+  "storage":{
+    "parquet-local" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      },
+    "parquet-cp" :
+      {
+        "type":"parquet",
+        "dfsName" : "classpath:///"
+      },
+    "jsonl" :
+      {
+        "type":"json",
+        "dfsName" : "file:///"
+      },
+    "json-cp" :
+      {
+        "type":"json",
+        "dfsName" : "classpath:///"
+      },
+    "parquet" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index b4dc943..454b15f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -171,12 +171,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
             }
 
             ValueVector.Accessor accessor = v.getValueVector().getAccessor();
-
-            if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) {
-              System.out.println(new String((byte[]) accessor.getObject(r), UTF_8));
-            } else {
-              System.out.print(accessor.getObject(r));
-            }
+            System.out.print(accessor.getObject(r));
           }
           if (!first) System.out.println();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index bffc427..2be3e8d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -145,10 +145,10 @@ public class JSONRecordReaderTest {
     assertEquals(3, addFields.size());
     assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
     assertField(addFields.get(1), 0, MinorType.BIT, true, "b");
-    assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!".getBytes(UTF_8), "c");
+    assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!", "c");
     assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
     assertField(addFields.get(1), 1, MinorType.BIT, false, "b");
-    assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!".getBytes(UTF_8), "c");
+    assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!", "c");
 
     assertEquals(0, jr.next());
     assertTrue(mutator.getRemovedFields().isEmpty());
@@ -178,19 +178,19 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(1), 0, MinorType.INT, 1, "b");
     assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c");
     assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
-    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1");
+    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1");
 
     assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
     assertField(addFields.get(1), 1, MinorType.INT, 3, "b");
     assertField(addFields.get(3), 1, MinorType.BIT, false, "bool");
-    assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
+    assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2", "str1");
     assertField(addFields.get(5), 1, MinorType.INT, 4, "d");
 
     assertField(addFields.get(0), 2, MinorType.INT, 12345, "test");
     assertField(addFields.get(2), 2, MinorType.FLOAT4, (float) 5.16, "c");
     assertField(addFields.get(3), 2, MinorType.BIT, true, "bool");
     assertField(addFields.get(5), 2, MinorType.INT, 6, "d");
-    assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+    assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3", "str2");
     assertTrue(mutator.getRemovedFields().isEmpty());
     assertEquals(0, jr.next());
   }
@@ -220,14 +220,14 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(1), 0, MinorType.INT, 1, "b");
     assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c");
     assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
-    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1");
+    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1");
     assertTrue(removedFields.isEmpty());
     assertEquals(1, jr.next());
     assertEquals(6, addFields.size());
     assertField(addFields.get(0), 0, MinorType.INT, 1234, "test");
     assertField(addFields.get(1), 0, MinorType.INT, 3, "b");
     assertField(addFields.get(3), 0, MinorType.BIT, false, "bool");
-    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
+    assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2", "str1");
     assertField(addFields.get(5), 0, MinorType.INT, 4, "d");
     assertEquals(1, removedFields.size());
     assertEquals("c", removedFields.get(0).getName());
@@ -238,7 +238,7 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
     assertField(addFields.get(5), 0, MinorType.INT, 6, "d");
     assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c");
-    assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+    assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3", "str2");
     assertEquals(2, removedFields.size());
     Iterables.find(removedFields, new Predicate<MaterializedField>() {
       @Override
@@ -274,10 +274,10 @@ public class JSONRecordReaderTest {
     assertEquals(2, jr.next());
     assertEquals(3, addFields.size());
     assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
-    assertField(addFields.get(1), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "a.b");
+    assertField(addFields.get(1), 0, MinorType.VARCHAR, "test", "a.b");
     assertField(addFields.get(2), 0, MinorType.BIT, true, "a.a.d");
     assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
-    assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "a.b");
+    assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2", "a.b");
     assertField(addFields.get(2), 1, MinorType.BIT, false, "a.a.d");
 
     assertEquals(0, jr.next());
@@ -308,7 +308,7 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b");
     assertField(addFields.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d");
     assertField(addFields.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList((float) 1.1, (float) 1.2, (float) 1.3), "testFloat");
-    assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr");
+    assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello", "drill"), "testStr");
     assertField(addFields.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2");
     assertField(addFields.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a");
     assertField(addFields.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList((float) 2.2, (float) 2.3,(float) 2.4), "testFloat");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
index 733cb1d..8efc762 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.drill.exec.store;
 
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.SettableFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.column.Encoding.PLAIN;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.types.TypeProtos;
@@ -34,6 +37,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
@@ -54,10 +58,10 @@ import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
-import java.util.*;
-
-import static org.junit.Assert.*;
-import static parquet.column.Encoding.PLAIN;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.SettableFuture;
 
 public class ParquetRecordReaderTest {
   org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
@@ -368,7 +372,7 @@ public class ParquetRecordReaderTest {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result) {
+    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
       logger.debug("result arrived in test batch listener.");
       if(result.getHeader().getIsLastChunk()){
         future.set(null);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
index b765ed0..9c4aeea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
@@ -17,9 +17,10 @@
  */
 package org.apache.drill.exec.store;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Resources;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.client.DrillClient;
@@ -28,6 +29,7 @@ import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
@@ -37,10 +39,9 @@ import org.junit.AfterClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.io.Resources;
 
 public class TestParquetPhysicalPlan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
@@ -93,7 +94,7 @@ public class TestParquetPhysicalPlan {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result) {
+    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
       int rows = result.getHeader().getRowCount();
       System.out.println(String.format("Result batch arrived. Number of records: %d", rows));
       count.addAndGet(rows);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index ab29a9f..52430e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
@@ -139,7 +140,7 @@ public class ParquetRecordReaderTest {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result) {
+    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
       long columnValCounter = 0;
       int i = 0;
       FieldInfo currentField;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/pom.xml b/exec/pom.xml
index a463015..399f788 100644
--- a/exec/pom.xml
+++ b/exec/pom.xml
@@ -33,6 +33,5 @@
   <modules>
     <module>bufferl</module>
     <module>java-exec</module>
-    <module>ref</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/pom.xml
----------------------------------------------------------------------
diff --git a/exec/ref/pom.xml b/exec/ref/pom.xml
deleted file mode 100644
index b1b31dd..0000000
--- a/exec/ref/pom.xml
+++ /dev/null
@@ -1,107 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<artifactId>exec-parent</artifactId>
-		<groupId>org.apache.drill.exec</groupId>
-		<version>1.0.0-m2-incubating-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>drill-ref</artifactId>
-
-	<name>exec/Reference Interpreter</name>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.drill</groupId>
-			<artifactId>drill-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.carrotsearch</groupId>
-			<artifactId>hppc</artifactId>
-			<version>0.4.2</version>
-		</dependency>
-
-	</dependencies>
-
-  <profiles>
-    <profile>
-      <id>default-hadoop</id>
-      <activation>
-        <property>
-          <name>!alt-hadoop</name>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-    <profile>
-      <id>mapr</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-    <profile>
-      <id>cdh4</id>
-      <dependencies>
-        <dependency>
-          <artifactId>hadoop-common</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </dependency>
-      </dependencies>
-    </profile>
-    <profile>
-      <id>hdp</id>
-      <dependencies>
-        <dependency>
-          <artifactId>hadoop-common</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/BasicStatusHandle.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/BasicStatusHandle.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/BasicStatusHandle.java
deleted file mode 100644
index 06b79fa..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/BasicStatusHandle.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.drill.exec.ref.rops.SinkROP.StatusHandle;
-
-public class BasicStatusHandle implements StatusHandle{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicStatusHandle.class);
-
-  private AtomicBoolean okToContinue = new AtomicBoolean(true);
-  
-  @Override
-  public void progress(long bytes, long records) {
-    logger.debug("Progressed {} bytes, {} records.", bytes, records);
-  }
-
-  @Override
-  public boolean okToContinue() {
-    return okToContinue.get();
-  }
-  
-  public void markCanceled(){
-    okToContinue.set(false);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/ExecRefConstants.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/ExecRefConstants.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/ExecRefConstants.java
deleted file mode 100644
index 447ad81..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/ExecRefConstants.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-public class ExecRefConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecRefConstants.class);
-  
-  public static final String CONFIG_DEFAULT = "drill-exec-ref-default.json";
-  public static final String CONFIG_OVERRIDE = "drill-exec-ref-site.json";
-  public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.storage.packages";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/IteratorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/IteratorRegistry.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/IteratorRegistry.java
deleted file mode 100644
index 3b73d22..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/IteratorRegistry.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.rops.ROP;
-import org.apache.drill.exec.ref.rops.SinkROP;
-
-import com.google.common.collect.ArrayListMultimap;
-
-public class IteratorRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorRegistry.class);
-  
-  ArrayListMultimap<LogicalOperator, ROP> map = ArrayListMultimap.create();
-  LinkedList<SinkROP> sinks = new LinkedList<SinkROP>();
-  
-  public void register(LogicalOperator logOp, ROP referenceOperator){
-    map.put(logOp, referenceOperator);
-    if(referenceOperator instanceof SinkROP) sinks.add((SinkROP) referenceOperator);
-  }
-  
-  public void remap(LogicalOperator currentId, LogicalOperator newId){
-    List<ROP> rops = map.removeAll(currentId);
-    map.putAll(newId, rops);
-  }
-  
-  public void swap(LogicalOperator op1, LogicalOperator op2){
-    List<ROP> ops1 = map.removeAll(op1);
-    List<ROP> ops2 = map.removeAll(op2);
-    map.putAll(op1, ops2);
-    map.putAll(op2, ops1);
-  }
-  
-  public List<RecordIterator> getOperator(LogicalOperator o){
-//    logger.debug("Getting iterator for Logical Operator {}", o);
-    if(o == null) throw new SetupException("You requested a Iterator list for a null operator.  This doesn't make any sense.");
-    List<ROP> refOps = map.get(o);
-    List<RecordIterator> iterators = new ArrayList<RecordIterator>(refOps.size());
-    for(ROP r : refOps){
-      RecordIterator iterator = r.getOutput();
-      if(iterator == null) throw new SetupException(String.format("The provided iterator for the reference operator %s is null.", r));
-      iterators.add(iterator);
-    }
-    return iterators;
-  }
-  
-  public List<SinkROP> getSinks(){
-    return sinks;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
deleted file mode 100644
index 5e4b3e4..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.rops.ROP;
-import org.apache.drill.exec.ref.rops.ScanROP;
-import org.apache.drill.exec.ref.rops.StoreROP;
-import org.apache.drill.exec.ref.rops.UnionROP;
-import org.apache.drill.exec.ref.rse.RSERegistry;
-import org.apache.drill.exec.ref.rse.ReferenceStorageEngine;
-import org.apache.drill.exec.ref.rse.ReferenceStorageEngine.ReadEntry;
-
-import com.typesafe.config.Config;
-
-class ROPConverter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ROPConverter.class);
-
-  private LogicalPlan plan;
-  private IteratorRegistry registry;
-  private EvaluatorFactory builder;
-  private RSERegistry engineRegistry;
-  private Config config;
-
-  public ROPConverter(LogicalPlan plan, IteratorRegistry registry, EvaluatorFactory builder, RSERegistry engineRegistry) {
-    this.plan = plan;
-    this.registry = registry;
-    this.builder = builder;
-    this.engineRegistry = engineRegistry;
-  }
-
-  public void convert(LogicalOperator o) throws SetupException {
-
-    try {
-      Method m = ROPConverter.class.getMethod("convertSpecific", o.getClass());
-      m.invoke(this, o);
-      return;
-    } catch (NoSuchMethodException e) {
-      // noop
-      logger.debug("There is no convertSpecific method for type {}.  Looking for a generic option...", o.getClass());
-    } catch (IllegalAccessException | IllegalArgumentException | SecurityException e) {
-      logger.debug("Failure while attempting to run convertSpecific value for class {}", o.getClass().getSimpleName(),
-          e);
-    } catch (InvocationTargetException e) {
-      Throwable c = e.getCause();
-      if (c instanceof SetupException) {
-        throw (SetupException) c;
-      } else {
-        throw new RuntimeException("Failure while trying to run convertSpecific conversion of operator type "
-            + o.getClass().getSimpleName(), c);
-      }
-    }
-
-    String name = ROP.class.getPackage().getName() + "." + o.getClass().getSimpleName() + "ROP";
-    try {
-      Class<?> c = Class.forName(name);
-      if (!ROP.class.isAssignableFrom(c)) throw new IllegalArgumentException("Class does not derive from ROP.");
-      Constructor<?> ctor = c.getConstructor(o.getClass());
-      if (ctor == null)
-        throw new IllegalArgumentException(
-            "Class does not have an available constructor that supports a single argument of the class "
-                + o.getClass().getSimpleName());
-      ROP r = (ROP) ctor.newInstance(o);
-      r.init(registry, builder);
-      return;
-
-    } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | InstantiationException e) {
-      logger.debug("No {} class that accepts a single parameter or type {}.", name, o.getClass().getCanonicalName());
-    } catch (InvocationTargetException e) {
-      Throwable c = e.getCause();
-      if (c instanceof SetupException) throw (SetupException) c;
-      throw new SetupException("Failure while trying to run Convert node of type " + o.getClass().getSimpleName(), c);
-    }
-
-    throw new UnsupportedOperationException("Unable to convert Logical Operator of type "
-        + o.getClass().getCanonicalName());
-  }
-
-  private ReferenceStorageEngine getEngine(String name){
-    StorageEngineConfig config = plan.getStorageEngineConfig(name);
-    if(config == null) throw new SetupException(String.format("Unable to find define storage engine of name [%s].", name));
-    ReferenceStorageEngine engine = engineRegistry.getEngine(config);
-    return engine;
-  }
-  
-  public void convertSpecific(Store store) throws SetupException {
-    StoreROP rop = new StoreROP(store, getEngine(store.getStorageEngine()));
-    rop.init(registry, builder);
-  }
-
-  public void convertSpecific(Scan scan) throws SetupException {
-    StorageEngineConfig engineConfig = plan.getStorageEngineConfig(scan.getStorageEngine());
-    ReferenceStorageEngine engine = engineRegistry.getEngine(engineConfig);
-    Collection<ReadEntry> readEntries;
-    try {
-      readEntries = engine.getReadEntries(scan);
-    } catch (IOException e1) {
-      throw new SetupException("Failure reading input entries.", e1);
-    }
-    
-    switch(readEntries.size()){
-    case 0:
-      throw new SetupException(String.format("Scan provided did not correspond to any available data.", scan));
-    case 1:
-      ScanROP scanner = new ScanROP(scan, readEntries.iterator().next(), engine);
-      scanner.init(registry, builder);
-      return;
-    default:
-      Union logOp = new Union(null, false);
-
-      ROP parentUnion = new UnionROP(logOp);
-      ScanROP[] scanners = new ScanROP[readEntries.size()];
-      int i = 0;
-      for (ReadEntry e : readEntries) {
-        scanners[i] = new ScanROP(scan, e, engine);
-        scanners[i].init(registry, builder);
-        i++;
-      } 
-
-      parentUnion.init(registry, builder);
-      registry.swap(logOp, scan); // make it so future things point to the union as the original scans.
-      return;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordIterator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordIterator.java
deleted file mode 100644
index 2460db6..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordIterator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.util.Iterator;
-
-import org.apache.drill.exec.ref.rops.ROP;
-
-
-public interface RecordIterator{
-  
-  public enum NextOutcome {NONE_LEFT, INCREMENTED_SCHEMA_UNCHANGED, INCREMENTED_SCHEMA_CHANGED}
-  public RecordPointer getRecordPointer(); // called once
-  public NextOutcome next();
-  public ROP getParent();
-  
-  
-  public static class IteratorWrapper implements RecordIterator{
-    final Iterator<RecordPointer> iter;
-    final RecordPointer outputRecord;
-    final ROP parent;
-    public IteratorWrapper(ROP rop, Iterator<RecordPointer> iter, RecordPointer outputRecord) {
-      this.iter = iter;
-      this.parent = rop;
-      this.outputRecord = outputRecord;
-    }
-    
-    @Override
-    public NextOutcome next() {
-      if(iter.hasNext()) {
-        outputRecord.copyFrom(iter.next());
-        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
-      }
-      
-      return NextOutcome.NONE_LEFT;
-    }
-
-    @Override
-    public ROP getParent() {
-      return parent;
-    }
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return outputRecord;
-    }
-
-    
-    
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java
deleted file mode 100644
index 3e70fe7..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.io.IOException;
-
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ref.rops.DataWriter;
-import org.apache.drill.exec.ref.values.DataValue;
-
-
-public interface RecordPointer {
-  public DataValue getField(SchemaPath field);
-  public void addField(SchemaPath field, DataValue value);
-  public void addField(PathSegment segment, DataValue value);
-  public void removeField(SchemaPath segment);
-  public void write(DataWriter writer) throws IOException;
-  public RecordPointer copy();
-  public void copyFrom(RecordPointer r);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
deleted file mode 100644
index 6859274..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.rops.SinkROP;
-import org.apache.drill.exec.ref.rse.RSERegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-public class ReferenceInterpreter {
-  static final Logger logger = LoggerFactory.getLogger(ReferenceInterpreter.class);
-  
-  private List<SinkROP> sinks = new ArrayList<SinkROP>();
-  private LogicalPlan plan;
-  private ROPConverter converter;
-  private IteratorRegistry registry;
-  
-  public ReferenceInterpreter(LogicalPlan p, IteratorRegistry r, EvaluatorFactory builder, RSERegistry rses){
-    this.plan = p;
-    this.registry = r;
-    this.converter = new ROPConverter(p, registry, builder, rses);
-  }
-  
-  /** Generate Reference equivalents to each operation and then collect and store all the sinks. 
-   * @throws IOException **/
-  public void setup() throws IOException{
-    for(LogicalOperator op : plan.getSortedOperators()){
-      converter.convert(op);
-    }
-    sinks.addAll(registry.getSinks());
-  }
-  
-  public  Collection<RunOutcome> run(){
-    Collection<RunOutcome> outcomes  = new LinkedList<RunOutcome>();
-    
-    for(SinkROP r : sinks){
-      outcomes.add(r.run(new BasicStatusHandle()));
-    }
-    
-    return outcomes;
-  }
-  
-  public void cleanup(){
-    
-  }
-  
-  
-  public static void main(String[] args) throws Exception{
-    DrillConfig config = DrillConfig.create();
-    int arg = 0;
-    final BlockingQueue<Object> queue;
-    if (arg < args.length && args[arg].equals("--stdout")) {
-      ++arg;
-      queue = new ArrayBlockingQueue<>(100);
-      config.setSinkQueues(0, queue);
-    } else {
-      queue = null;
-    }
-    final String jsonFile = args[arg];
-    final String planString;
-    if (jsonFile.startsWith("inline:")) {
-      planString = jsonFile.substring("inline:".length());
-    } else {
-      planString = Files.toString(new File(jsonFile), Charsets.UTF_8);
-    }
-    LogicalPlan plan = LogicalPlan.parse(config, planString);
-    IteratorRegistry ir = new IteratorRegistry();
-    ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
-    i.setup();
-    final Object[] result = {null};
-    final Thread thread;
-    if (queue != null) {
-      thread = new Thread(
-          new Runnable() {
-            @Override
-            public void run() {
-              try {
-                result[0] = run0();
-              } catch (Throwable e) {
-                result[0] = e;
-              }
-            }
-            private boolean run0() throws IOException {
-              for (;;) {
-                try {
-                  Object o = queue.take();
-                  if (o instanceof RunOutcome.OutcomeType) {
-                    switch ((RunOutcome.OutcomeType) o) {
-                    case SUCCESS:
-                      return true; // end of data
-                    case CANCELED:
-                      throw new RuntimeException("canceled");
-                    case FAILED:
-                    default:
-                      throw new RuntimeException("failed");
-                    }
-                  } else {
-                    System.out.write((byte[]) o);
-                  }
-                } catch (InterruptedException e) {
-                  Thread.interrupted();
-                  throw new RuntimeException(e);
-                }
-              }
-            }
-          });
-      thread.start();
-    } else {
-      thread = null;
-    }
-    Collection<RunOutcome> outcomes = i.run();
-    
-    for(RunOutcome outcome : outcomes){
-      System.out.println("============");
-      System.out.println(outcome);
-      if(outcome.outcome == RunOutcome.OutcomeType.FAILED && outcome.exception != null){
-        outcome.exception.printStackTrace();
-      }
-    }
-     if (thread != null) {
-       thread.join();
-       System.out.println("Result: " + result[0]);
-     }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceTransform.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceTransform.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceTransform.java
deleted file mode 100644
index e31c09b..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceTransform.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import org.apache.drill.exec.ref.rops.ReferenceOperator;
-
-public class ReferenceTransform implements ReferenceOperator{
-
-  @Override
-  public void setInput(RecordIterator incoming) {
-  }
-
-  @Override
-  public RecordIterator getOutput() {
-    return null;
-  }
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/RunOutcome.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/RunOutcome.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/RunOutcome.java
deleted file mode 100644
index 5a07031..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/RunOutcome.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-public class RunOutcome {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunOutcome.class);
-  
-  public enum OutcomeType{
-    SUCCESS, FAILED, CANCELED;
-  }
-
-  public final OutcomeType outcome;
-  public final long bytes;
-  public final long records;
-  public final Throwable exception;
-  
-  public RunOutcome(OutcomeType outcome, long bytes, long records, Throwable exception) {
-    super();
-    if(outcome != OutcomeType.SUCCESS) logger.warn("Creating failed outcome.", exception);
-    this.outcome = outcome;
-    this.bytes = bytes;
-    this.records = records;
-    this.exception = exception;
-  }
-  
-  public RunOutcome(OutcomeType outcome, long bytes, long records) {
-    this(outcome, bytes, records, null);
-  }
-
-  @Override
-  public String toString() {
-    return "RunOutcome [outcome=" + outcome + ", bytes=" + bytes + ", records=" + records + ", exception=" + exception
-        + "]";
-  }
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
deleted file mode 100644
index 29c83d4..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref;
-
-import java.io.IOException;
-
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.CollisionBehavior;
-import org.apache.drill.exec.ref.rops.DataWriter;
-import org.apache.drill.exec.ref.values.ContainerValue;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.SimpleMapValue;
-import org.apache.drill.exec.ref.values.ValueUtils;
-
-public class UnbackedRecord implements RecordPointer {
-
-    private DataValue root = new SimpleMapValue();
-
-    public DataValue getField(SchemaPath field) {
-        return root.getValue(field.getRootSegment());
-    }
-
-    public void addField(SchemaPath field, DataValue value) {
-        addField(field.getRootSegment(), value);
-    }
-
-    @Override
-    public void addField(PathSegment segment, DataValue value) {
-        root.addValue(segment, value);
-    }
-
-    @Override
-    public void removeField(SchemaPath field) {
-        root.removeValue(field.getRootSegment());
-    }
-
-    @Override
-    public void write(DataWriter writer) throws IOException {
-        writer.startRecord();
-        root.write(writer);
-        writer.endRecord();
-    }
-
-    public void merge(DataValue v) {
-        if (v instanceof ContainerValue) {
-            this.root = ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, root, v);
-        } else {
-            this.root = v;
-        }
-    }
-
-    public void merge(RecordPointer pointer) {
-        if (pointer instanceof UnbackedRecord) {
-            merge(UnbackedRecord.class.cast(pointer).root);
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unable to merge from a record of type %s to an UnbackedRecord.", pointer.getClass().getCanonicalName())
-            );
-        }
-    }
-
-    @Override
-    public RecordPointer copy() {
-        UnbackedRecord r = new UnbackedRecord();
-        r.root = this.root.copy();
-        return r;
-    }
-
-    public void clear() {
-        root = new SimpleMapValue();
-    }
-
-    public void setClearAndSetRoot(SchemaPath path, DataValue v) {
-        root = new SimpleMapValue();
-        root.addValue(path.getRootSegment(), v);
-    }
-
-    @Override
-    public void copyFrom(RecordPointer r) {
-        if (r instanceof UnbackedRecord) {
-            this.root = ((UnbackedRecord) r).root.copy();
-        } else {
-            throw new UnsupportedOperationException(String.format("Unable to copy from a record of type %s to an UnbackedRecord.", r.getClass().getCanonicalName()));
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "UnbackedRecord [root=" + root + "]";
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((root == null) ? 0 : root.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      UnbackedRecord other = (UnbackedRecord) obj;
-      if (root == null) {
-        if (other.root != null) return false;
-      } else if (!root.equals(other.root)) return false;
-      return true;
-    }
-    
-    
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BaseBasicEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BaseBasicEvaluator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BaseBasicEvaluator.java
deleted file mode 100644
index e6b7e7c..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BaseBasicEvaluator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-
-public abstract class BaseBasicEvaluator implements BasicEvaluator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseBasicEvaluator.class);
-  
-  private final boolean isConstant;
-  protected final RecordPointer record;
-  
-  public BaseBasicEvaluator(boolean isConstant, RecordPointer record) {
-    super();
-    this.record = record;
-    this.isConstant = isConstant;
-  }
-
-  @Override
-  public final boolean isConstant() {
-    return isConstant;
-  }
-
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BasicEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BasicEvaluatorFactory.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BasicEvaluatorFactory.java
deleted file mode 100644
index 13b7729..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BasicEvaluatorFactory.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.ref.IteratorRegistry;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BooleanEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.ConnectedEvaluator;
-import org.apache.drill.exec.ref.eval.fn.agg.AggregatingWrapperEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ValueReader;
-
-public class BasicEvaluatorFactory extends EvaluatorFactory{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicEvaluatorFactory.class);
-
-  public BasicEvaluatorFactory(IteratorRegistry registry){
-    
-  }
-  
-  private SimpleExprVisitor<BasicEvaluator> get(RecordPointer record){
-    return new SimpleEvaluationVisitor(record);
-  }
-  
-  @Override
-  public BasicEvaluator getBasicEvaluator(RecordPointer inputRecord, LogicalExpression e) {
-    return e.accept(get(inputRecord), null);
-  }
-  
-
-  
-  @Override
-  public AggregatingEvaluator getAggregatingOperator(RecordPointer record, LogicalExpression e) {
-    SimpleEvaluationVisitor visitor = new SimpleEvaluationVisitor(record);
-    BasicEvaluator b = e.accept(visitor, null);
-    return new AggregatingWrapperEvaluator(visitor.getAggregators(), b);
-  }
-
-  @Override
-  public BooleanEvaluator getBooleanEvaluator(RecordPointer record, LogicalExpression e) {
-    return new BooleanEvaluatorImpl(e.accept(get(record),  null));
-  }
-
-  @Override
-  public ConnectedEvaluator getConnectedEvaluator(RecordPointer record, NamedExpression ne) {
-    return new ConnectedEvaluatorImpl(record, ne);
-  }
-
-  
-  private class BooleanEvaluatorImpl implements BooleanEvaluator{
-    private BasicEvaluator eval;
-    
-    public BooleanEvaluatorImpl(BasicEvaluator e){
-      this.eval = e;
-    }
-    
-    @Override
-    public boolean eval() {
-      return ValueReader.getBoolean(eval.eval());
-    }
-    
-  }
-  
-  private class ConnectedEvaluatorImpl implements ConnectedEvaluator{
-    private SchemaPath outputPath;
-    private BasicEvaluator eval;
-    private RecordPointer record;
-    
-    public ConnectedEvaluatorImpl(RecordPointer record, NamedExpression e){
-      this.outputPath = e.getRef();
-      this.record = record;
-      this.eval = e.getExpr().accept(get(record), null);
-    }
-
-    @Override
-    public void eval() {
-      DataValue val = eval.eval();
-      record.addField(outputPath, val);
-    }
-    
-    
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BoundarySubscriber.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BoundarySubscriber.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BoundarySubscriber.java
deleted file mode 100644
index 5e01fb6..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/BoundarySubscriber.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-public interface BoundarySubscriber {
-  public void crossedBoundary();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorFactory.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorFactory.java
deleted file mode 100644
index 084fe70..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BooleanEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.ConnectedEvaluator;
-
-public abstract class EvaluatorFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EvaluatorFactory.class);
-
-  /**
-   * Provide an evaluator that returns a true/false value on evaluation.
-   * @param e The base logical expression
-   * @return true/false of expression.
-   */
-  public abstract BooleanEvaluator getBooleanEvaluator(RecordPointer record, LogicalExpression e);
-  
-  /**
-   * Provide an evaluator that takes the output value of the named expression evaluation and place that value back into the simple record.
-   * @param ne The named expression to work with.
-   * @return
-   */
-  public abstract ConnectedEvaluator getConnectedEvaluator(RecordPointer record, NamedExpression ne);
-
-  /**
-   * Provide a basic evaluator that returns the data value interested in.
-   * @param e
-   * @return
-   */
-  public abstract BasicEvaluator getBasicEvaluator(RecordPointer record, LogicalExpression e);
-  
-  /**
-   * Given a set of named expressions, please provide a single 
-   * @param expressions
-   * @return
-   */
-  public abstract AggregatingEvaluator getAggregatingOperator(RecordPointer record, LogicalExpression e);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorTypes.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorTypes.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorTypes.java
deleted file mode 100644
index a108f06..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/EvaluatorTypes.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.exec.ref.values.DataValue;
-
-public interface EvaluatorTypes {
-  
-  /**
-   * The base form of evaluator.  Other evaluators are typically just masks of this type of evaluator.
-   */
-  public interface BasicEvaluator{
-    public DataValue eval();
-    public boolean isConstant();
-  }
-
-  /**
-   * An evaluator that returns true or false based on the input record.
-   */
-  public interface BooleanEvaluator{
-    public boolean eval();  
-  }
-
-  /**
-   * An evaluator that returns no value.  It is expected to write the output to the provided input record instead.
-   */
-  public interface ConnectedEvaluator{
-    public void eval();
-  }
-  
-  /**
-   * An evaluator that takes multiple input values and returns a single output value.
-   */
-  public interface AggregatingEvaluator extends BasicEvaluator{
-    public void addRecord();
-  }
-
-  
-  public interface ConstantEvaluator{
-    public DataValue eval();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/FieldEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/FieldEvaluator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/FieldEvaluator.java
deleted file mode 100644
index 2b1a218..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/FieldEvaluator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-
-public class FieldEvaluator implements BasicEvaluator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldEvaluator.class);
-  
-  private final SchemaPath path;
-  private final RecordPointer record;
-  
-  public FieldEvaluator(SchemaPath p, RecordPointer record){
-    this.path = p;
-    this.record = record;
-  }
-
-  @Override
-  public DataValue eval() {
-    return record.getField(path);
-  }
-
-  @Override
-  public boolean isConstant() {
-    return false;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/IfEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/IfEvaluator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/IfEvaluator.java
deleted file mode 100644
index c7f242e..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/IfEvaluator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import org.apache.drill.common.expression.IfExpression;
-import org.apache.drill.common.expression.IfExpression.IfCondition;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ValueReader;
-
-public class IfEvaluator implements BasicEvaluator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IfEvaluator.class);
-
-  private final IfCond[] conditions;
-  private final BasicEvaluator elseExpression;
-  
-  public IfEvaluator(IfExpression expression, ExprVisitor<BasicEvaluator, Void, RuntimeException> evalBuilder, RecordPointer record){
-    this.conditions = new IfCond[expression.conditions.size()];
-    for(int i =0; i < conditions.length; i++){
-      conditions[i] = new IfCond(expression.conditions.get(i), evalBuilder);
-    }
-    elseExpression = expression.elseExpression.accept(evalBuilder, null);
-  }
-  
-  @Override
-  public DataValue eval() {
-    for(int i = 0 ; i < conditions.length; i++){
-      if(ValueReader.getBoolean(conditions[i].condition.eval())) return conditions[i].valueExpression.eval() ;
-    }
-    return elseExpression.eval();
-  }
-  
-  public class IfCond{
-    private final BasicEvaluator condition;
-    private final BasicEvaluator valueExpression;
-
-    public IfCond(IfCondition c, ExprVisitor<BasicEvaluator, Void, RuntimeException> evalBuilder){
-      this.condition = c.condition.accept(evalBuilder, null);
-      this.valueExpression = c.expression.accept(evalBuilder, null);
-    }
-    
-    public boolean matches(RecordPointer r){
-      return ValueReader.getBoolean(condition.eval());
-    }
-    
-    public DataValue eval(RecordPointer r){
-      return valueExpression.eval();
-    }
-    
-    public boolean isConstant() {
-      return condition.isConstant() && valueExpression.isConstant();
-    }
-  }
-
-  @Override
-  public boolean isConstant() {
-    for(IfCond c : conditions){
-      if(!c.isConstant()) return false;
-    }
-    return elseExpression.isConstant();
-  }
-  
-  
-}


Mime
View raw message