drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [7/7] drill git commit: DRILL-5459: Extend physical operator test framework to test mini plans consisting of multiple operators.
Date Sat, 13 May 2017 17:39:05 GMT
DRILL-5459: Extend physical operator test framework to test mini plans consisting of multiple operators.

This closes #823


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0dc237e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0dc237e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0dc237e3

Branch: refs/heads/master
Commit: 0dc237e3161cf284212cc63f740b229d4fee8fdf
Parents: cb9547a
Author: Jinfeng Ni <jni@apache.org>
Authored: Fri Apr 21 17:34:15 2017 -0700
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Fri May 12 17:08:41 2017 -0700

----------------------------------------------------------------------
 .../exec/store/hive/HiveTestDataGenerator.java  |   4 +-
 .../drill/exec/physical/base/AbstractBase.java  |   9 +-
 .../apache/drill/exec/util/TestUtilities.java   |  51 +++
 .../java/org/apache/drill/DrillTestWrapper.java |  35 +-
 .../apache/drill/exec/TestRepeatedReaders.java  |   2 +-
 .../drill/exec/cache/TestWriteToDisk.java       |   2 +-
 .../TestCorruptParquetDateCorrection.java       |   2 +-
 .../physical/impl/writer/TestParquetWriter.java |   2 +-
 .../writer/TestParquetWriterEmptyFiles.java     |   2 +-
 .../exec/physical/impl/writer/TestWriter.java   |   2 +-
 .../physical/unit/BasicPhysicalOpUnitTest.java  |   1 +
 .../physical/unit/MiniPlanUnitTestBase.java     | 442 +++++++++++++++++++
 .../physical/unit/PhysicalOpUnitTestBase.java   | 117 ++---
 .../drill/exec/physical/unit/TestMiniPlan.java  | 206 +++++++++
 .../exec/store/dfs/TestDrillFileSystem.java     |   2 +-
 .../parquet/TestParquetFilterPushDown.java      |   2 +-
 .../apache/drill/test/rowSet/SchemaBuilder.java |  13 +-
 17 files changed, 821 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 435c66b..580cf78 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -70,7 +70,7 @@ public class HiveTestDataGenerator {
     config.put("hive.metastore.uris", "");
     config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
     config.put("hive.metastore.warehouse.dir", whDir);
-    config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
   }
 
   /**
@@ -115,7 +115,7 @@ public class HiveTestDataGenerator {
     HiveConf conf = new HiveConf(SessionState.class);
 
     conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
     conf.set("hive.metastore.warehouse.dir", whDir);
     conf.set("mapred.job.tracker", "local");
     conf.set(ConfVars.SCRATCHDIR.varname,  getTempDir("scratch_dir"));

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 526d728..a547e26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -25,10 +25,13 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractBase implements PhysicalOperator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
 
-  private final String userName;
+  public static long INIT_ALLOCATION = 1_000_000L;
+  public static long MAX_ALLOCATION = 10_000_000_000L;
+
+  protected long initialAllocation = INIT_ALLOCATION;
+  protected long maxAllocation = MAX_ALLOCATION;
 
-  protected long initialAllocation = 1_000_000L;
-  protected long maxAllocation = 10_000_000_000L;
+  private final String userName;
   private int id;
   private double cost;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index 3532956..0200dc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -18,14 +18,25 @@
 package org.apache.drill.exec.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 
 import com.google.common.io.Files;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 
 /**
  * This class contains utility methods to speed up tests. Some of the production code currently calls this method
@@ -95,4 +106,44 @@ public class TestUtilities {
 
     pluginRegistry.createOrUpdate(dfsPluginName, dfsPluginConfig, true);
   }
+
+  /**
+   * Create JSONRecordReader from input strings.
+   * @param jsonBatches : list of input strings, each element represents a batch. Each string could either
+   *                    be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}".
+   * @param fragContext : fragment context
+   * @param columnsToRead : list of schema paths to read from JSON reader.
+   * @return
+   */
+  public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+    ObjectMapper mapper = new ObjectMapper();
+    List<RecordReader> readers = new ArrayList<>();
+    for (String batchJason : jsonBatches) {
+      JsonNode records;
+      try {
+        records = mapper.readTree(batchJason);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      readers.add(new JSONRecordReader(fragContext, records, null, columnsToRead));
+    }
+    return readers.iterator();
+  }
+
+  /**
+   * Create JSONRecordReader from files on a file system.
+   * @param fs : file system.
+   * @param inputPaths : list of .json file paths.
+   * @param fragContext
+   * @param columnsToRead
+   * @return
+   */
+  public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+    List<RecordReader> readers = new ArrayList<>();
+    for (String inputPath : inputPaths) {
+      readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
+    }
+    return readers.iterator();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index f217632..64aeef8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -117,9 +118,9 @@ public class DrillTestWrapper {
   private int expectedNumBatches;
 
   public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
-                          String baselineOptionSettingQueries, String testOptionSettingQueries,
-                          QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
-                          List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
+      String baselineOptionSettingQueries, String testOptionSettingQueries,
+      QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
+      List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
     this.testBuilder = testBuilder;
     this.services = services;
     this.query = query;
@@ -150,7 +151,7 @@ public class DrillTestWrapper {
   }
 
   private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
-                                         Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
+      Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
     for (String s : expectedRecords.keySet()) {
       assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s));
       assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
