drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [1/2] drill git commit: DRILL-5842: Refactor fragment, operator contexts
Date Tue, 14 Nov 2017 00:14:55 GMT
Repository: drill
Updated Branches:
  refs/heads/master 42fc11e53 -> c56de2f13


http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
index 0f27884..e7a78ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -21,7 +21,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -47,7 +47,7 @@ public class SorterWrapper extends BaseSortWrapper {
 
   private SingleBatchSorter sorter;
 
-  public SorterWrapper(OperExecContext opContext) {
+  public SorterWrapper(OperatorContext opContext) {
     super(opContext);
   }
 
@@ -55,7 +55,7 @@ public class SorterWrapper extends BaseSortWrapper {
 
     SingleBatchSorter sorter = getSorter(convertedBatch);
     try {
-      sorter.setup(context, sv2, convertedBatch);
+      sorter.setup(context.getFragmentContext(), sv2, convertedBatch);
       sorter.sort(sv2);
     } catch (SchemaChangeException e) {
       convertedBatch.clear();
@@ -78,8 +78,8 @@ public class SorterWrapper extends BaseSortWrapper {
 
   private SingleBatchSorter newSorter(VectorAccessible batch) {
     CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
-        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
-        context.getOptionSet());
+        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getFunctionRegistry(),
+        context.getFragmentContext().getOptionSet());
     ClassGenerator<SingleBatchSorter> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index b75ce77..3d7e63a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -23,13 +23,13 @@ import java.util.List;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorInitializer;
 
 import com.google.common.collect.Lists;
 
@@ -56,9 +56,9 @@ public class SpilledRuns {
   private final PriorityQueueCopierWrapper copierHolder;
   private BatchSchema schema;
 
-  private final OperExecContext context;
+  private final OperatorContext context;
 
-  public SpilledRuns(OperExecContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper
copier) {
+  public SpilledRuns(OperatorContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper
copier) {
     this.context = opContext;
     this.spillSet = spillSet;
 //    copierHolder = new PriorityQueueCopierWrapper(opContext);

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 5e7be8d..489e03c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,13 +39,13 @@ import java.util.EnumSet;
 public class DrillFSDataInputStream extends FSDataInputStream {
   private final FSDataInputStream underlyingIs;
   private final OpenFileTracker openFileTracker;
-  private final OperatorStats operatorStats;
+  private final OperatorStatReceiver operatorStats;
 
-  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws
IOException {
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats)
throws IOException {
     this(in, operatorStats, null);
   }
 
-  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats,
       OpenFileTracker openFileTracker) throws IOException {
     super(new WrappedInputStream(in, operatorStats));
     underlyingIs = in;
@@ -193,9 +194,9 @@ public class DrillFSDataInputStream extends FSDataInputStream {
    */
   private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable
{
     final FSDataInputStream is;
-    final OperatorStats operatorStats;
+    final OperatorStatReceiver operatorStats;
 
-    WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) {
+    WrappedInputStream(FSDataInputStream is, OperatorStatReceiver operatorStats) {
       this.is = is;
       this.operatorStats = operatorStats;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 52e1a96..fc540aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -80,14 +80,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker
{
   private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles
= Maps.newConcurrentMap();
 
   private final FileSystem underlyingFs;
-  private final OperatorStats operatorStats;
+  private final OperatorStatReceiver operatorStats;
   private final CompressionCodecFactory codecFactory;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
     this(fsConf, null);
   }
 
-  public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException
{
+  public DrillFileSystem(Configuration fsConf, OperatorStatReceiver operatorStats) throws
IOException {
     this.underlyingFs = FileSystem.get(fsConf);
     this.codecFactory = new CompressionCodecFactory(fsConf);
     this.operatorStats = operatorStats;

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index 4c90769..c58abd6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -27,19 +27,19 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.RowSetSchema;
 import org.apache.drill.test.rowSet.SchemaBuilder;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 
 import com.google.common.collect.Lists;
 
@@ -67,7 +67,7 @@ public class SortTestUtilities {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
     Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
-    OperExecContext opContext = fixture.newOperExecContext(popConfig);
+    OperatorContext opContext = fixture.operatorContext(popConfig);
     return new PriorityQueueCopierWrapper(opContext);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index a8d6e82..d83a765 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -30,7 +30,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
@@ -82,7 +82,7 @@ public class TestSortImpl extends DrillTest {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
     Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
-    OperExecContext opContext = fixture.newOperExecContext(popConfig);
+    OperatorContext opContext = fixture.operatorContext(popConfig);
     QueryId queryId = QueryId.newBuilder()
         .setPart1(1234)
         .setPart2(5678)
@@ -92,9 +92,9 @@ public class TestSortImpl extends DrillTest {
           .setMinorFragmentId(3)
           .setQueryId(queryId)
           .build();
-    SortConfig sortConfig = new SortConfig(opContext.getConfig());
+    SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig());
 
-    SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, popConfig);
+    SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle,
popConfig);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
     return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index 7d0019d..5f04da6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -29,7 +29,7 @@ import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.DrillTest;
@@ -82,7 +82,7 @@ public class TestSorter extends DrillTest {
   }
 
   public void runSorterTest(Sort popConfig, SingleRowSet rowSet, SingleRowSet expected) throws
Exception {
-    OperExecContext opContext = fixture.newOperExecContext(popConfig);
+    OperatorContext opContext = fixture.operatorContext(popConfig);
     SorterWrapper sorter = new SorterWrapper(opContext);
 
     sorter.sortBatch(rowSet.container(), rowSet.getSv2());
@@ -149,7 +149,7 @@ public class TestSorter extends DrillTest {
       Sort popConfig = makeSortConfig("key", sortOrder, nullOrder);
       this.nullable = nullable;
 
-      OperExecContext opContext = fixture.newOperExecContext(popConfig);
+      OperatorContext opContext = fixture.operatorContext(popConfig);
       sorter = new SorterWrapper(opContext);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index 66588b1..62c961d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -75,6 +75,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         .go();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testSimpleHashJoin() {
     HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS",
"x1")), JoinRelType.LEFT);
@@ -100,6 +101,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         .go();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testSimpleMergeJoin() {
     MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x",
"EQUALS", "x1")), JoinRelType.LEFT);
@@ -293,6 +295,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
 
   // TODO(DRILL-4439) - doesn't expect incoming batches, uses instead RawFragmentBatch
   // need to figure out how to mock these
+  @SuppressWarnings("unchecked")
   @Ignore
   @Test
   public void testSimpleMergingReceiver() {

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index c53536a..ca226e6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -158,7 +158,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
           return; // successful
         }
       }
-      Map<String, List<Object>> actualSuperVectors = new TreeMap();
+      Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>();
 
       int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema,
actualSuperVectors);
       if (expectBatchNum != null) {
@@ -234,7 +234,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
    */
 
   public class PopBuilder  {
-    private PhysicalOperator popConfig;
+    protected PhysicalOperator popConfig;
     protected long initReservation = INIT_ALLOCATION;
     protected long maxAllocation = MAX_ALLOCATION;
 
@@ -307,21 +307,24 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       return this;
     }
 
+    @SuppressWarnings("resource")
     public PopBuilder buildAddAsInput() throws Exception {
-      mockOpContext(initReservation, maxAllocation);
+      mockOpContext(popConfig, initReservation, maxAllocation);
+      @SuppressWarnings("unchecked")
       BatchCreator<PhysicalOperator> opCreator =  (BatchCreator<PhysicalOperator>)
getOpCreatorReg().getOperatorCreator(popConfig.getClass());
       RecordBatch batch= opCreator.getBatch(fragContext, popConfig, inputs);
       return parent.addInput(batch);
     }
 
     public RecordBatch build() throws Exception {
-      mockOpContext(initReservation, maxAllocation);
+      mockOpContext(popConfig, initReservation, maxAllocation);
+      @SuppressWarnings("unchecked")
       BatchCreator<PhysicalOperator> opCreator =  (BatchCreator<PhysicalOperator>)
getOpCreatorReg().getOperatorCreator(popConfig.getClass());
       return opCreator.getBatch(fragContext, popConfig, inputs);
     }
   }
 
-  public abstract class ScanPopBuider<T extends ScanPopBuider> extends PopBuilder {
+  public abstract class ScanPopBuider<T extends ScanPopBuider<?>> extends PopBuilder
{
     List<SchemaPath> columnsToRead = Collections.singletonList(SchemaPath.getSimplePath("*"));
     DrillFileSystem fs = null;
 
@@ -333,16 +336,19 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       super(parent);
     }
 
+    @SuppressWarnings("unchecked")
     public T fileSystem(DrillFileSystem fs) {
       this.fs = fs;
       return (T) this;
     }
 
+    @SuppressWarnings("unchecked")
     public T columnsToRead(SchemaPath ... columnsToRead) {
       this.columnsToRead = Lists.newArrayList(columnsToRead);
       return (T) this;
     }
 
+    @SuppressWarnings("unchecked")
     public T columnsToRead(String ... columnsToRead) {
       this.columnsToRead = Lists.newArrayList();
 
@@ -360,7 +366,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
    */
   public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> {
     List<String> jsonBatches = null;
-    List<String> inputPaths = Collections.EMPTY_LIST;
+    List<String> inputPaths = Collections.emptyList();
 
     public JsonScanBuilder(PopBuilder parent) {
       super(parent);
@@ -380,14 +386,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       return this;
     }
 
+    @Override
     public PopBuilder buildAddAsInput() throws Exception {
-      mockOpContext(this.initReservation, this.maxAllocation);
+      mockOpContext(popConfig, this.initReservation, this.maxAllocation);
       RecordBatch scanBatch = getScanBatch();
       return parent.addInput(scanBatch);
     }
 
+    @Override
     public RecordBatch build() throws Exception {
-      mockOpContext(this.initReservation, this.maxAllocation);
+      mockOpContext(popConfig, this.initReservation, this.maxAllocation);
       return getScanBatch();
     }
 
@@ -414,7 +422,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
    * Builder for parquet Scan RecordBatch.
    */
   public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> {
-    List<String> inputPaths = Collections.EMPTY_LIST;
+    List<String> inputPaths = Collections.emptyList();
 
     public ParquetScanBuilder() {
       super();
@@ -429,14 +437,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       return this;
     }
 
+    @Override
     public PopBuilder buildAddAsInput() throws Exception {
-      mockOpContext(this.initReservation, this.maxAllocation);
+      mockOpContext(popConfig, this.initReservation, this.maxAllocation);
       RecordBatch scanBatch = getScanBatch();
       return parent.addInput(scanBatch);
     }
 
+    @Override
     public RecordBatch build() throws Exception {
-      mockOpContext(this.initReservation, this.maxAllocation);
+      mockOpContext(popConfig, this.initReservation, this.maxAllocation);
       return getScanBatch();
     }
 
@@ -465,8 +475,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
   } // end of ParquetScanBuilder
 
   @Override
-  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception
{
-    super.mockOpContext(initReservation, maxAllocation);
+  protected void mockOpContext(PhysicalOperator popConfig, long initReservation, long maxAllocation)
throws Exception {
+    super.mockOpContext(popConfig, initReservation, maxAllocation);
 
     // mock ScanExecutor used by parquet reader.
     new NonStrictExpectations() {

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 157f1d7..26345c9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -78,8 +78,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
-
 /**
  * Look! Doesn't extend BaseTestQuery!!
  */
@@ -195,11 +193,12 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     private long initReservation = AbstractBase.INIT_ALLOCATION;
     private long maxAllocation = AbstractBase.MAX_ALLOCATION;
 
+    @SuppressWarnings({ "unchecked", "resource" })
     public void go() {
       BatchCreator<PhysicalOperator> opCreator;
       RecordBatch testOperator;
       try {
-        mockOpContext(initReservation, maxAllocation);
+        mockOpContext(popConfig, initReservation, maxAllocation);
 
         opCreator = (BatchCreator<PhysicalOperator>)
             opCreatorReg.getOperatorCreator(popConfig.getClass());
@@ -297,12 +296,14 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 //        optManager.getOption(withAny(new TypeValidators.StringValidator("", "try"))); result
= "try";
 //        optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l)));
result = 10;
         fragContext.getOptions(); result = optionManager;
+        fragContext.getOptionSet(); result = optionManager;
         fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer();
         fragContext.shouldContinue(); result = true;
         fragContext.getExecutionControls(); result = executionControls;
         fragContext.getFunctionRegistry(); result = funcReg;
         fragContext.getConfig(); result = drillConf;
         fragContext.getHandle(); result = ExecProtos.FragmentHandle.getDefaultInstance();
+        fragContext.getFunctionRegistry(); result = funcReg;
         try {
           CodeGenerator<?> cg = CodeGenerator.get(templateClassDefinition, funcReg);
           cg.plainJavaCapable(true);
@@ -332,13 +333,16 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     };
   }
 
-  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception{
+  protected void mockOpContext(final PhysicalOperator popConfig, long initReservation, long
maxAllocation) throws Exception{
     final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test",
initReservation, maxAllocation);
     new NonStrictExpectations() {
       {
         opContext.getStats();result = opStats;
+        opContext.getStatsWriter(); result = opStats;
         opContext.getAllocator(); result = allocator;
-        fragContext.newOperatorContext(withAny(popConf));result = opContext;
+        opContext.getFragmentContext(); result = fragContext;
+        opContext.getOperatorDefn(); result = popConfig;
+        fragContext.newOperatorContext(withAny(popConf)); result = opContext;
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 09abf12..c03f0b7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,32 +17,33 @@
  */
 package org.apache.drill.test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.CodeCompiler;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.ops.AbstractOperatorExecContext;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.BaseFragmentContext;
+import org.apache.drill.exec.ops.BaseOperatorContext;
+import org.apache.drill.exec.ops.BufferManager;
+import org.apache.drill.exec.ops.BufferManagerImpl;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperExecContext;
-import org.apache.drill.exec.ops.OperExecContextImpl;
-import org.apache.drill.exec.ops.OperatorExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStatReceiver;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.testing.ExecutionControls;
@@ -53,6 +54,9 @@ import org.apache.drill.test.rowSet.IndirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -120,29 +124,30 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
    * provide test-specific versions of various other services.
    */
 
-  public static class TestCodeGenContext implements FragmentExecContext {
+  public static class TestFragmentContext extends BaseFragmentContext {
 
     private final DrillConfig config;
     private final OptionSet options;
     private final CodeCompiler compiler;
-    private final FunctionImplementationRegistry functionRegistry;
     private ExecutionControls controls;
+    private final BufferManagerImpl bufferManager;
 
-    public TestCodeGenContext(DrillConfig config, OptionSet options) {
+    public TestFragmentContext(DrillConfig config, OptionSet options, BufferAllocator allocator)
{
+      super(newFunctionRegistry(config, options));
       this.config = config;
       this.options = options;
-      ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
-      functionRegistry = new FunctionImplementationRegistry(config, classpathScan, options);
       compiler = new CodeCompiler(config, options);
+      bufferManager = new BufferManagerImpl(allocator);
     }
 
-    public void setExecutionControls(ExecutionControls controls) {
-      this.controls = controls;
+    private static FunctionImplementationRegistry newFunctionRegistry(
+        DrillConfig config, OptionSet options) {
+      ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
+      return new FunctionImplementationRegistry(config, classpathScan, options);
     }
 
-    @Override
-    public FunctionImplementationRegistry getFunctionRegistry() {
-      return functionRegistry;
+    public void setExecutionControls(ExecutionControls controls) {
+      this.controls = controls;
     }
 
     @Override
@@ -151,40 +156,33 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
     }
 
     @Override
-    public <T> T getImplementationClass(final ClassGenerator<T> cg)
-        throws ClassTransformationException, IOException {
-      return getImplementationClass(cg.getCodeGenerator());
+    public boolean shouldContinue() {
+      return true;
     }
 
     @Override
-    public <T> T getImplementationClass(final CodeGenerator<T> cg)
-        throws ClassTransformationException, IOException {
-      return compiler.createInstance(cg);
+    public ExecutionControls getExecutionControls() {
+      return controls;
     }
 
     @Override
-    public <T> List<T> getImplementationClass(final ClassGenerator<T> cg,
final int instanceCount) throws ClassTransformationException, IOException {
-      return getImplementationClass(cg.getCodeGenerator(), instanceCount);
+    public DrillConfig getConfig() {
+      return config;
     }
 
     @Override
-    public <T> List<T> getImplementationClass(final CodeGenerator<T> cg,
final int instanceCount) throws ClassTransformationException, IOException {
-      return compiler.createInstances(cg, instanceCount);
+    public DrillbitContext getDrillbitContext() {
+      throw new UnsupportedOperationException("Drillbit context not available for operator
unit tests");
     }
 
     @Override
-    public boolean shouldContinue() {
-      return true;
+    protected CodeCompiler getCompiler() {
+       return compiler;
     }
 
     @Override
-    public ExecutionControls getExecutionControls() {
-      return controls;
-    }
-
-    @Override
-    public DrillConfig getConfig() {
-      return config;
+    protected BufferManager getBufferManager() {
+      return bufferManager;
     }
   }
 
@@ -234,10 +232,17 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
     private void setStat(int metricId, double value) {
       stats.put(metricId, value);
     }
+
+    // Timing stats not supported for test.
+    @Override
+    public void startWait() { }
+
+    @Override
+    public void stopWait() {  }
   }
 
   private final SystemOptionManager options;
-  private final TestCodeGenContext context;
+  private final TestFragmentContext context;
   private final OperatorStatReceiver stats;
 
   protected OperatorFixture(OperatorFixtureBuilder builder) {
@@ -252,7 +257,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
     if (builder.systemOptions != null) {
       applySystemOptions(builder.systemOptions);
     }
-    context = new TestCodeGenContext(config, options);
+    context = new TestFragmentContext(config, options, allocator);
     stats = new MockStats();
    }
 
@@ -263,7 +268,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
   }
 
   public SystemOptionManager options() { return options; }
-  public FragmentExecContext fragmentExecContext() { return context; }
+  public FragmentContextInterface fragmentExecContext() { return context; }
 
   @Override
   public void close() throws Exception {
@@ -284,10 +289,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
     return builder().build();
   }
 
-  public OperExecContext newOperExecContext(PhysicalOperator opDefn) {
-    return new OperExecContextImpl(context, allocator, stats, opDefn, null);
-  }
-
   public RowSetBuilder rowSetBuilder(BatchSchema schema) {
     return new RowSetBuilder(allocator, schema);
   }
@@ -309,7 +310,36 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable
{
     }
   }
 
-  public OperatorExecContext operatorContext(PhysicalOperator config) {
-    return new AbstractOperatorExecContext(allocator(), config, context.getExecutionControls(),
stats);
+  public static class TestOperatorContext extends BaseOperatorContext {
+
+    private final OperatorStatReceiver stats;
+
+    public TestOperatorContext(FragmentContextInterface fragContext,
+        BufferAllocator allocator,
+        PhysicalOperator config,
+        OperatorStatReceiver stats) {
+      super(fragContext, allocator, config);
+      this.stats = stats;
+    }
+
+    @Override
+    public OperatorStatReceiver getStatsWriter() {
+      return stats;
+    }
+
+    @Override
+    public OperatorStats getStats() {
+      throw new UnsupportedOperationException("getStats() not supported for tests");
+    }
+
+    @Override
+    public <RESULT> ListenableFuture<RESULT> runCallableAs(
+        UserGroupInformation proxyUgi, Callable<RESULT> callable) {
+      throw new UnsupportedOperationException("Not yet");
+    }
+  }
+
+  public OperatorContext operatorContext(PhysicalOperator config) {
+    return new TestOperatorContext(context, allocator(), config, stats);
   }
 }


Mime
View raw message