Repository: drill Updated Branches: refs/heads/master 9514cbe75 -> 9f4fff800 DRILL-4589: Reduce planning time for file system partition pruning by reducing filter evaluation overhead Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dbf4b15e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dbf4b15e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dbf4b15e Branch: refs/heads/master Commit: dbf4b15eda14f55462ff0872266bf61c13bdb1bc Parents: 9514cbe Author: Jinfeng Ni Authored: Thu Feb 25 10:13:43 2016 -0800 Committer: Jinfeng Ni Committed: Mon Apr 11 16:19:18 2016 -0700 ---------------------------------------------------------------------- .../planner/sql/HivePartitionDescriptor.java | 8 +-- .../exec/planner/sql/HivePartitionLocation.java | 3 +- .../exec/planner/DFSDirPartitionLocation.java | 70 ++++++++++++++++++ .../exec/planner/DFSFilePartitionLocation.java | 75 ++++++++++++++++++++ .../exec/planner/DFSPartitionLocation.java | 71 ------------------ .../planner/FileSystemPartitionDescriptor.java | 72 +++++++++++++------ .../planner/ParquetPartitionDescriptor.java | 7 +- .../exec/planner/ParquetPartitionLocation.java | 2 +- .../drill/exec/planner/PartitionDescriptor.java | 2 +- .../drill/exec/planner/PartitionLocation.java | 36 ++++++++-- .../exec/planner/SimplePartitionLocation.java | 41 +++++++++++ .../logical/partition/PruneScanRule.java | 32 +++++---- 12 files changed, 298 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index e531f38..c8e45ca 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -151,7 +151,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public TableScan createTableScan(List newPartitions) throws Exception { + public TableScan createTableScan(List newPartitions) throws Exception { GroupScan newGroupScan = createNewGroupScan(newPartitions); return new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), @@ -162,7 +162,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { true /*filter pushdown*/); } - private GroupScan createNewGroupScan(List newFiles) throws ExecutionSetupException { + private GroupScan createNewGroupScan(List newPartitionLocations) throws ExecutionSetupException { HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; List oldPartitions = origReadEntry.partitions; @@ -170,8 +170,8 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { for (HiveTable.HivePartition part: oldPartitions) { String partitionLocation = part.getPartition().getSd().getLocation(); - for (String newPartitionLocation: newFiles) { - if (partitionLocation.equals(newPartitionLocation)) { + for (PartitionLocation newPartitionLocation: newPartitionLocations) { + if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) { newPartitions.add(part); } } http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java index 49e3361..5a2ae30 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java @@ -19,10 +19,11 @@ package org.apache.drill.exec.planner.sql; import com.google.common.collect.ImmutableList; import org.apache.drill.exec.planner.PartitionLocation; +import org.apache.drill.exec.planner.SimplePartitionLocation; import java.util.List; -public class HivePartitionLocation implements PartitionLocation { +public class HivePartitionLocation extends SimplePartitionLocation { private final String partitionLocation; private final List partitionValues; http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java new file mode 100644 index 0000000..da3aa68 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Class defines a single partition corresponding to a directory in a DFS table. + */ +package org.apache.drill.exec.planner; + + +import com.google.common.collect.Lists; + +import java.util.Collection; +import java.util.List; + +/** + * Composite partition location corresponds to a directory in the file system. + * */ +public class DFSDirPartitionLocation implements PartitionLocation { + // Similar to directory / file structures, subPartitions could be either a DFSDirPartitionLocation or DFSFilePartitionLocation + private final Collection subPartitions; + private final String[] dirs; + + public DFSDirPartitionLocation(String[] dirs, Collection subPartitions) { + this.subPartitions = subPartitions; + this.dirs = dirs; + } + + @Override + public String getPartitionValue(int index) { + assert index < dirs.length; + return dirs[index]; + } + + @Override + public String getEntirePartitionLocation() { + throw new UnsupportedOperationException("Should not call getEntirePartitionLocation for composite partition location!"); + } + + @Override + public List getPartitionLocationRecursive() { + List results = Lists.newArrayList(); + + for (final PartitionLocation partitionLocation : subPartitions) { + results.addAll(partitionLocation.getPartitionLocationRecursive()); + } + + return results; + } + + @Override + public boolean isCompositePartition() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java new file mode 100644 index 0000000..6e42f3b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner; + +import org.apache.hadoop.fs.Path; + +/** + * Class defines a single partition in a DFS table. 
+ */ +public class DFSFilePartitionLocation extends SimplePartitionLocation { + private final String[] dirs; + private final String file; + + public DFSFilePartitionLocation(int max, String selectionRoot, String file) { + this.file = file; + this.dirs = new String[max]; + + // strip the scheme and authority if they exist + selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString(); + + int start = file.indexOf(selectionRoot) + selectionRoot.length(); + String postPath = file.substring(start); + if (postPath.length() == 0) { + return; + } + if(postPath.charAt(0) == '/'){ + postPath = postPath.substring(1); + } + String[] mostDirs = postPath.split("/"); + int maxLoop = Math.min(max, mostDirs.length - 1); + for(int i =0; i < maxLoop; i++){ + this.dirs[i] = mostDirs[i]; + } + } + + /** + * Returns the value for a give partition key + * @param index - Index of the partition key whose value is to be returned + * @return + */ + @Override + public String getPartitionValue(int index) { + assert index < dirs.length; + return dirs[index]; + } + + /** + * Return the full location of this partition + * @return + */ + @Override + public String getEntirePartitionLocation() { + return file; + } + + public String[] getDirs() { + return dirs; + } +} + http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java deleted file mode 100644 index e058aa2..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.planner; - -import org.apache.hadoop.fs.Path; - -/** - * Class defines a single partition in a DFS table. - */ -public class DFSPartitionLocation implements PartitionLocation { - private final String[] dirs; - private final String file; - - public DFSPartitionLocation(int max, String selectionRoot, String file) { - this.file = file; - this.dirs = new String[max]; - - // strip the scheme and authority if they exist - selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString(); - - int start = file.indexOf(selectionRoot) + selectionRoot.length(); - String postPath = file.substring(start); - if (postPath.length() == 0) { - return; - } - if(postPath.charAt(0) == '/'){ - postPath = postPath.substring(1); - } - String[] mostDirs = postPath.split("/"); - int maxLoop = Math.min(max, mostDirs.length - 1); - for(int i =0; i < maxLoop; i++){ - this.dirs[i] = mostDirs[i]; - } - } - - /** - * Returns the value for a give partition key - * @param index - Index of the partition key whose value is to be returned - * @return - */ - @Override - public String getPartitionValue(int index) { - assert index < dirs.length; - return dirs[index]; - } - - /** - * Return the full location of this partition - * @return - */ - @Override - 
public String getEntirePartitionLocation() { - return file; - } -} - http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index f0fcee7..cfc8542 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.planner; +import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -44,7 +47,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatSelection; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; @@ -99,17 +101,6 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { return MAX_NESTED_SUBDIRS; } -// @Override -// public GroupScan createNewGroupScan(List newFiles) throws IOException { -// if (scanRel instanceof DrillScanRel) { -// final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation()); -// final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); -// return newScan; -// } else { -// throw new UnsupportedOperationException("Does not allow to 
get groupScan for EnumerableTableScan"); -// } -// } - public DrillTable getTable() { return table; } @@ -155,6 +146,41 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { @Override protected void createPartitionSublists() { + final Collection fileLocations = getFileLocations(); + List locations = new LinkedList<>(); + + final String selectionRoot = getBaseTableLocation(); + + // map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys. + // For example, + // 1990/Q1/1.parquet, 2.parquet + // would have <1990, Q1> as key, and value as list of partition location for 1.parquet and 2.parquet. + HashMap, List> dirToFileMap = new HashMap<>(); + + // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation) + // it contains. + for (String file: fileLocations) { + DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file); + + final String[] dirs = dfsFilePartitionLocation.getDirs(); + final List dirList = Arrays.asList(dirs); + + if (!dirToFileMap.containsKey(dirList)) { + dirToFileMap.put(dirList, new ArrayList()); + } + dirToFileMap.get(dirList).add(dfsFilePartitionLocation); + } + + // build a list of DFSDirPartitionLocation. 
+ for (final List dirs : dirToFileMap.keySet()) { + locations.add( new DFSDirPartitionLocation((String [])dirs.toArray(), dirToFileMap.get(dirs))); + } + + locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); + sublistsCreated = true; + } + + protected Collection getFileLocations() { Collection fileLocations = null; if (scanRel instanceof DrillScanRel) { // If a particular GroupScan provides files, get the list of files from there rather than @@ -168,17 +194,23 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { } else if (scanRel instanceof EnumerableTableScan) { fileLocations = ((FormatSelection) table.getSelection()).getAsFiles(); } - - List locations = new LinkedList<>(); - for (String file: fileLocations) { - locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file)); - } - locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); - sublistsCreated = true; + return fileLocations; } @Override - public TableScan createTableScan(List newFiles) throws Exception { + public TableScan createTableScan(List newPartitionLocation) throws Exception { + List newFiles = Lists.newArrayList(); + for (final PartitionLocation location : newPartitionLocation) { + if (!location.isCompositePartition()) { + newFiles.add(location.getEntirePartitionLocation()); + } else { + final Collection subPartitions = location.getPartitionLocationRecursive(); + for (final PartitionLocation subPart : subPartitions) { + newFiles.add(subPart.getEntirePartitionLocation()); + } + } + } + if (scanRel instanceof DrillScanRel) { final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation()); final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); 
http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java index 81bcf03..07e1412 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java @@ -130,7 +130,12 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public TableScan createTableScan(List newFiles) throws Exception { + public TableScan createTableScan(List newPartitionLocation) throws Exception { + List newFiles = Lists.newArrayList(); + for (final PartitionLocation location : newPartitionLocation) { + newFiles.add(location.getEntirePartitionLocation()); + } + final GroupScan newGroupScan = createNewGroupScan(newFiles); return new DrillScanRel(scanRel.getCluster(), http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java index 719b080..70e5f86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java @@ -24,7 +24,7 @@ package org.apache.drill.exec.planner; * partitioning scheme) we throw UnsupportedOperationException when getPartitionValue() is * invoked. 
*/ -public class ParquetPartitionLocation implements PartitionLocation { +public class ParquetPartitionLocation extends SimplePartitionLocation { private final String file; public ParquetPartitionLocation(String file) { http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java index dd3b084..f08d713 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java @@ -80,6 +80,6 @@ public interface PartitionDescriptor extends Iterable> { * @return * @throws Exception */ - public TableScan createTableScan(List newPartitions) throws Exception; + public TableScan createTableScan(List newPartitions) throws Exception; } http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java index 656e3a9..f94e8cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java @@ -17,19 +17,41 @@ */ package org.apache.drill.exec.planner; -/* - * Interface to define a single partition. It contains the - * location of the entire partition and also stores the - * value of the individual partition keys for this partition. +import java.util.List; + +/** + * Interface to define a partition. 
Partition could be simple, + * which represents a basic unit for partition, determined by + * the underlying storage plugin. On file system, a simple partition + * represents a file. Partition could be composite, consisting of + * other partitions. On file system storage plugin, a composite + * partition corresponds to a directory. + * + * Simple partition location keeps track of the string representation of + * partition and also stores the value of the individual partition keys + * for this partition. Composite partition location keeps track of the common + * partition keys, but does not keep track of the string representation of + * partition and leaves it to each individual simple partition it consists of. */ public interface PartitionLocation { - /* + /** * Returns the value of the 'index' partition column */ public String getPartitionValue(int index); - /* - * Returns the string representation of this partition + /** + * Returns the string representation of this partition. + * Only a non-composite partition supports this. */ public String getEntirePartitionLocation(); + + /** + * Returns the list of the non-composite partitions that this partition consists of. + */ + public List getPartitionLocationRecursive(); + + /** + * Returns true if this is a composite partition, false if it is a simple one. 
+ */ + public boolean isCompositePartition(); } http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java new file mode 100644 index 0000000..523169e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Abstract class for simple partition. It contains the + * location of the entire partition and also stores the + * value of the individual partition keys for this partition. 
+ */ +public abstract class SimplePartitionLocation implements PartitionLocation{ + @Override + public boolean isCompositePartition() { + return false; + } + + @Override + public List getPartitionLocationRecursive() { + return ImmutableList.of(this); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index 1c91d3a..a9fb101 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -205,10 +205,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } // set up the partitions - List newFiles = Lists.newArrayList(); + List newPartitions = Lists.newArrayList(); long numTotal = 0; // total number of partitions int batchIndex = 0; - String firstLocation = null; + PartitionLocation firstLocation = null; LogicalExpression materializedExpr = null; // Outer loop: iterate over a list of batches of PartitionLocations @@ -216,7 +216,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { numTotal += partitions.size(); logger.debug("Evaluating partition pruning for batch {}", batchIndex); if (batchIndex == 0) { // save the first location in case everything is pruned - firstLocation = partitions.get(0).getEntirePartitionLocation(); + firstLocation = partitions.get(0); } final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator); final VectorContainer container = new VectorContainer(); @@ -262,8 +262,8 @@ public abstract class 
PruneScanRule extends StoragePluginOptimizerRule { InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr); - logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}", - miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex); + logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}", + miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size()); miscTimer.reset(); int recordCount = 0; @@ -272,7 +272,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // Inner loop: within each batch iterate over the PartitionLocations for(PartitionLocation part: partitions){ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){ - newFiles.add(part.getEntirePartitionLocation()); + newPartitions.add(part); qualifiedCount++; } recordCount++; @@ -292,21 +292,23 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } try { + if (newPartitions.size() == numTotal) { + logger.info("No partitions were eligible for pruning"); + return; + } + // handle the case all partitions are filtered out. boolean canDropFilter = true; - if (newFiles.isEmpty()) { + if (newPartitions.isEmpty()) { assert firstLocation != null; - newFiles.add(firstLocation); + // Add the first non-composite partition location, since execution requires schema. + // In such case, we should not drop filter. 
+ newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0)); canDropFilter = false; } - if (newFiles.size() == numTotal) { - logger.info("No partitions were eligible for pruning"); - return; - } - - logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size()); + logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size()); List conjuncts = RelOptUtil.conjunctions(condition); List pruneConjuncts = RelOptUtil.conjunctions(pruneCondition); @@ -318,7 +320,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { condition = condition.accept(reverseVisitor); pruneCondition = pruneCondition.accept(reverseVisitor); - RelNode inputRel = descriptor.createTableScan(newFiles); + RelNode inputRel = descriptor.createTableScan(newPartitions); if (projectRel != null) { inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));