carbondata-commits mailing list archives

From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3832]Added block and blocklet pruning for the polygon expression processing
Date Mon, 13 Jul 2020 12:36:06 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 779320b  [CARBONDATA-3832]Added block and blocklet pruning for the polygon expression processing
779320b is described below

commit 779320b7b70a9af7ee74ff7e25adfc1cc5ebce43
Author: Venu Reddy <k.venureddy2103@gmail.com>
AuthorDate: Sun Mar 22 21:56:45 2020 +0530

    [CARBONDATA-3832]Added block and blocklet pruning for the polygon expression processing
    
    Why is this PR needed?
    At present, carbon doesn't do block/blocklet pruning for polygon filter queries.
    It does row-level filtering at the carbon layer and returns the result. With this approach,
    all the carbon files are scanned irrespective of whether there are any matching
    rows in the block. It also has spark overhead to launch many jobs and tasks to process them.
    This affects the overall performance of the polygon query.
    
    What changes were proposed in this PR?
    Leveraged the existing block pruning mechanism in carbon to skip the unwanted
    blocks, thereby reducing the number of splits. On the executor side, applied blocklet
    pruning to reduce the number of blocklets to be read and scanned.
    
    This closes #3772
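
    For illustration, a minimal sketch of the pruning idea behind the new
    PolygonFilterExecutorImpl (the class and method names below are hypothetical, and the
    real implementation binary-searches the sorted geohash ranges rather than scanning them
    linearly): a block or blocklet needs to be scanned only when at least one of the
    polygon's [min, max] ranges overlaps the block's own min/max interval.

        import java.util.Arrays;
        import java.util.List;

        public class PolygonPruneSketch {
          // Returns true if the block/blocklet must be scanned, i.e. some polygon range
          // overlaps the block's [blockMin, blockMax] interval of the geohash column.
          static boolean isScanRequired(List<long[]> sortedRanges, long blockMin, long blockMax) {
            for (long[] range : sortedRanges) {
              // Intervals [range[0], range[1]] and [blockMin, blockMax] overlap iff
              // range[0] <= blockMax && range[1] >= blockMin
              if (range[0] <= blockMax && range[1] >= blockMin) {
                return true;
              }
            }
            // No overlap: the whole block/blocklet can be pruned without scanning
            return false;
          }

          public static void main(String[] args) {
            List<long[]> ranges = Arrays.asList(new long[]{100L, 200L}, new long[]{500L, 600L});
            System.out.println(isScanRequired(ranges, 250L, 450L)); // false -> pruned
            System.out.println(isScanRequired(ranges, 150L, 550L)); // true  -> scanned
          }
        }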
---
 .../core/scan/expression/UnknownExpression.java    |   8 ++
 .../carbondata/core/scan/filter/FilterUtil.java    |   9 ++
 .../executer/RowLevelFilterExecuterImpl.java       |   2 +-
 .../geo/scan/expression/PolygonExpression.java     |  30 ++++--
 .../filter/executor/PolygonFilterExecutorImpl.java | 112 +++++++++++++++++++++
 .../scala/org/apache/carbondata/geo/GeoTest.scala  |  59 +++++------
 6 files changed, 185 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
index 2cb26a6..cbea664 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
@@ -19,8 +19,16 @@ package org.apache.carbondata.core.scan.expression;
 
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
 public abstract class UnknownExpression extends Expression {
 
   public abstract List<ColumnExpression> getAllColumnList();
 
+  public FilterExecuter getFilterExecuter(FilterResolverIntf filterResolverIntf,
+      SegmentProperties segmentProperties) {
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 748bafd..6f121ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.UnknownExpression;
 import org.apache.carbondata.core.scan.expression.conditional.*;
 import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
@@ -188,6 +189,14 @@ public final class FilterUtil {
           return new FalseFilterExecutor();
         case ROWLEVEL:
         default:
+          if (filterExpressionResolverTree.getFilterExpression() instanceof UnknownExpression) {
+            FilterExecuter filterExecuter =
+                ((UnknownExpression) filterExpressionResolverTree.getFilterExpression())
+                    .getFilterExecuter(filterExpressionResolverTree, segmentProperties);
+            if (filterExecuter != null) {
+              return filterExecuter;
+            }
+          }
           return new RowLevelFilterExecuterImpl(
               ((RowLevelFilterResolverImpl) filterExpressionResolverTree)
                   .getDimColEvaluatorInfoList(),
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index c4d3031..d0429e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -70,7 +70,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
   /**
    * it has index at which given dimension is stored in file
    */
-  int[] dimensionChunkIndex;
+  protected int[] dimensionChunkIndex;
 
   /**
    * it has index at which given measure is stored in file.
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index ee9971a..953ba48 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -25,15 +25,20 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
 import org.apache.carbondata.core.scan.expression.UnknownExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
 
 /**
  * InPolygon expression processor. It inputs the InPolygon string to the Geo implementation's
@@ -46,15 +51,16 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
   private CustomIndex<List<Long[]>> instance;
   private List<Long[]> ranges = new ArrayList<Long[]>();
   private ColumnExpression column;
-  private ExpressionResult trueExpRes;
-  private ExpressionResult falseExpRes;
+  private static final ExpressionResult trueExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, true);
+  private static final ExpressionResult falseExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, false);
+
 
   public PolygonExpression(String polygon, String columnName, CustomIndex indexInstance) {
     this.polygon = polygon;
     this.instance = indexInstance;
     this.column = new ColumnExpression(columnName, DataTypes.LONG);
-    this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
-    this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
   }
 
   private void validate(List<Long[]> ranges) {
@@ -79,6 +85,10 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
     }
   }
 
+  public List<Long[]> getRanges() {
+    return ranges;
+  }
+
   private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) {
     Long[] range;
     int low = 0, mid, high = ranges.size() - 1;
@@ -147,8 +157,6 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
     instance = (CustomIndex<List<Long[]>>) in.readObject();
     column = (ColumnExpression) in.readObject();
     ranges = new ArrayList<Long[]>();
-    trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
-    falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
   }
 
   @Override
@@ -170,4 +178,14 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
   public List<ExpressionResult> getLiterals() {
     return null;
   }
+
+  @Override
+  public FilterExecuter getFilterExecuter(FilterResolverIntf resolver,
+      SegmentProperties segmentProperties) {
+    assert (resolver instanceof RowLevelFilterResolverImpl);
+    RowLevelFilterResolverImpl rowLevelResolver = (RowLevelFilterResolverImpl) resolver;
+    return new PolygonFilterExecutorImpl(rowLevelResolver.getDimColEvaluatorInfoList(),
+        rowLevelResolver.getMsrColEvalutorInfoList(), rowLevelResolver.getFilterExpresion(),
+        rowLevelResolver.getTableIdentifier(), segmentProperties, null);
+  }
 }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
new file mode 100644
index 0000000..094dbe8
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.filter.executor;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.geo.scan.expression.PolygonExpression;
+
+/**
+ * Polygon filter executor. Prunes Blocks and Blocklets based on the selected ranges of polygon.
+ */
+public class PolygonFilterExecutorImpl extends RowLevelFilterExecuterImpl {
+  public PolygonFilterExecutorImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        complexDimensionInfoMap);
+  }
+
+  private int getNearestRangeIndex(List<Long[]> ranges, long searchForNumber) {
+    Long[] range;
+    int low = 0, mid = 0, high = ranges.size() - 1;
+    while (low <= high) {
+      mid = low + ((high - low) / 2);
+      range = ranges.get(mid);
+      if (searchForNumber >= range[0]) {
+        if (searchForNumber <= range[1]) {
+          // Return the range index if the number is between min and max values of the range
+          return mid;
+        } else {
+          // Number is bigger than this range's min and max. Search on the right side of the range
+          low = mid + 1;
+        }
+      } else {
+        // Number is smaller than this range's min and max. Search on the left side of the range
+        high = mid - 1;
+      }
+    }
+    return mid;
+  }
+
+  /**
+   * Checks if the current block or blocklet needs to be scanned
+   * @param maxValue Max value in the current block or blocklet
+   * @param minValue Min value in the current block or blocklet
+   * @return True if the current block or blocklet needs to be scanned. Otherwise False.
+   */
+  private boolean isScanRequired(byte[] maxValue, byte[] minValue) {
+    PolygonExpression polygon = (PolygonExpression) exp;
+    List<Long[]> ranges = polygon.getRanges();
+    Long min =
+        (Long) DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minValue, DataTypes.LONG);
+    Long max =
+        (Long) DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxValue, DataTypes.LONG);
+
+    // Find the nearest possible range index for both the min and max values. If a value does
+    // not exist in any of the ranges, get the preceding range index where it fits best
+    int startIndex = getNearestRangeIndex(ranges, min);
+    int endIndex = getNearestRangeIndex(ranges, max);
+    if (endIndex > startIndex) {
+       // Multiple ranges fall between min and max. Need to scan this block or blocklet
+      return true;
+    }
+    // Got same index for both min and max values.
+    Long[] oneRange = ranges.subList(startIndex, endIndex + 1).get(0);
+    if ((min >= oneRange[0] && min <= oneRange[1]) || (max >= oneRange[0] && max <= oneRange[1])) {
+      // Either min or max is within the range
+      return true;
+    }
+    // No range between min and max values. Scan can be avoided for this block or blocklet
+    return false;
+  }
+
+  @Override
+  public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue,
+      boolean[] isMinMaxSet) {
+    assert (exp instanceof PolygonExpression);
+    int dimIndex = dimensionChunkIndex[0];
+    BitSet bitSet = new BitSet(1);
+    if (isMinMaxSet[dimIndex] && isScanRequired(blockMaxValue[dimIndex], blockMinValue[dimIndex])) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 0c72e5c..baf2f61 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -10,6 +10,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   val table1 = "geoTable1"
   val table2 = "geotable2"
+  val result = Seq(Row(116187332, 39979316),
+    Row(116362699, 39942444),
+    Row(116288955, 39999101),
+    Row(116325378, 39963129),
+    Row(116337069, 39951887),
+    Row(116285807, 40084087))
+
   override def beforeAll(): Unit = {
     drop()
   }
@@ -89,12 +96,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503,
" +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
   test("test insert into table select from another table") {
@@ -107,16 +109,10 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503,
" +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
-  test("test insert into table select from another table with target table sort scope as
global")
-  {
+  test("test insert into table select from another table with target table sort scope as
global") {
     val sourceTable = table1;
     val targetTable = table2;
     createTable(sourceTable)
@@ -126,16 +122,28 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503,
" +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
+  }
+
+  test("test block pruning for polygon query") {
+    createTable()
+    sql(s"insert into $table1 select 1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 1575428400000,116372142,40129503")
+    sql(s"insert into $table1 select 1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 1575428400000,116337069,39951887")
+    sql(s"insert into $table1 select 1575428400000,116359102,40154684")
+    sql(s"insert into $table1 select 1575428400000,116736367,39970323")
+    sql(s"insert into $table1 select 1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 1575428400000,116302895,39930753")
+    sql(s"insert into $table1 select 1575428400000,116288955,39999101")
+    val df = sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 " +
+                 s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')")
+    assert(df.rdd.getNumPartitions == 6)
+    checkAnswer(df, result)
   }
 
-  test("test polygon query on table partitioned by timevalue column")
-  {
+  test("test polygon query on table partitioned by timevalue column") {
     sql(s"""
            | CREATE TABLE $table1(
            | longitude LONG,
@@ -156,12 +164,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503,
" +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
   override def afterEach(): Unit = {

