drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [1/2] drill git commit: DRILL-4589: Reduce planning time for file system partition pruning by reducing filter evaluation overhead
Date Wed, 13 Apr 2016 00:23:48 GMT
Repository: drill
Updated Branches:
  refs/heads/master 9514cbe75 -> 9f4fff800


DRILL-4589: Reduce planning time for file system partition pruning by reducing filter evaluation overhead


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dbf4b15e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dbf4b15e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dbf4b15e

Branch: refs/heads/master
Commit: dbf4b15eda14f55462ff0872266bf61c13bdb1bc
Parents: 9514cbe
Author: Jinfeng Ni <jni@apache.org>
Authored: Thu Feb 25 10:13:43 2016 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Mon Apr 11 16:19:18 2016 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |  8 +--
 .../exec/planner/sql/HivePartitionLocation.java |  3 +-
 .../exec/planner/DFSDirPartitionLocation.java   | 70 ++++++++++++++++++
 .../exec/planner/DFSFilePartitionLocation.java  | 75 ++++++++++++++++++++
 .../exec/planner/DFSPartitionLocation.java      | 71 ------------------
 .../planner/FileSystemPartitionDescriptor.java  | 72 +++++++++++++------
 .../planner/ParquetPartitionDescriptor.java     |  7 +-
 .../exec/planner/ParquetPartitionLocation.java  |  2 +-
 .../drill/exec/planner/PartitionDescriptor.java |  2 +-
 .../drill/exec/planner/PartitionLocation.java   | 36 ++++++++--
 .../exec/planner/SimplePartitionLocation.java   | 41 +++++++++++
 .../logical/partition/PruneScanRule.java        | 32 +++++----
 12 files changed, 298 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index e531f38..c8e45ca 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -151,7 +151,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor
{
   }
 
   @Override
