DRILL-5459: Extend physical operator test framework to test mini plans consisting of multiple operators.
This closes #823
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0dc237e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0dc237e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0dc237e3
Branch: refs/heads/master
Commit: 0dc237e3161cf284212cc63f740b229d4fee8fdf
Parents: cb9547a
Author: Jinfeng Ni <jni@apache.org>
Authored: Fri Apr 21 17:34:15 2017 -0700
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Fri May 12 17:08:41 2017 -0700
----------------------------------------------------------------------
.../exec/store/hive/HiveTestDataGenerator.java | 4 +-
.../drill/exec/physical/base/AbstractBase.java | 9 +-
.../apache/drill/exec/util/TestUtilities.java | 51 +++
.../java/org/apache/drill/DrillTestWrapper.java | 35 +-
.../apache/drill/exec/TestRepeatedReaders.java | 2 +-
.../drill/exec/cache/TestWriteToDisk.java | 2 +-
.../TestCorruptParquetDateCorrection.java | 2 +-
.../physical/impl/writer/TestParquetWriter.java | 2 +-
.../writer/TestParquetWriterEmptyFiles.java | 2 +-
.../exec/physical/impl/writer/TestWriter.java | 2 +-
.../physical/unit/BasicPhysicalOpUnitTest.java | 1 +
.../physical/unit/MiniPlanUnitTestBase.java | 442 +++++++++++++++++++
.../physical/unit/PhysicalOpUnitTestBase.java | 117 ++---
.../drill/exec/physical/unit/TestMiniPlan.java | 206 +++++++++
.../exec/store/dfs/TestDrillFileSystem.java | 2 +-
.../parquet/TestParquetFilterPushDown.java | 2 +-
.../apache/drill/test/rowSet/SchemaBuilder.java | 13 +-
17 files changed, 821 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 435c66b..580cf78 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -70,7 +70,7 @@ public class HiveTestDataGenerator {
config.put("hive.metastore.uris", "");
config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
config.put("hive.metastore.warehouse.dir", whDir);
- config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
}
/**
@@ -115,7 +115,7 @@ public class HiveTestDataGenerator {
HiveConf conf = new HiveConf(SessionState.class);
conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
conf.set("hive.metastore.warehouse.dir", whDir);
conf.set("mapred.job.tracker", "local");
conf.set(ConfVars.SCRATCHDIR.varname, getTempDir("scratch_dir"));
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 526d728..a547e26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -25,10 +25,13 @@ import com.google.common.base.Preconditions;
public abstract class AbstractBase implements PhysicalOperator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
- private final String userName;
+ public static long INIT_ALLOCATION = 1_000_000L;
+ public static long MAX_ALLOCATION = 10_000_000_000L;
+
+ protected long initialAllocation = INIT_ALLOCATION;
+ protected long maxAllocation = MAX_ALLOCATION;
- protected long initialAllocation = 1_000_000L;
- protected long maxAllocation = 10_000_000_000L;
+ private final String userName;
private int id;
private double cost;
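
The AbstractBase change above is a pure refactor: the existing defaults (1 MB initial reservation,
10 GB maximum) become named public constants, so test code can reference them instead of repeating
magic numbers. The new test bases below pick them up via static import; a minimal sketch of the
pattern (the field names are illustrative):

    import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
    import static org.apache.drill.exec.physical.base.AbstractBase.MAX_ALLOCATION;

    // Builder defaults track the operator defaults instead of hard-coded literals.
    protected long initReservation = INIT_ALLOCATION;   // 1_000_000L
    protected long maxAllocation   = MAX_ALLOCATION;    // 10_000_000_000L
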
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index 3532956..0200dc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -18,14 +18,25 @@
package org.apache.drill.exec.util;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import com.google.common.io.Files;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
/**
* This class contains utility methods to speed up tests. Some of the production code currently calls this method
@@ -95,4 +106,44 @@ public class TestUtilities {
pluginRegistry.createOrUpdate(dfsPluginName, dfsPluginConfig, true);
}
+
+  /**
+   * Create JSONRecordReaders from input strings.
+   * @param jsonBatches : list of input strings, each element represents a batch. Each string could either
+   *                    be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}".
+   * @param fragContext : fragment context
+   * @param columnsToRead : list of schema paths to read from the JSON reader.
+   * @return an iterator of record readers, one per input batch.
+   */
+ public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+ ObjectMapper mapper = new ObjectMapper();
+ List<RecordReader> readers = new ArrayList<>();
+    for (String batchJson : jsonBatches) {
+ JsonNode records;
+ try {
+        records = mapper.readTree(batchJson);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ readers.add(new JSONRecordReader(fragContext, records, null, columnsToRead));
+ }
+ return readers.iterator();
+ }
+
+  /**
+   * Create JSONRecordReaders from files on a file system.
+   * @param fs : file system.
+   * @param inputPaths : list of .json file paths.
+   * @param fragContext : fragment context
+   * @param columnsToRead : list of schema paths to read from the JSON reader.
+   * @return an iterator of record readers, one per input file.
+   */
+ public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+ List<RecordReader> readers = new ArrayList<>();
+ for (String inputPath : inputPaths) {
+ readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
+ }
+ return readers.iterator();
+ }
+
}
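
These two helpers centralize the JSON-reader setup that PhysicalOpUnitTestBase previously built
inline, and the JsonScanBuilder introduced later in this commit is their main consumer. A minimal
sketch of the string-based variant, assuming it runs in a test where the mocked fragContext from
PhysicalOpUnitTestBase is in scope:

    // Each string becomes one JSON batch; "*" selects all columns.
    List<String> jsonBatches = Lists.newArrayList("{\"a\": 100}");
    Iterator<RecordReader> readers = TestUtilities.getJsonReadersFromBatchString(
        jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
    // Wrapping the readers in a ScanBatch drives them as a scan operator.
    RecordBatch scan = new ScanBatch(null, fragContext, readers);
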
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index f217632..64aeef8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -117,9 +118,9 @@ public class DrillTestWrapper {
private int expectedNumBatches;
public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
- String baselineOptionSettingQueries, String testOptionSettingQueries,
- QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
- List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
+ String baselineOptionSettingQueries, String testOptionSettingQueries,
+ QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
+ List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
this.testBuilder = testBuilder;
this.services = services;
this.query = query;
@@ -150,7 +151,7 @@ public class DrillTestWrapper {
}
private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
- Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
+ Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
for (String s : expectedRecords.keySet()) {
assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s));
assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
@@ -224,7 +225,7 @@ public class DrillTestWrapper {
}
private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
- final RecordBatchLoader loader)
+ final RecordBatchLoader loader)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
@@ -307,12 +308,28 @@ public class DrillTestWrapper {
}
/**
+   * Iterate over the batches and combine them into a map, where the key is the schema path and the
+   * value is the list of column values across all the batches.
   * @param batches : batches to iterate over.
   * @return a map from schema path to the list of values of that column across all batches.
   * @throws SchemaChangeException
   * @throws UnsupportedEncodingException
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+ throws SchemaChangeException, UnsupportedEncodingException {
+ return addToCombinedVectorResults(batches, null);
+ }
+
+  /**
+   * Add to result vectors and compare each batch's schema against the expected schema while iterating batches.
+   * @param batches : batches to iterate over.
+   * @param expectedSchema: the expected schema the batches should contain. Throws SchemaChangeException
+   *                        if a batch with a different schema is encountered.
+   * @return a map from schema path to the list of values of that column across all batches.
+   * @throws SchemaChangeException
+   * @throws UnsupportedEncodingException
+   */
+ public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, List<Object>> combinedVectors = new TreeMap<>();
@@ -320,6 +337,14 @@ public class DrillTestWrapper {
long totalRecords = 0;
BatchSchema schema = null;
for (VectorAccessible loader : batches) {
+ if (expectedSchema != null) {
+ if (! expectedSchema.equals(loader.getSchema())) {
+ throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
+ "Actual schema: %s. Expected schema : %s",
+ loader.getSchema(), expectedSchema));
+ }
+ }
+
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
if (schema == null) {
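
The expectedSchema overload above is the verification backbone of MiniPlanTestBuilder.go() later
in this commit: drain the batches, failing fast on any schema mismatch, then compare column by
column against the baseline. The flow, as a sketch with root, expectedSchema and baselineRecords
assumed in scope:

    // Drain the operator tree; throws SchemaChangeException on any schema mismatch.
    Map<String, List<Object>> actual =
        DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(root), expectedSchema);
    // Reshape the expected rows into the same column-major form, then compare.
    Map<String, List<Object>> expected =
        DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
    DrillTestWrapper.compareMergedVectors(expected, actual);
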
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
index ddb67bb..be6fe79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestRepeatedReaders.java
@@ -32,7 +32,7 @@ public class TestRepeatedReaders extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index a382a42..96dae6a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -97,7 +97,7 @@ public class TestWriteToDisk extends ExecTest {
batch, context.getAllocator());
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
final VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(
context.getAllocator());
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index 8cd1a85..0c98eee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -112,7 +112,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
path = new Path(getDfsTestTmpSchemaLocation());
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 65e9c38..e5c6ce4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -120,7 +120,7 @@ public class TestParquetWriter extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index 2848b68..d57605b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -33,7 +33,7 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index f4d505d..5f306c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -49,7 +49,7 @@ public class TestWriter extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index a0ea6fe..e39a644 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -195,6 +195,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
"[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
opTestBuilder()
.physicalOperator(sortConf)
+ .maxAllocation(15_000_000L)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
.baselineValues(5l, 1l)
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
new file mode 100644
index 0000000..302d0e5
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import mockit.NonStrictExpectations;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
+import static org.apache.drill.exec.physical.base.AbstractBase.MAX_ALLOCATION;
+import static org.apache.drill.exec.physical.unit.TestMiniPlan.fs;
+
+/**
+ * MiniPlanUnitTestBase extends PhysicalOpUnitTestBase to construct a MiniPlan (aka plan fragment)
+ * in the form of a physical operator tree, and to verify both the expected schema and the output row results.
+ * Steps to construct a unit test:
+ * 1. Call PopBuilder / ScanPopBuilder to construct the MiniPlan.
+ * 2. Create a MiniPlanTestBuilder, and specify the expected schema and baseline values, or indicate
+ * that no batch is expected.
+ */
+
+public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
+
+ private final ExecutorService scanExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("scan-"));
+
+ public static class MiniPlanTestBuilder {
+ protected List<Map<String, Object>> baselineRecords;
+ protected RecordBatch root;
+ protected boolean expectedZeroBatch;
+ protected BatchSchema expectedSchema;
+
+    /**
+     * Specify the root operator for a MiniPlan.
+     * @param root : the root RecordBatch of the MiniPlan.
+     * @return this builder.
+     */
+ public MiniPlanTestBuilder root(RecordBatch root) {
+ this.root = root;
+ return this;
+ }
+
+    /**
+     * Specify the expected batch schema.
+     * @param batchSchema : the expected batch schema.
+     * @return this builder.
+     */
+ public MiniPlanTestBuilder expectedSchema(BatchSchema batchSchema) {
+ this.expectedSchema = batchSchema;
+ return this;
+ }
+
+    /**
+     * Specify one row of expected values. The number of values has to be the same as the number of fields in the expected batch schema.
+     * @param baselineValues : one row of expected values, in field order.
+     * @return this builder.
+     */
+ public MiniPlanTestBuilder baselineValues(Object ... baselineValues) {
+ if (baselineRecords == null) {
+ baselineRecords = new ArrayList<>();
+ }
+
+ Map<String, Object> ret = new HashMap<>();
+ int i = 0;
+      Preconditions.checkArgument(expectedSchema != null, "Expected schema must be set before specifying baseline values.");
+ Preconditions.checkArgument(baselineValues.length == expectedSchema.getFieldCount(),
+ "Must supply the same number of baseline values as columns in expected schema.");
+
+ for (MaterializedField field : expectedSchema) {
+ ret.put(SchemaPath.getSimplePath(field.getPath()).toExpr(), baselineValues[i]);
+ i++;
+ }
+
+ this.baselineRecords.add(ret);
+ return this;
+ }
+
+    /**
+     * Specify the special case where the operator tree is expected to return zero batches.
+     * @param expectedZeroBatch : true if zero batches are expected.
+     * @return this builder.
+     */
+ public MiniPlanTestBuilder expectZeroBatch(boolean expectedZeroBatch) {
+ this.expectedZeroBatch = expectedZeroBatch;
+ return this;
+ }
+
+ public void go() throws Exception {
+ final BatchIterator batchIterator = new BatchIterator(root);
+
+      // Verify the special case of zero batches.
+      if (expectedZeroBatch) {
+        if (batchIterator.iterator().hasNext()) {
+          throw new AssertionError("Expected zero batches from scan, but the scan returned at least one batch!");
+ } else {
+ return; // successful
+ }
+ }
+
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectedSchema);
+ Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+ DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+ }
+ }
+
+ /**
+   * Similar to {@link OperatorTestBuilder}, builds a physical operator (RecordBatch) and specifies its input record batches.
+   * An input record batch could be a non-scan operator, added by calling {@link PopBuilder#addInputAsChild},
+   * or a scan operator, added by calling {@link PopBuilder#addJsonScanAsChild()}.
+ *
+   * A mini plan rooted at a join operator, like the following, could be constructed in either of the following two ways:
+ *
+ * <pre><code>
+ * Join
+ * / \
+ * JSON_T1 Filter
+ * \
+ * JSON_T2
+ * </code></pre>
+ *
+ * <pre><code>
+ * new PopBuilder()
+ * .physicalOperator(joinPopConfig)
+   *     .addJsonScanAsChild()
+   *    .fileSystem(..)
+   *    .columnsToRead(...)
+   *    .inputPaths(...)
+ * .buildAddAsInput()
+ * .addInputAsChild()
+ * .physicalOperator(filterPopConfig)
+   *    .addJsonScanAsChild()
+   *    .fileSystem(...)
+   *    .columnsToRead(...)
+   *    .inputPaths(...)
+ * .buildAddAsInput()
+ * .buildAddAsInput()
+ * .build();
+ * </code></pre>
+ *
+ * <pre><code>
+   *   RecordBatch scan1 = new JsonScanBuilder()
+   *                  .fileSystem(...)
+   *                  .columnsToRead(..)
+   *                  .inputPaths(...)
+   *                  .build();
+ * RecordBatch scan2 = ... ;
+ *
+   *   RecordBatch filter = new PopBuilder()
+   *      .physicalOperator(filterPopConfig)
+   *      .addInput(scan2)
+   *      .build();
+ * RecordBatch join = new PopBuilder()
+ * .physicalOperator(joinPopConfig)
+ * .addInput(scan1)
+ * .addInput(filter)
+ * .build();
+ *
+   * </code></pre>
+ */
+
+ public class PopBuilder {
+ private PhysicalOperator popConfig;
+ protected long initReservation = INIT_ALLOCATION;
+ protected long maxAllocation = MAX_ALLOCATION;
+
+    private final List<RecordBatch> inputs = Lists.newArrayList();
+    final PopBuilder parent;
+
+ public PopBuilder() {
+ this.parent = null;
+ }
+
+ public PopBuilder(PopBuilder parent) {
+ this.parent = parent;
+ }
+
+ public PopBuilder physicalOperator(PhysicalOperator popConfig) {
+ this.popConfig = popConfig;
+ return this;
+ }
+
+    /**
+     * Set the initial memory reservation used by this operator's allocator. Default is {@link AbstractBase#INIT_ALLOCATION}.
+     * @param initReservation : initial memory reservation in bytes.
+     * @return this builder.
+     */
+ public PopBuilder initReservation(long initReservation) {
+ this.initReservation = initReservation;
+ return this;
+ }
+
+    /**
+     * Set the maximum memory allocation used by this operator's allocator. Default is {@link AbstractBase#MAX_ALLOCATION}.
+     * @param maxAllocation : maximum memory allocation in bytes.
+     * @return this builder.
+     */
+ public PopBuilder maxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ return this;
+ }
+
+    /**
+     * Return a JsonScanBuilder to build a scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return JsonScanBuilder
+     */
+ public JsonScanBuilder addJsonScanAsChild() {
+ return new JsonScanBuilder(this);
+ }
+
+    /**
+     * Return a ParquetScanBuilder to build a scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return ParquetScanBuilder
+     */
+ public ParquetScanBuilder addParquetScanAsChild() {
+ return new ParquetScanBuilder(this);
+ }
+
+    /**
+     * Return a nested PopBuilder to build a non-scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return a nested PopBuilder for a non-scan RecordBatch.
+     */
+    public PopBuilder addInputAsChild() {
+      return new PopBuilder(this);
+    }
+
+ public PopBuilder addInput(RecordBatch batch) {
+ inputs.add(batch);
+ return this;
+ }
+
+ public PopBuilder buildAddAsInput() throws Exception {
+ mockOpContext(initReservation, maxAllocation);
+ BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+      RecordBatch batch = opCreator.getBatch(fragContext, popConfig, inputs);
+ return parent.addInput(batch);
+ }
+
+ public RecordBatch build() throws Exception {
+ mockOpContext(initReservation, maxAllocation);
+ BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+ return opCreator.getBatch(fragContext, popConfig, inputs);
+ }
+ }
+
+  public abstract class ScanPopBuilder<T extends ScanPopBuilder<T>> extends PopBuilder {
+ List<SchemaPath> columnsToRead = Collections.singletonList(SchemaPath.getSimplePath("*"));
+ DrillFileSystem fs = null;
+
+    public ScanPopBuilder() {
+      super(null); // Scan is the root operator.
+    }
+
+    public ScanPopBuilder(PopBuilder parent) {
+ super(parent);
+ }
+
+ public T fileSystem(DrillFileSystem fs) {
+ this.fs = fs;
+ return (T) this;
+ }
+
+ public T columnsToRead(SchemaPath ... columnsToRead) {
+ this.columnsToRead = Lists.newArrayList(columnsToRead);
+ return (T) this;
+ }
+
+ public T columnsToRead(String ... columnsToRead) {
+ this.columnsToRead = Lists.newArrayList();
+
+ for (String column : columnsToRead) {
+
+ this.columnsToRead.add(SchemaPath.getSimplePath(column));
+ }
+ return (T) this;
+ }
+
+ }
+
+ /**
+ * Builder for Json Scan RecordBatch.
+ */
+  public class JsonScanBuilder extends ScanPopBuilder<JsonScanBuilder> {
+ List<String> jsonBatches = null;
+    List<String> inputPaths = Collections.emptyList();
+
+ public JsonScanBuilder(PopBuilder parent) {
+ super(parent);
+ }
+
+ public JsonScanBuilder() {
+ super();
+ }
+
+ public JsonScanBuilder jsonBatches(List<String> jsonBatches) {
+ this.jsonBatches = jsonBatches;
+ return this;
+ }
+
+ public JsonScanBuilder inputPaths(List<String> inputPaths) {
+ this.inputPaths = inputPaths;
+ return this;
+ }
+
+ public PopBuilder buildAddAsInput() throws Exception {
+ mockOpContext(this.initReservation, this.maxAllocation);
+ RecordBatch scanBatch = getScanBatch();
+ return parent.addInput(scanBatch);
+ }
+
+ public RecordBatch build() throws Exception {
+ mockOpContext(this.initReservation, this.maxAllocation);
+ return getScanBatch();
+ }
+
+ private RecordBatch getScanBatch() throws Exception {
+ Iterator<RecordReader> readers = null;
+
+ if (jsonBatches != null) {
+ readers = TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, columnsToRead);
+ } else {
+ readers = TestUtilities.getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
+ }
+
+ RecordBatch scanBatch = new ScanBatch(null, fragContext, readers);
+ return scanBatch;
+ }
+ }
+
+ /**
+ * Builder for parquet Scan RecordBatch.
+ */
+  public class ParquetScanBuilder extends ScanPopBuilder<ParquetScanBuilder> {
+    List<String> inputPaths = Collections.emptyList();
+
+ public ParquetScanBuilder() {
+ super();
+ }
+
+ public ParquetScanBuilder(PopBuilder parent) {
+ super(parent);
+ }
+
+ public ParquetScanBuilder inputPaths(List<String> inputPaths) {
+ this.inputPaths = inputPaths;
+ return this;
+ }
+
+ public PopBuilder buildAddAsInput() throws Exception {
+ mockOpContext(this.initReservation, this.maxAllocation);
+ RecordBatch scanBatch = getScanBatch();
+ return parent.addInput(scanBatch);
+ }
+
+ public RecordBatch build() throws Exception {
+ mockOpContext(this.initReservation, this.maxAllocation);
+ return getScanBatch();
+ }
+
+ private RecordBatch getScanBatch() throws Exception {
+ List<RecordReader> readers = Lists.newArrayList();
+
+ for (String path : inputPaths) {
+ ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));
+
+ for (int i = 0; i < footer.getBlocks().size(); i++) {
+ readers.add(new ParquetRecordReader(fragContext,
+ path,
+ i,
+ fs,
+ CodecFactory.createDirectCodecFactory(fs.getConf(),
+ new ParquetDirectByteBufferAllocator(opContext.getAllocator()), 0),
+ footer,
+ columnsToRead,
+ ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION));
+ }
+ }
+
+ RecordBatch scanBatch = new ScanBatch(null, fragContext, readers.iterator());
+ return scanBatch;
+ }
+ } // end of ParquetScanBuilder
+
+ @Override
+ protected void mockOpContext(long initReservation, long maxAllocation) throws Exception {
+ super.mockOpContext(initReservation, maxAllocation);
+
+ // mock ScanExecutor used by parquet reader.
+ new NonStrictExpectations() {
+ {
+        opContext.getScanExecutor(); result = scanExecutor;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index ad7367f..7d09ca5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.unit;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import mockit.Delegate;
@@ -56,6 +54,7 @@ import org.apache.drill.exec.ops.BufferManagerImpl;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
@@ -66,8 +65,9 @@ import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -78,10 +78,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
+
/**
* Look! Doesn't extend BaseTestQuery!!
*/
public class PhysicalOpUnitTestBase extends ExecTest {
@Injectable FragmentContext fragContext;
@Injectable OperatorContext opContext;
@@ -97,6 +101,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
private final TemplateClassDefinition<Projector> templateClassDefinition = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+ @Before
+ public void setup() throws Exception {
+ mockFragmentContext();
+ }
+
@Override
protected LogicalExpression parseExpr(String expr) {
ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
@@ -128,37 +137,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
return ret;
}
- @SuppressWarnings("resource")
- void runTest(OperatorTestBuilder testBuilder) {
- BatchCreator<PhysicalOperator> opCreator;
- RecordBatch testOperator;
- try {
- mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation);
- opCreator = (BatchCreator<PhysicalOperator>)
- opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass());
- List<RecordBatch> incomingStreams = Lists.newArrayList();
- for (List<String> batchesJson : testBuilder.inputStreamsJSON) {
- incomingStreams.add(new ScanBatch(null, fragContext,
- getRecordReadersForJsonBatches(batchesJson, fragContext)));
- }
- testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams);
-
- Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
- Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords);
- DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
-
- } catch (ExecutionSetupException e) {
- throw new RuntimeException(e);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- } catch (SchemaChangeException e) {
- throw new RuntimeException(e);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private static class BatchIterator implements Iterable<VectorAccessible> {
+ protected static class BatchIterator implements Iterable<VectorAccessible> {
private RecordBatch operator;
public BatchIterator(RecordBatch operator) {
@@ -213,11 +192,40 @@ public class PhysicalOpUnitTestBase extends ExecTest {
private String[] baselineColumns;
private List<Map<String, Object>> baselineRecords;
private List<List<String>> inputStreamsJSON;
- private long initReservation = 10000000;
- private long maxAllocation = 15000000;
+ private long initReservation = AbstractBase.INIT_ALLOCATION;
+ private long maxAllocation = AbstractBase.MAX_ALLOCATION;
public void go() {
- runTest(this);
+ BatchCreator<PhysicalOperator> opCreator;
+ RecordBatch testOperator;
+ try {
+ mockOpContext(initReservation, maxAllocation);
+
+ opCreator = (BatchCreator<PhysicalOperator>)
+ opCreatorReg.getOperatorCreator(popConfig.getClass());
+ List<RecordBatch> incomingStreams = Lists.newArrayList();
+ if (inputStreamsJSON != null) {
+ for (List<String> batchesJson : inputStreamsJSON) {
+ incomingStreams.add(new ScanBatch(null, fragContext,
+ getRecordReadersForJsonBatches(batchesJson, fragContext)));
+ }
+ }
+
+ testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
+
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
+ Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+ DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+ } catch (ExecutionSetupException e) {
+ throw new RuntimeException(e);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ } catch (SchemaChangeException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
@@ -276,9 +284,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
}
}
- private void mockFragmentContext(long initReservation, long maxAllocation) throws Exception{
+  protected void mockFragmentContext() throws Exception {
final CodeCompiler compiler = new CodeCompiler(drillConf, optionManager);
- final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
new NonStrictExpectations() {
{
// optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false;
@@ -320,27 +328,28 @@ public class PhysicalOpUnitTestBase extends ExecTest {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ }
+ };
+ }
+
+  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception {
+ final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
+ new NonStrictExpectations() {
+ {
opContext.getStats();result = opStats;
opContext.getAllocator(); result = allocator;
- fragContext.newOperatorContext(withAny(popConf));
- result = opContext;
+      fragContext.newOperatorContext(withAny(popConf)); result = opContext;
}
};
}
- @SuppressWarnings("resource")
+ protected OperatorCreatorRegistry getOpCreatorReg() {
+ return opCreatorReg;
+ }
+
private Iterator<RecordReader> getRecordReadersForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
- ObjectMapper mapper = new ObjectMapper();
- List<RecordReader> readers = new ArrayList<>();
- for (String batchJason : jsonBatches) {
- JsonNode records;
- try {
- records = mapper.readTree(batchJason);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*"))));
- }
- return readers.iterator();
+ return TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
new file mode 100644
index 0000000..d0a64f4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class contains examples to show how to use MiniPlanTestBuilder to test a
+ * specific plan fragment (MiniPlan). Each test case requires 1) a RecordBatch
+ * built from PopBuilder/ScanBuilder, and 2) an expected schema and baseline values,
+ * or 3) an indication that no batch is expected.
+ */
+public class TestMiniPlan extends MiniPlanUnitTestBase {
+
+ protected static DrillFileSystem fs;
+
+ @BeforeClass
+ public static void initFS() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+ fs = new DrillFileSystem(conf);
+ }
+
+ @Test
+ @Ignore("DRILL-5464: A bug in JsonRecordReader handling empty file")
+ public void testEmptyJsonInput() throws Exception {
+ String emptyFile = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+
+ RecordBatch scanBatch = new JsonScanBuilder()
+ .fileSystem(fs)
+ .inputPaths(Lists.newArrayList(emptyFile))
+ .build();
+
+ new MiniPlanTestBuilder()
+ .root(scanBatch)
+ .expectZeroBatch(true)
+ .go();
+ }
+
+ @Test
+ public void testSimpleParquetScan() throws Exception {
+ String file = FileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+
+ RecordBatch scanBatch = new ParquetScanBuilder()
+ .fileSystem(fs)
+ .columnsToRead("R_REGIONKEY")
+ .inputPaths(Lists.newArrayList(file))
+ .build();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("R_REGIONKEY", TypeProtos.MinorType.BIGINT)
+ .build();
+
+ new MiniPlanTestBuilder()
+ .root(scanBatch)
+ .expectedSchema(expectedSchema)
+ .baselineValues(0L)
+ .baselineValues(1L)
+ .go();
+ }
+
+ @Test
+ public void testSimpleJson() throws Exception {
+ List<String> jsonBatches = Lists.newArrayList(
+ "{\"a\":100}"
+ );
+
+ RecordBatch scanBatch = new JsonScanBuilder()
+ .jsonBatches(jsonBatches)
+ .build();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .addNullable("a", TypeProtos.MinorType.BIGINT)
+ .build();
+
+ new MiniPlanTestBuilder()
+ .root(scanBatch)
+ .expectedSchema(expectedSchema)
+ .baselineValues(100L)
+ .go();
+ }
+
+ @Test
+ public void testUnionFilter() throws Exception {
+ List<String> leftJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+ "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+
+ List<String> rightJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 10 }]",
+ "[{\"a\": 50, \"b\" : 100}]");
+
+ RecordBatch batch = new PopBuilder()
+ .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
+ .addInputAsChild()
+ .physicalOperator(new Filter(null, parseExpr("a=5"), 1.0f))
+ .addJsonScanAsChild()
+ .jsonBatches(leftJsonBatches)
+ .columnsToRead("a", "b")
+ .buildAddAsInput()
+ .buildAddAsInput()
+ .addInputAsChild()
+ .physicalOperator(new Filter(null, parseExpr("a=50"), 1.0f))
+ .addJsonScanAsChild()
+ .jsonBatches(rightJsonBatches)
+ .columnsToRead("a", "b")
+ .buildAddAsInput()
+ .buildAddAsInput()
+ .build();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .addNullable("a", TypeProtos.MinorType.BIGINT)
+ .addNullable("b", TypeProtos.MinorType.BIGINT)
+ .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+ .build();
+
+ new MiniPlanTestBuilder()
+ .root(batch)
+ .expectedSchema(expectedSchema)
+        .baselineValues(5L, 1L)
+        .baselineValues(5L, 5L)
+        .baselineValues(50L, 100L)
+ .go();
+ }
+
+ @Test
+ @Ignore ("DRILL-5327: A bug in UnionAll handling empty inputs from both sides")
+ public void testUnionFilterAll() throws Exception {
+ List<String> leftJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]");
+
+ List<String> rightJsonBatches = Lists.newArrayList(
+ "[{\"a\": 50, \"b\" : 10 }]");
+
+ RecordBatch leftScan = new JsonScanBuilder()
+ .jsonBatches(leftJsonBatches)
+ .columnsToRead("a", "b")
+ .build();
+
+ RecordBatch leftFilter = new PopBuilder()
+ .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+ .addInput(leftScan)
+ .build();
+
+ RecordBatch rightScan = new JsonScanBuilder()
+ .jsonBatches(rightJsonBatches)
+ .columnsToRead("a", "b")
+ .build();
+
+ RecordBatch rightFilter = new PopBuilder()
+ .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+ .addInput(rightScan)
+ .build();
+
+ RecordBatch batch = new PopBuilder()
+ .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
+ .addInput(leftFilter)
+ .addInput(rightFilter)
+ .build();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .addNullable("a", TypeProtos.MinorType.BIGINT)
+ .addNullable("b", TypeProtos.MinorType.BIGINT)
+ .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+ .build();
+
+ new MiniPlanTestBuilder()
+ .root(batch)
+ .expectedSchema(expectedSchema)
+ .go();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index 550f56f..7d66795 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -66,7 +66,7 @@ public class TestDrillFileSystem {
DrillFileSystem dfs = null;
InputStream is = null;
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/);
OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/);
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 782973d..277e6f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -52,7 +52,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
BitControl.PlanFragment.getDefaultInstance(), null, bits[0].getContext().getFunctionImplementationRegistry());
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dc237e3/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
index 48657c7..b946ab9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
@@ -84,9 +84,15 @@ public class SchemaBuilder {
parent.finishMap(col);
return parent;
}
+
+ @Override
+ public SchemaBuilder withSVMode(SelectionVectorMode svMode) {
+ throw new IllegalStateException("Cannot set SVMode for a nested schema");
+ }
}
protected List<MaterializedField> columns = new ArrayList<>( );
+ private SelectionVectorMode svMode = SelectionVectorMode.NONE;
public SchemaBuilder() { }
@@ -128,8 +134,13 @@ public class SchemaBuilder {
return new MapBuilder(this, pathName);
}
+ public SchemaBuilder withSVMode(SelectionVectorMode svMode) {
+ this.svMode = svMode;
+ return this;
+ }
+
public BatchSchema build() {
- return new BatchSchema(SelectionVectorMode.NONE, columns);
+ return new BatchSchema(svMode, columns);
}
void finishMap(MaterializedField map) {
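
The new withSVMode(...) hook lets a test assert the selection-vector mode along with the columns;
the builder still defaults to SelectionVectorMode.NONE, so existing callers are unaffected. A
hedged sketch for an operator that emits a two-byte selection vector (TWO_BYTE chosen purely for
illustration):

    BatchSchema expectedSchema = new SchemaBuilder()
        .addNullable("a", TypeProtos.MinorType.BIGINT)
        .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE)  // default is NONE
        .build();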
|