DRILL-1143: Enable project pushdown into HiveRecordReader Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/111c1a8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/111c1a8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/111c1a8a Branch: refs/heads/master Commit: 111c1a8adf537e8f368de96c4be4c73327451202 Parents: 65c8358 Author: vkorukanti Authored: Tue Jun 17 17:56:45 2014 -0700 Committer: Jacques Nadeau Committed: Sun Jul 20 16:49:19 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hive/HiveRecordReader.java | 2 +- .../apache/drill/exec/store/hive/HiveScan.java | 11 ++- .../exec/store/hive/HiveStoragePlugin.java | 2 +- .../drill/exec/TestHiveProjectPushDown.java | 91 ++++++++++++++++++++ .../exec/store/hive/HiveTestDataGenerator.java | 21 ++++- .../java/org/apache/drill/BaseTestQuery.java | 4 + 6 files changed, 125 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index ddb08c6..8a84ac2 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -169,7 +169,7 @@ public class HiveRecordReader implements RecordReader { List columnIds = Lists.newArrayList(); selectedColumnNames = Lists.newArrayList(); for (SchemaPath field : projectedColumns) { - String columnName = field.getRootSegment().getPath(); //TODO? + String columnName = field.getRootSegment().getPath(); if (!tableColumns.contains(columnName)) { if (partitionNames.contains(columnName)) { selectedPartitionNames.add(columnName); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 8c02eb3..61b6c7f 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -89,9 +89,10 @@ public class HiveScan extends AbstractGroupScan { Map partitionMap = new HashMap(); @JsonCreator - public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storage-plugin") String storagePluginName, - @JsonProperty("columns") List columns, - @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { + public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, + @JsonProperty("storage-plugin") String storagePluginName, + @JsonProperty("columns") List columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { this.hiveReadEntry = hiveReadEntry; this.table = hiveReadEntry.getTable(); this.storagePluginName = storagePluginName; @@ -300,4 +301,8 @@ public class HiveScan extends AbstractGroupScan { return newScan; } + @Override + public boolean canPushdownProjects(List columns) { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index c5a6e2c..da8a41a 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -72,7 +72,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { throw new UnsupportedOperationException("Querying Hive views from Drill is not supported in current version."); } - return new HiveScan(hiveReadEntry, this, null); + return new HiveScan(hiveReadEntry, this, columns); } catch (ExecutionSetupException e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java new file mode 100644 index 0000000..6ced636 --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.store.hive.HiveTestDataGenerator; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestHiveProjectPushDown extends PlanTestBase { + + @BeforeClass + public static void generateHive() throws Exception{ + new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage()); + } + + private void testHelper(String query, String expectedColNamesInPlan, int expectedRecordCount)throws Exception { + testPhysicalPlan(query, expectedColNamesInPlan); + + int actualRecordCount = testSql(query); + assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s", + expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount); + } + + @Test + public void testSingleColumnProject() throws Exception { + String query = "SELECT `value` as v FROM hive.`default`.kv"; + String expectedColNames = " \"columns\" : [ \"`value`\" ]"; + + testHelper(query, expectedColNames, 5); + } + + @Test + public void testMultipleColumnsProject() throws Exception { + String query = "SELECT boolean_field as b_f, tinyint_field as ti_f FROM hive.`default`.readtest"; + String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\" ]"; + + testHelper(query, expectedColNames, 1); + } + + @Test + public void testPartitionColumnProject() throws Exception { + String query = "SELECT double_part as dbl_p FROM hive.`default`.readtest"; + String expectedColNames = " \"columns\" : [ \"`double_part`\" ]"; + + testHelper(query, expectedColNames, 1); + } + + @Test + public void testMultiplePartitionColumnsProject() throws Exception { + String query = "SELECT double_part as dbl_p, decimal_part as dec_p FROM hive.`default`.readtest"; + String expectedColNames = " \"columns\" : [ \"`double_part`\", \"`decimal_part`\" ]"; + + testHelper(query, expectedColNames, 1); + } + + @Test + public void testPartitionAndRegularColumnProjectColumn() throws Exception { + String query = "SELECT boolean_field as b_f, tinyint_field as ti_f, " + + "double_part as dbl_p, decimal_part as dec_p FROM hive.`default`.readtest"; + String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\", " + + "\"`double_part`\", \"`decimal_part`\" ]"; + + testHelper(query, expectedColNames, 1); + } + + @Test + public void testStarProject() throws Exception { + String query = "SELECT * FROM hive.`default`.kv"; + String expectedColNames = " \"columns\" : [ \"`key`\", \"`value`\" ]"; + + testHelper(query, expectedColNames, 5); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index 3fe36f5..adb007d 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -23,8 +23,11 @@ import java.io.IOException; import java.io.PrintWriter; import java.sql.Date; import java.sql.Timestamp; +import java.util.Map; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; +import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -52,7 +55,23 @@ public class HiveTestDataGenerator { FileUtils.forceDelete(f); } } - + + public void createAndAddHiveTestPlugin(StoragePluginRegistry pluginRegistry) throws Exception { + // generate test tables and data + generateTestData(); + + // add Hive plugin to given registry + Map config = Maps.newHashMap(); + config.put("hive.metastore.uris", ""); + config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", DB_DIR)); + config.put("hive.metastore.warehouse.dir", WH_DIR); + config.put("fs.default.name", "file:///"); + HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config); + pluginConfig.setEnabled(true); + + pluginRegistry.createOrUpdate("hive", pluginConfig, true); + } + public void generateTestData() throws Exception { // remove data from previous runs. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/111c1a8a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 5acb596..b1c1ec8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -175,6 +175,10 @@ public class BaseTestQuery extends ExecTest{ return testRunAndPrint(QueryType.PHYSICAL, query); } + protected int testSql(String query) throws Exception{ + return testRunAndPrint(QueryType.SQL, query); + } + protected void testPhysicalFromFile(String file) throws Exception{ testPhysical(getFile(file)); }