-  public TableScan createTableScan(List<String> newPartitions) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception {
     GroupScan newGroupScan = createNewGroupScan(newPartitions);
     return new DrillScanRel(scanRel.getCluster(),
         scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
@@ -162,7 +162,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor
{
         true /*filter pushdown*/);
   }
 
-  private GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
+  private GroupScan createNewGroupScan(List<PartitionLocation> newPartitionLocations) throws ExecutionSetupException {
     HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
     List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
@@ -170,8 +170,8 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor
{
 
     for (HiveTable.HivePartition part: oldPartitions) {
       String partitionLocation = part.getPartition().getSd().getLocation();
-      for (String newPartitionLocation: newFiles) {
-        if (partitionLocation.equals(newPartitionLocation)) {
+      for (PartitionLocation newPartitionLocation: newPartitionLocations) {
+        if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) {
           newPartitions.add(part);
         }
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
index 49e3361..5a2ae30 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
@@ -19,10 +19,11 @@ package org.apache.drill.exec.planner.sql;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.SimplePartitionLocation;
 
 import java.util.List;
 
-public class HivePartitionLocation implements PartitionLocation {
+public class HivePartitionLocation extends SimplePartitionLocation {
   private final String partitionLocation;
   private final List<String> partitionValues;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
new file mode 100644
index 0000000..da3aa68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Class defines a single partition corresponding to a directory in a DFS table.
+ */
+package org.apache.drill.exec.planner;
+
+
+import com.google.common.collect.Lists;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Composite partition location corresponds to a directory in the file system.
+ * */
+public class DFSDirPartitionLocation implements PartitionLocation {
+  // Similar to directory / file structures, subPartitions could be either a DFSDirPartitionLocation or DFSFilePartitionLocation
+  private final Collection<PartitionLocation> subPartitions;
+  private final String[] dirs;
+
+  public DFSDirPartitionLocation(String[] dirs, Collection<PartitionLocation> subPartitions) {
+    this.subPartitions = subPartitions;
+    this.dirs = dirs;
+  }
+
+  @Override
+  public String getPartitionValue(int index) {
+    assert index < dirs.length;
+    return dirs[index];
+  }
+
+  @Override
+  public String getEntirePartitionLocation() {
+    throw new UnsupportedOperationException("Should not call getEntirePartitionLocation for composite partition location!");
+  }
+
+  @Override
+  public List<SimplePartitionLocation> getPartitionLocationRecursive() {
+    List<SimplePartitionLocation> results = Lists.newArrayList();
+
+    for (final PartitionLocation partitionLocation : subPartitions) {
+      results.addAll(partitionLocation.getPartitionLocationRecursive());
+    }
+
+    return results;
+  }
+
+  @Override
+  public boolean isCompositePartition() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
new file mode 100644
index 0000000..6e42f3b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class defines a single partition in a DFS table.
+ */
+public class DFSFilePartitionLocation extends SimplePartitionLocation {
+  private final String[] dirs;
+  private final String file;
+
+  public DFSFilePartitionLocation(int max, String selectionRoot, String file) {
+    this.file = file;
+    this.dirs = new String[max];
+
+    // strip the scheme and authority if they exist
+    selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString();
+
+    int start = file.indexOf(selectionRoot) + selectionRoot.length();
+    String postPath = file.substring(start);
+    if (postPath.length() == 0) {
+      return;
+    }
+    if(postPath.charAt(0) == '/'){
+      postPath = postPath.substring(1);
+    }
+    String[] mostDirs = postPath.split("/");
+    int maxLoop = Math.min(max, mostDirs.length - 1);
+    for(int i =0; i < maxLoop; i++){
+      this.dirs[i] = mostDirs[i];
+    }
+  }
+
+  /**
+   * Returns the value for a give partition key
+   * @param index - Index of the partition key whose value is to be returned
+   * @return
+   */
+  @Override
+  public String getPartitionValue(int index) {
+    assert index < dirs.length;
+    return dirs[index];
+  }
+
+  /**
+   * Return the full location of this partition
+   * @return
+   */
+  @Override
+  public String getEntirePartitionLocation() {
+    return file;
+  }
+
+  public String[] getDirs() {
+    return dirs;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java
deleted file mode 100644
index e058aa2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * Class defines a single partition in a DFS table.
- */
-public class DFSPartitionLocation implements PartitionLocation {
-  private final String[] dirs;
-  private final String file;
-
-  public DFSPartitionLocation(int max, String selectionRoot, String file) {
-    this.file = file;
-    this.dirs = new String[max];
-
-    // strip the scheme and authority if they exist
-    selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString();
-
-    int start = file.indexOf(selectionRoot) + selectionRoot.length();
-    String postPath = file.substring(start);
-    if (postPath.length() == 0) {
-      return;
-    }
-    if(postPath.charAt(0) == '/'){
-      postPath = postPath.substring(1);
-    }
-    String[] mostDirs = postPath.split("/");
-    int maxLoop = Math.min(max, mostDirs.length - 1);
-    for(int i =0; i < maxLoop; i++){
-      this.dirs[i] = mostDirs[i];
-    }
-  }
-
-  /**
-   * Returns the value for a give partition key
-   * @param index - Index of the partition key whose value is to be returned
-   * @return
-   */
-  @Override
-  public String getPartitionValue(int index) {
-    assert index < dirs.length;
-    return dirs[index];
-  }
-
-  /**
-   * Return the full location of this partition
-   * @return
-   */
-  @Override
-  public String getEntirePartitionLocation() {
-    return file;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index f0fcee7..cfc8542 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.planner;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +47,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -99,17 +101,6 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor
{
     return MAX_NESTED_SUBDIRS;
   }
 
-//  @Override
-//  public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
-//    if (scanRel instanceof DrillScanRel) {
-//      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
-//      final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
-//      return newScan;
-//    } else {
-//      throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan");
-//    }
-//  }
-
   public DrillTable getTable() {
     return table;
   }
@@ -155,6 +146,41 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor
{
 
   @Override
   protected void createPartitionSublists() {
+    final Collection<String> fileLocations = getFileLocations();
+    List<PartitionLocation> locations = new LinkedList<>();
+
+    final String selectionRoot = getBaseTableLocation();
+
+    // map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys.
+    // For example,
+    //   1990/Q1/1.parquet, 2.parquet
+    // would have <1990, Q1> as key, and value as list of partition location for 1.parquet and 2.parquet.
+    HashMap<List<String>, List<PartitionLocation>> dirToFileMap = new HashMap<>();
+
+    // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation)
+    // it contains.
+    for (String file: fileLocations) {
+      DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file);
+
+      final String[] dirs = dfsFilePartitionLocation.getDirs();
+      final List<String> dirList = Arrays.asList(dirs);
+
+      if (!dirToFileMap.containsKey(dirList)) {
+        dirToFileMap.put(dirList, new ArrayList<PartitionLocation>());
+      }
+      dirToFileMap.get(dirList).add(dfsFilePartitionLocation);
+    }
+
+    // build a list of DFSDirPartitionLocation.
+    for (final List<String> dirs : dirToFileMap.keySet()) {
+      locations.add( new DFSDirPartitionLocation((String [])dirs.toArray(), dirToFileMap.get(dirs)));
+    }
+
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
+  protected Collection<String> getFileLocations() {
     Collection<String> fileLocations = null;
     if (scanRel instanceof DrillScanRel) {
       // If a particular GroupScan provides files, get the list of files from there rather than
@@ -168,17 +194,23 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor
{
     } else if (scanRel instanceof EnumerableTableScan) {
       fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
     }
-
-    List<PartitionLocation> locations = new LinkedList<>();
-    for (String file: fileLocations) {
-      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
-    }
-    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
-    sublistsCreated = true;
+    return fileLocations;
   }
 
   @Override
-  public TableScan createTableScan(List<String> newFiles) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
+    List<String> newFiles = Lists.newArrayList();
+    for (final PartitionLocation location : newPartitionLocation) {
+      if (!location.isCompositePartition()) {
+        newFiles.add(location.getEntirePartitionLocation());
+      } else {
+        final Collection<SimplePartitionLocation> subPartitions = location.getPartitionLocationRecursive();
+        for (final PartitionLocation subPart : subPartitions) {
+          newFiles.add(subPart.getEntirePartitionLocation());
+        }
+      }
+    }
+
     if (scanRel instanceof DrillScanRel) {
       final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
       final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index 81bcf03..07e1412 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -130,7 +130,12 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor
{
   }
 
   @Override
-  public TableScan createTableScan(List<String> newFiles) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
+    List<String> newFiles = Lists.newArrayList();
+    for (final PartitionLocation location : newPartitionLocation) {
+      newFiles.add(location.getEntirePartitionLocation());
+    }
+
     final GroupScan newGroupScan = createNewGroupScan(newFiles);
 
     return new DrillScanRel(scanRel.getCluster(),

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
index 719b080..70e5f86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
@@ -24,7 +24,7 @@ package org.apache.drill.exec.planner;
  * partitioning scheme) we throw UnsupportedOperationException when getPartitionValue() is
  * invoked.
  */
-public class ParquetPartitionLocation implements PartitionLocation {
+public class ParquetPartitionLocation extends SimplePartitionLocation {
   private final String file;
 
   public ParquetPartitionLocation(String file) {

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index dd3b084..f08d713 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -80,6 +80,6 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>>
{
    * @return
    * @throws Exception
    */
-  public TableScan createTableScan(List<String> newPartitions) throws Exception;
+  public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
index 656e3a9..f94e8cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
@@ -17,19 +17,41 @@
  */
 package org.apache.drill.exec.planner;
 
-/*
- * Interface to define a single partition. It contains the
- * location of the entire partition and also stores the
- * value of the individual partition keys for this partition.
+import java.util.List;
+
+/**
+ * Interface to define a partition. Partition could be simple,
+ * which represents a basic unit for partition, determined by
+ * the underlying storage plugin. On file system, a simple partition
+ * represents a file. Partition could be composite, consisting of
+ * other partitions. On file system storage plugin, a composite
+ * partition corresponds to a directory.
+ *
+ * Simple partition location keeps track the string representation of
+ * partition and also stores the value of the individual partition keys
+ * for this partition. Composite partition location keeps track the common
+ * partition keys, but does not keep track the the string representation of
+ * partition and leave it to each individual simple partition it consists of.
  */
 public interface PartitionLocation {
-  /*
+  /**
    * Returns the value of the 'index' partition column
    */
   public String getPartitionValue(int index);
 
-  /*
-   * Returns the string representation of this partition
+  /**
+   * Returns the string representation of this partition.
+   * Only a non-composite partition supports this.
    */
   public String getEntirePartitionLocation();
+
+  /**
+   * Returns the list of the non-composite partitions that this partition consists of.
+   */
+  public List<SimplePartitionLocation> getPartitionLocationRecursive();
+
+  /**
+   * Returns if this is a simple or composite partition.
+   */
+  public boolean isCompositePartition();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
new file mode 100644
index 0000000..523169e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Abstract class for simple partition. It contains the
+ * location of the entire partition and also stores the
+ * value of the individual partition keys for this partition.
+ */
+public abstract  class SimplePartitionLocation implements PartitionLocation{
+  @Override
+  public boolean isCompositePartition() {
+    return false;
+  }
+
+  @Override
+  public List<SimplePartitionLocation> getPartitionLocationRecursive() {
+    return ImmutableList.of(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/dbf4b15e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 1c91d3a..a9fb101 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -205,10 +205,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
     }
 
     // set up the partitions
-    List<String> newFiles = Lists.newArrayList();
+    List<PartitionLocation> newPartitions = Lists.newArrayList();
     long numTotal = 0; // total number of partitions
     int batchIndex = 0;
-    String firstLocation = null;
+    PartitionLocation firstLocation = null;
     LogicalExpression materializedExpr = null;
 
     // Outer loop: iterate over a list of batches of PartitionLocations
@@ -216,7 +216,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
       numTotal += partitions.size();
       logger.debug("Evaluating partition pruning for batch {}", batchIndex);
       if (batchIndex == 0) { // save the first location in case everything is pruned
-        firstLocation = partitions.get(0).getEntirePartitionLocation();
+        firstLocation = partitions.get(0);
       }
       final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
       final VectorContainer container = new VectorContainer();
@@ -262,8 +262,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
 
         InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
 
-        logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
-            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+        logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
+            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
         miscTimer.reset();
 
         int recordCount = 0;
@@ -272,7 +272,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
         // Inner loop: within each batch iterate over the PartitionLocations
         for(PartitionLocation part: partitions){
           if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
-            newFiles.add(part.getEntirePartitionLocation());
+            newPartitions.add(part);
             qualifiedCount++;
           }
           recordCount++;
@@ -292,21 +292,23 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
     }
 
     try {
+      if (newPartitions.size() == numTotal) {
+        logger.info("No partitions were eligible for pruning");
+        return;
+      }
 
+      // handle the case all partitions are filtered out.
       boolean canDropFilter = true;
 
-      if (newFiles.isEmpty()) {
+      if (newPartitions.isEmpty()) {
         assert firstLocation != null;
-        newFiles.add(firstLocation);
+        // Add the first non-composite partition location, since execution requires schema.
+        // In such case, we should not drop filter.
+        newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
         canDropFilter = false;
       }
 
-      if (newFiles.size() == numTotal) {
-        logger.info("No partitions were eligible for pruning");
-        return;
-      }
-
-      logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
+      logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
 
       List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
       List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
@@ -318,7 +320,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
       condition = condition.accept(reverseVisitor);
       pruneCondition = pruneCondition.accept(reverseVisitor);
 
-      RelNode inputRel = descriptor.createTableScan(newFiles);
+      RelNode inputRel = descriptor.createTableScan(newPartitions);
 
       if (projectRel != null) {
         inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));


Mime
View raw message