@@ -224,7 +225,7 @@ public class DrillTestWrapper {
   }
 
   private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
-                                                                    final RecordBatchLoader loader)
+      final RecordBatchLoader loader)
       throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
@@ -307,12 +308,28 @@ public class DrillTestWrapper {
   }
 
   /**
+   * Iterate over batches, and combine the batches into a map, where key is schema path, and value is
+   * the list of column values across all the batches.
    * @param batches
    * @return
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
   public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+      throws SchemaChangeException, UnsupportedEncodingException {
+    return addToCombinedVectorResults(batches, null);
+  }
+
+  /**
+   * Add to result vectors and compare batch schema against expected schema while iterating batches.
+   * @param batches
+   * @param  expectedSchema: the expected schema the batches should contain. Throws SchemaChangeException
+   *                       if a batch with a different schema is encountered.
+   * @return
+   * @throws SchemaChangeException
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
        throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, List<Object>> combinedVectors = new TreeMap<>();
@@ -320,6 +337,14 @@ public class DrillTestWrapper {
     long totalRecords = 0;
     BatchSchema schema = null;
     for (VectorAccessible loader : batches)  {
+      if (expectedSchema != null) {
+        if (! expectedSchema.equals(loader.getSchema())) {
+          throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
+                  "Actual schema: %s.  Expected schema : %s",
+              loader.getSchema(), expectedSchema));
+        }
+      }
+
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean throws clause above.
       if (schema == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
index ddb67bb..be6fe79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
@@ -32,7 +32,7 @@ public class TestRepeatedReaders extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index a382a42..96dae6a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -97,7 +97,7 @@ public class TestWriteToDisk extends ExecTest {
             batch, context.getAllocator());
 
         Configuration conf = new Configuration();
-        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
         final VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(
             context.getAllocator());

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index 8cd1a85..0c98eee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -112,7 +112,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
     fs = FileSystem.get(conf);
     path = new Path(getDfsTestTmpSchemaLocation());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 65e9c38..e5c6ce4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -120,7 +120,7 @@ public class TestParquetWriter extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
     test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index 2848b68..d57605b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -33,7 +33,7 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index f4d505d..5f306c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -49,7 +49,7 @@ public class TestWriter extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index a0ea6fe..e39a644 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -195,6 +195,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
     opTestBuilder()
         .physicalOperator(sortConf)
+        .maxAllocation(15_000_000L)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a", "b")
         .baselineValues(5l, 1l)

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
new file mode 100644
index 0000000..302d0e5
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import mockit.NonStrictExpectations;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
+import static org.apache.drill.exec.physical.base.AbstractBase.MAX_ALLOCATION;
+import static org.apache.drill.exec.physical.unit.TestMiniPlan.fs;
+
+/**
+ * A MiniPlanUnitTestBase extends PhysicalOpUnitTestBase, to construct MiniPlan (aka plan fragment)
+ * in the form of a physical operator tree, and verify both the expected schema and output row results.
+ * Steps to construct a unit:
+ * 1. Call PopBuilder / ScanPopBuilder to construct the MiniPlan
+ * 2. Create a MiniPlanTestBuilder, and specify the expected schema and base line values, or if there
+ * is no batch expected.
+ */
+
+public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
+
+  private final ExecutorService scanExecutor =  Executors.newFixedThreadPool(2, new NamedThreadFactory("scan-"));
+
+  public static class MiniPlanTestBuilder {
+    protected List<Map<String, Object>> baselineRecords;
+    protected RecordBatch root;
+    protected boolean expectedZeroBatch;
+    protected BatchSchema expectedSchema;
+
+    /**
+     * Specify the root operator for a MiniPlan.
+     * @param root
+     * @return
+     */
+    public MiniPlanTestBuilder root(RecordBatch root) {
+      this.root = root;
+      return this;
+    }
+
+    /**
+     * Specify the expected batch schema.
+     * @param batchSchema
+     * @return
+     */
+    public MiniPlanTestBuilder expectedSchema(BatchSchema batchSchema) {
+      this.expectedSchema = batchSchema;
+      return this;
+    }
+
+    /**
+     * Specify one row of expected values. The number of values must be the same as the number of fields in the expected batch schema.
+     * @param baselineValues
+     * @return
+     */
+    public MiniPlanTestBuilder baselineValues(Object ... baselineValues) {
+      if (baselineRecords == null) {
+        baselineRecords = new ArrayList<>();
+      }
+
+      Map<String, Object> ret = new HashMap<>();
+      int i = 0;
+      Preconditions.checkArgument(expectedSchema != null , "Expected schema should be set before specify baseline values.");
+      Preconditions.checkArgument(baselineValues.length == expectedSchema.getFieldCount(),
+          "Must supply the same number of baseline values as columns in expected schema.");
+
+      for (MaterializedField field : expectedSchema) {
+        ret.put(SchemaPath.getSimplePath(field.getPath()).toExpr(), baselineValues[i]);
+        i++;
+      }
+
+      this.baselineRecords.add(ret);
+      return this;
+    }
+
+    /**
+     * Specify one special case, where the operator tree should return 0 batch.
+     * @param expectedZeroBatch
+     * @return
+     */
+    public MiniPlanTestBuilder expectZeroBatch(boolean expectedZeroBatch) {
+      this.expectedZeroBatch = expectedZeroBatch;
+      return this;
+    }
+
+    public void go() throws Exception {
+      final BatchIterator batchIterator = new BatchIterator(root);
+
+      // verify case of zero batch.
+      if (expectedZeroBatch) {
+        if (batchIterator.iterator().hasNext()) {
+          throw new AssertionError("Expected zero batches from scan. But scan return at least 1 batch!");
+        } else {
+          return; // successful
+        }
+      }
+
+      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectedSchema);
+      Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+    }
+  }
+
+  /**
+   * Similar to {@link OperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
+   * The input record batch could be a non-scan operator by calling {@link PopBuilder#addInputAsChild},
+   * or a scan operator by calling {@link PopBuilder#addJsonScanAsChild()} if it is a SCAN operator.
+   *
+   * A miniplan rooted at a join operator, like the following, could be constructed in either of the following ways:
+   *
+   * <pre><code>
+   *                 Join
+   *                /    \
+   *          JSON_T1    Filter
+   *                       \
+   *                     JSON_T2
+   * </code></pre>
+   *
+   * <pre><code>
+   * new PopBuilder()
+   *  .physicalOperator(joinPopConfig)
+   *  .addScanAsChild()
+   *      .fileSystem(..)
+   *      .columnsToRead(...)
+   *      .inputPath(...)
+   *      .buildAddAsInput()
+   *  .addInputAsChild()
+   *      .physicalOperator(filterPopConfig)
+   *      .addScanAsChild()
+   *          .fileSystem(...)
+   *          .columnsToRead(...)
+   *          .inputPath(...)
+   *          .buildAddAsInput()
+   *      .buildAddAsInput()
+   *  .build();
+   * </code></pre>
+   *
+   * <pre><code>
+   *   RecordBatch scan1 = new ScanPopBuilder()
+   *                          .fileSystem(...)
+   *                          .columnsToRead(..)
+   *                          .inputPath(...)
+   *                          .build();
+   *   RecordBatch scan2 = ... ;
+   *
+   *   RecordBatch filter = new PopBuilder()
+   *                          .physicalOperator(filterPopConfig)
+   *                          .addInput(scan2);
+   *   RecordBatch join = new PopBuilder()
+   *                          .physicalOperator(joinPopConfig)
+   *                          .addInput(scan1)
+   *                          .addInput(filter)
+   *                          .build();
+   *
+   * </code></pre>
+   */
+
+  public class PopBuilder  {
+    private PhysicalOperator popConfig;
+    protected long initReservation = INIT_ALLOCATION;
+    protected long maxAllocation = MAX_ALLOCATION;
+
+    final private List<RecordBatch> inputs = Lists.newArrayList();
+    final PopBuilder parent ;
+
+    public PopBuilder() {
+      this.parent = null;
+    }
+
+    public PopBuilder(PopBuilder parent) {
+      this.parent = parent;
+    }
+
+    public PopBuilder physicalOperator(PhysicalOperator popConfig) {
+      this.popConfig = popConfig;
+      return this;
+    }
+
+    /**
+     * Set initial memory reservation used by this operator's allocator. Default is {@link PhysicalOpUnitTestBase#INIT_ALLOCATION}
+     * @param initReservation
+     * @return
+     */
+    public PopBuilder initReservation(long initReservation) {
+      this.initReservation = initReservation;
+      return this;
+    }
+
+    /**
+     * Set max memory reservation used by this operator's allocator. Default is {@link PhysicalOpUnitTestBase#MAX_ALLOCATION}
+     * @param maxAllocation
+     * @return
+     */
+    public PopBuilder maxAllocation(long maxAllocation) {
+      this.maxAllocation = maxAllocation;
+      return this;
+    }
+
+    /**
+     * Return a JsonScanBuilder to build a JSON scan recordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}
+     * @return  JsonScanBuilder
+     */
+    public JsonScanBuilder addJsonScanAsChild() {
+      return  new JsonScanBuilder(this);
+    }
+
+    /**
+     * Return a ParquetScanBuilder to build a Parquet scan recordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}
+     * @return  ParquetScanBuilder
+     */
+    public ParquetScanBuilder addParquetScanAsChild() {
+      return  new ParquetScanBuilder(this);
+    }
+
+    /**
+     * Return a nested PopBuilder to build a non-scan recordBatch, which will be added as input batch after
+     * call {@link PopBuilder#buildAddAsInput()}
+     * @return a nested PopBuilder for a non-scan recordBatch.
+     */
+    public PopBuilder addInputAsChild() {
+      return  new PopBuilder(this) {
+      };
+    }
+
+    public PopBuilder addInput(RecordBatch batch) {
+      inputs.add(batch);
+      return this;
+    }
+
+    public PopBuilder buildAddAsInput() throws Exception {
+      mockOpContext(initReservation, maxAllocation);
+      BatchCreator<PhysicalOperator> opCreator =  (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+      RecordBatch batch= opCreator.getBatch(fragContext, popConfig, inputs);
+      return parent.addInput(batch);
+    }
+
+    public RecordBatch build() throws Exception {
+      mockOpContext(initReservation, maxAllocation);
+      BatchCreator<PhysicalOperator> opCreator =  (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+      return opCreator.getBatch(fragContext, popConfig, inputs);
+    }
+  }
+
+  /**
+   * Common base for builders that create a scan RecordBatch. It holds the settings shared by
+   * the scan builders: the columns to read (defaults to "*") and the file system to read from.
+   *
+   * @param <T> the concrete builder type, returned by the setters so that calls can be chained
+   */
+  public abstract class ScanPopBuider<T extends ScanPopBuider<T>> extends PopBuilder {
+    List<SchemaPath> columnsToRead = Collections.singletonList(SchemaPath.getSimplePath("*"));
+    DrillFileSystem fs = null;
+
+    /** Creates a scan builder that has no parent builder. */
+    public ScanPopBuider() {
+      super(null); // No parent builder: the scan batch is obtained through build().
+    }
+
+    /**
+     * Creates a scan builder nested under another builder.
+     * @param parent the builder that receives the scan batch on buildAddAsInput()
+     */
+    public ScanPopBuider(PopBuilder parent) {
+      super(parent);
+    }
+
+    /**
+     * Sets the file system used by the scan.
+     * @param fs the file system to read from
+     * @return this builder
+     */
+    public T fileSystem(DrillFileSystem fs) {
+      this.fs = fs;
+      return thisBuilder();
+    }
+
+    /**
+     * Sets the columns to read, replacing the default "*".
+     * @param columnsToRead the columns as schema paths
+     * @return this builder
+     */
+    public T columnsToRead(SchemaPath ... columnsToRead) {
+      this.columnsToRead = Lists.newArrayList(columnsToRead);
+      return thisBuilder();
+    }
+
+    /**
+     * Sets the columns to read by name; each name is converted with
+     * {@code SchemaPath.getSimplePath}.
+     * @param columnsToRead the column names
+     * @return this builder
+     */
+    public T columnsToRead(String ... columnsToRead) {
+      final List<SchemaPath> paths = Lists.newArrayList();
+      for (String column : columnsToRead) {
+        paths.add(SchemaPath.getSimplePath(column));
+      }
+      this.columnsToRead = paths;
+      return thisBuilder();
+    }
+
+    /**
+     * Single place for the self-type cast. It is safe as long as every subclass passes itself
+     * as the type argument, which the recursive bound on T is there to encourage.
+     */
+    @SuppressWarnings("unchecked")
+    private T thisBuilder() {
+      return (T) this;
+    }
+
+  }
+
+  /**
+   * Builder for Json Scan RecordBatch. The records come from the in-memory JSON strings given
+   * to {@link #jsonBatches(List)}; when none were given, they are read from the files given to
+   * {@link #inputPaths(List)}.
+   */
+  public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> {
+    List<String> jsonBatches = null;
+    List<String> inputPaths = Collections.emptyList();
+
+    public JsonScanBuilder(PopBuilder parent) {
+      super(parent);
+    }
+
+    public JsonScanBuilder() {
+      super();
+    }
+
+    /**
+     * Sets the JSON strings to scan; when set, they take precedence over the input paths.
+     * @param jsonBatches the JSON strings to read from
+     * @return this builder
+     */
+    public JsonScanBuilder jsonBatches(List<String> jsonBatches) {
+      this.jsonBatches = jsonBatches;
+      return this;
+    }
+
+    /**
+     * Sets the JSON files to scan; only used when no JSON strings were set.
+     * @param inputPaths the paths of the files to read
+     * @return this builder
+     */
+    public JsonScanBuilder inputPaths(List<String> inputPaths) {
+      this.inputPaths = inputPaths;
+      return this;
+    }
+
+    /**
+     * Builds the scan batch and adds it as an input of the parent builder.
+     * @return the result of {@code parent.addInput(...)}
+     * @throws IllegalStateException if this builder was created without a parent
+     * @throws Exception if mocking the operator context or creating the readers fails
+     */
+    @Override
+    public PopBuilder buildAddAsInput() throws Exception {
+      // Fail with a clear message before the operator context (and its allocator) is created.
+      if (parent == null) {
+        throw new IllegalStateException(
+            "buildAddAsInput() requires a parent builder; use build() for a standalone scan");
+      }
+      mockOpContext(this.initReservation, this.maxAllocation);
+      RecordBatch scanBatch = getScanBatch();
+      return parent.addInput(scanBatch);
+    }
+
+    /**
+     * Builds the scan batch without attaching it to any parent builder.
+     * @return the JSON scan batch
+     * @throws Exception if mocking the operator context or creating the readers fails
+     */
+    @Override
+    public RecordBatch build() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      return getScanBatch();
+    }
+
+    /** Creates the readers from the JSON strings if present, otherwise from the input files. */
+    private RecordBatch getScanBatch() throws Exception {
+      final Iterator<RecordReader> readers;
+
+      if (jsonBatches != null) {
+        readers = TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, columnsToRead);
+      } else {
+        readers = TestUtilities.getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
+      }
+
+      return new ScanBatch(null, fragContext, readers);
+    }
+  }
+
+  /**
+   * Builder for parquet Scan RecordBatch. One ParquetRecordReader is created per block listed
+   * in the footer of each input file; the file system must be set via fileSystem(...) whenever
+   * input paths are given.
+   */
+  public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> {
+    List<String> inputPaths = Collections.emptyList();
+
+    public ParquetScanBuilder() {
+      super();
+    }
+
+    public ParquetScanBuilder(PopBuilder parent) {
+      super(parent);
+    }
+
+    /**
+     * Sets the parquet files to scan.
+     * @param inputPaths the paths of the files to read
+     * @return this builder
+     */
+    public ParquetScanBuilder inputPaths(List<String> inputPaths) {
+      this.inputPaths = inputPaths;
+      return this;
+    }
+
+    /**
+     * Builds the scan batch and adds it as an input of the parent builder.
+     * @return the result of {@code parent.addInput(...)}
+     * @throws IllegalStateException if this builder was created without a parent
+     * @throws Exception if mocking the operator context or creating the readers fails
+     */
+    @Override
+    public PopBuilder buildAddAsInput() throws Exception {
+      // Fail with a clear message before the operator context (and its allocator) is created.
+      if (parent == null) {
+        throw new IllegalStateException(
+            "buildAddAsInput() requires a parent builder; use build() for a standalone scan");
+      }
+      mockOpContext(this.initReservation, this.maxAllocation);
+      RecordBatch scanBatch = getScanBatch();
+      return parent.addInput(scanBatch);
+    }
+
+    /**
+     * Builds the scan batch without attaching it to any parent builder.
+     * @return the parquet scan batch
+     * @throws Exception if mocking the operator context or creating the readers fails
+     */
+    @Override
+    public RecordBatch build() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      return getScanBatch();
+    }
+
+    /** Reads each file's footer and creates one reader per block listed in it. */
+    private RecordBatch getScanBatch() throws Exception {
+      // The footer and the readers both go through fs, so a missing file system would
+      // otherwise only show up as a NullPointerException inside the loop.
+      if (fs == null && !inputPaths.isEmpty()) {
+        throw new IllegalStateException("fileSystem(...) must be set before building a parquet scan");
+      }
+
+      List<RecordReader> readers = Lists.newArrayList();
+
+      for (String path : inputPaths) {
+        ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));
+
+        for (int i = 0; i < footer.getBlocks().size(); i++) {
+          readers.add(new ParquetRecordReader(fragContext,
+              path,
+              i,
+              fs,
+              CodecFactory.createDirectCodecFactory(fs.getConf(),
+                  new ParquetDirectByteBufferAllocator(opContext.getAllocator()), 0),
+              footer,
+              columnsToRead,
+              ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION));
+        }
+      }
+
+      return new ScanBatch(null, fragContext, readers.iterator());
+    }
+  } // end of ParquetScanBuilder
+
+  /**
+   * Sets up the mocked operator context as the parent class does, and additionally makes
+   * {@code opContext.getScanExecutor()} return {@code scanExecutor}.
+   *
+   * @param initReservation passed through to the parent implementation
+   * @param maxAllocation passed through to the parent implementation
+   * @throws Exception if the parent implementation fails
+   */
+  @Override
+  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception {
+    super.mockOpContext(initReservation, maxAllocation);
+
+    // mock ScanExecutor used by parquet reader.
+    new NonStrictExpectations() {
+      {
+        opContext.getScanExecutor();result = scanExecutor;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index ad7367f..7d09ca5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.unit;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import mockit.Delegate;
@@ -56,6 +54,7 @@ import org.apache.drill.exec.ops.BufferManagerImpl;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
@@ -66,8 +65,9 @@ import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -78,10 +78,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
+
 /**
  * Look! Doesn't extend BaseTestQuery!!
  */
 public class PhysicalOpUnitTestBase extends ExecTest {
+//  public static long INIT_ALLOCATION = 1_000_000l;
+//  public static long MAX_ALLOCATION = 10_000_000L;
 
   @Injectable FragmentContext fragContext;
   @Injectable OperatorContext opContext;
@@ -97,6 +101,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
   private final TemplateClassDefinition<Projector> templateClassDefinition = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
   private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
 
+  @Before
+  public void setup() throws Exception {
+    mockFragmentContext();
+  }
+
   @Override
   protected LogicalExpression parseExpr(String expr) {
     ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
@@ -128,37 +137,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     return ret;
   }
 
-  @SuppressWarnings("resource")
-  void runTest(OperatorTestBuilder testBuilder) {
-    BatchCreator<PhysicalOperator> opCreator;
-    RecordBatch testOperator;
-    try {
-      mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation);
-      opCreator = (BatchCreator<PhysicalOperator>)
-          opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass());
-       List<RecordBatch> incomingStreams = Lists.newArrayList();
-       for (List<String> batchesJson : testBuilder.inputStreamsJSON) {
-         incomingStreams.add(new ScanBatch(null, fragContext,
-             getRecordReadersForJsonBatches(batchesJson, fragContext)));
-       }
-       testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams);
-
-      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
-      Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords);
-      DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
-
-    } catch (ExecutionSetupException e) {
-      throw new RuntimeException(e);
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    } catch (SchemaChangeException e) {
-      throw new RuntimeException(e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static class BatchIterator implements Iterable<VectorAccessible> {
+  protected static class BatchIterator implements Iterable<VectorAccessible> {
 
     private RecordBatch operator;
     public BatchIterator(RecordBatch operator) {
@@ -213,11 +192,40 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     private String[] baselineColumns;
     private List<Map<String, Object>> baselineRecords;
     private List<List<String>> inputStreamsJSON;
-    private long initReservation = 10000000;
-    private long maxAllocation = 15000000;
+    private long initReservation = AbstractBase.INIT_ALLOCATION;
+    private long maxAllocation = AbstractBase.MAX_ALLOCATION;
 
     public void go() {
-      runTest(this);
+      BatchCreator<PhysicalOperator> opCreator;
+      RecordBatch testOperator;
+      try {
+        mockOpContext(initReservation, maxAllocation);
+
+        opCreator = (BatchCreator<PhysicalOperator>)
+            opCreatorReg.getOperatorCreator(popConfig.getClass());
+        List<RecordBatch> incomingStreams = Lists.newArrayList();
+        if (inputStreamsJSON != null) {
+          for (List<String> batchesJson : inputStreamsJSON) {
+            incomingStreams.add(new ScanBatch(null, fragContext,
+                getRecordReadersForJsonBatches(batchesJson, fragContext)));
+          }
+        }
+
+        testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
+
+        Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
+        Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+        DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+      } catch (ExecutionSetupException e) {
+        throw new RuntimeException(e);
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      } catch (SchemaChangeException e) {
+        throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
 
     public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
@@ -276,9 +284,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     }
   }
 
-  private void mockFragmentContext(long initReservation, long maxAllocation) throws Exception{
+  protected void mockFragmentContext() throws Exception{
     final CodeCompiler compiler = new CodeCompiler(drillConf, optionManager);
-    final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
+//    final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
     new NonStrictExpectations() {
       {
 //        optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false;
@@ -320,27 +328,28 @@ public class PhysicalOpUnitTestBase extends ExecTest {
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
+      }
+    };
+  }
+
+  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception{
+    final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
+    new NonStrictExpectations() {
+      {
         opContext.getStats();result = opStats;
         opContext.getAllocator(); result = allocator;
-        fragContext.newOperatorContext(withAny(popConf));
-        result = opContext;
+        fragContext.newOperatorContext(withAny(popConf));result = opContext;
       }
     };
   }
 
-  @SuppressWarnings("resource")
+  protected OperatorCreatorRegistry getOpCreatorReg() {
+    return opCreatorReg;
+  }
+
   private Iterator<RecordReader> getRecordReadersForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
-    ObjectMapper mapper = new ObjectMapper();
-    List<RecordReader> readers = new ArrayList<>();
-    for (String batchJason : jsonBatches) {
-      JsonNode records;
-      try {
-        records = mapper.readTree(batchJason);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*"))));
-    }
-    return readers.iterator();
+    return TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
new file mode 100644
index 0000000..d0a64f4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class contains examples to show how to use MiniPlanTestBuilder to test a
+ * specific plan fragment (MiniPlan). Each test case needs 1) a RecordBatch built
+ * from PopBuilder/ScanBuilder, and 2) either an expected schema with baseline
+ * values, or an indication that no batch is expected.
+ */
+public class TestMiniPlan extends MiniPlanUnitTestBase {
+
+  protected static DrillFileSystem fs;
+
+  @BeforeClass
+  public static void initFS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    fs = new DrillFileSystem(conf);
+  }
+
+  // Scan of an empty JSON file: no batch is expected.
+  @Test
+  @Ignore("DRILL-5464: A bug in JsonRecordReader handling empty file")
+  public void testEmptyJsonInput() throws Exception {
+    String emptyFile = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+
+    RecordBatch scanBatch = new JsonScanBuilder()
+        .fileSystem(fs)
+        .inputPaths(Lists.newArrayList(emptyFile))
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectZeroBatch(true)
+        .go();
+  }
+
+  // Standalone parquet scan reading a single column.
+  @Test
+  public void testSimpleParquetScan() throws Exception {
+    String file = FileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+
+    RecordBatch scanBatch = new ParquetScanBuilder()
+        .fileSystem(fs)
+        .columnsToRead("R_REGIONKEY")
+        .inputPaths(Lists.newArrayList(file))
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("R_REGIONKEY", TypeProtos.MinorType.BIGINT)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectedSchema(expectedSchema)
+        .baselineValues(0L)
+        .baselineValues(1L)
+        .go();
+  }
+
+  // Standalone JSON scan fed from an in-memory JSON string.
+  @Test
+  public void testSimpleJson() throws Exception {
+    List<String> jsonBatches = Lists.newArrayList(
+        "{\"a\":100}"
+    );
+
+    RecordBatch scanBatch = new JsonScanBuilder()
+        .jsonBatches(jsonBatches)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectedSchema(expectedSchema)
+        .baselineValues(100L)
+        .go();
+  }
+
+  // UnionAll over two Filter operators, each fed by its own JSON scan, built as one nested chain.
+  @Test
+  public void testUnionFilter() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 10 }]",
+        "[{\"a\": 50, \"b\" : 100}]");
+
+    RecordBatch batch = new PopBuilder()
+        .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
+        .addInputAsChild()
+          .physicalOperator(new Filter(null, parseExpr("a=5"), 1.0f))
+          .addJsonScanAsChild()
+            .jsonBatches(leftJsonBatches)
+            .columnsToRead("a", "b")
+            .buildAddAsInput()
+          .buildAddAsInput()
+        .addInputAsChild()
+          .physicalOperator(new Filter(null, parseExpr("a=50"), 1.0f))
+          .addJsonScanAsChild()
+            .jsonBatches(rightJsonBatches)
+            .columnsToRead("a", "b")
+            .buildAddAsInput()
+          .buildAddAsInput()
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch)
+        .expectedSchema(expectedSchema)
+        .baselineValues(5L, 1L)
+        .baselineValues(5L, 5L)
+        .baselineValues(50L, 100L)
+        .go();
+  }
+
+  // Same plan shape as testUnionFilter, but every batch is built separately and wired with addInput().
+  @Test
+  @Ignore ("DRILL-5327: A bug in UnionAll handling empty inputs from both sides")
+  public void testUnionFilterAll() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]");
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : 10 }]");
+
+    RecordBatch leftScan = new JsonScanBuilder()
+        .jsonBatches(leftJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch leftFilter = new PopBuilder()
+        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+        .addInput(leftScan)
+        .build();
+
+    RecordBatch rightScan = new JsonScanBuilder()
+        .jsonBatches(rightJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch rightFilter = new PopBuilder()
+        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+        .addInput(rightScan)
+        .build();
+
+    RecordBatch batch = new PopBuilder()
+        .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
+        .addInput(leftFilter)
+        .addInput(rightFilter)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch)
+        .expectedSchema(expectedSchema)
+        .go();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index 550f56f..7d66795 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -66,7 +66,7 @@ public class TestDrillFileSystem {
     DrillFileSystem dfs = null;
     InputStream is = null;
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
     OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/);
     OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 782973d..277e6f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -52,7 +52,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
         BitControl.PlanFragment.getDefaultInstance(), null, bits[0].getContext().getFunctionImplementationRegistry());
 
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
index 48657c7..b946ab9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
@@ -84,9 +84,15 @@ public class SchemaBuilder {
       parent.finishMap(col);
       return parent;
     }
+
+    @Override
+    public SchemaBuilder withSVMode(SelectionVectorMode svMode) {
+      throw new IllegalStateException("Cannot set SVMode for a nested schema");
+    }
   }
 
   protected List<MaterializedField> columns = new ArrayList<>( );
+  private SelectionVectorMode svMode = SelectionVectorMode.NONE;
 
   public SchemaBuilder() { }
 
@@ -128,8 +134,13 @@ public class SchemaBuilder {
     return new MapBuilder(this, pathName);
   }
 
+  public SchemaBuilder withSVMode(SelectionVectorMode svMode) {
+    this.svMode = svMode;
+    return this;
+  }
+
   public BatchSchema build() {
-    return new BatchSchema(SelectionVectorMode.NONE, columns);
+    return new BatchSchema(svMode, columns);
   }
 
   void finishMap(MaterializedField map) {


Mime
View raw message