carbondata-commits mailing list archives

From akash...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
Date Thu, 16 Jan 2020 07:06:27 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cbab1ac  [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
cbab1ac is described below

commit cbab1ac208eaf1597e81ab7d39c2d122601cad1d
Author: kunal642 <kunalkapoor642@gmail.com>
AuthorDate: Thu Sep 26 10:48:02 2019 +0530

    [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
    
    Problem:
    1. Query on the bloom datamap fails when there are multiple data files in one segment.
    2. Query on bloom gives wrong results when there are multiple carbondata files.
    
    Solution:
    1. Old pruned index files were cleared from the filteredIndexShardNames list, so further
    pruning was not done on all the valid index files. Added a check to clear the index
    files only in valid scenarios, and handled the case where a wrong blocklet id was passed
    while creating the blocklet from the relative blocklet id (see the first two sketches below).
    2. Make the partitions based on the block path so that all the CarbonInputSplits in a
    MultiBlockSplit are used for bloom reading. This means one task per shard, i.e. per unique
    block path (see the last sketch below).
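
    As an illustration of fix 1, here is a minimal, self-contained sketch of the
    accumulation pattern (Blocklet is a simplified stand-in for illustration,
    not the CarbonData ExtendedBlocklet):

        import java.util.*;

        public class PruneSketch {
          // Simplified stand-in for a pruned blocklet.
          record Blocklet(String segmentId, String shardName) {}

          public static void main(String[] args) {
            List<Blocklet> pruned = List.of(
                new Blocklet("0", "shard-a"),
                new Blocklet("0", "shard-b"),  // second data file, same segment
                new Blocklet("1", "shard-c"));

            // Collect every surviving shard per segment instead of clearing
            // the shard list for each blocklet, which is what dropped shard-a
            // before the fix.
            Map<String, Set<String>> shardsBySegment = new HashMap<>();
            for (Blocklet b : pruned) {
              shardsBySegment
                  .computeIfAbsent(b.segmentId(), k -> new HashSet<>())
                  .add(b.shardName());
            }
            System.out.println(shardsBySegment);  // segment "0" keeps both shards
          }
        }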
    
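    The blocklet-id part of fix 1 matters because a segment-relative blocklet id
    has to be translated into a (data file, file-local blocklet) pair once a
    segment holds several data files. A worked sketch of that arithmetic follows
    (the counts are made up for illustration; this is not the BlockDataMap
    internals):

        public class BlockletIdSketch {
          // Map a segment-relative blocklet id to (file index, local id),
          // given the blocklet count of each data file in the segment.
          static int[] resolve(int[] blockletCounts, int relativeId) {
            int file = 0;
            while (relativeId >= blockletCounts[file]) {
              relativeId -= blockletCounts[file];
              file++;
            }
            return new int[] {file, relativeId};
          }

          public static void main(String[] args) {
            int[] counts = {3, 4};           // two data files in one segment
            int[] hit = resolve(counts, 3);  // boundary: first blocklet of file 1
            System.out.println(hit[0] + "/" + hit[1]);  // prints 1/0, not 0/3
          }
        }
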
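    For fix 2, a minimal sketch of the grouping idea from IndexDataMapRebuildRDD,
    rendered here with Java streams over a simplified stand-in for
    CarbonInputSplit (paths are hypothetical):

        import java.util.*;
        import java.util.stream.*;

        public class GroupingSketch {
          // Simplified stand-in for CarbonInputSplit.
          record Split(String segmentId, String taskId, String blockPath) {}

          public static void main(String[] args) {
            List<Split> splits = List.of(
                new Split("0", "t0", "/store/part-0-a.carbondata"),
                new Split("0", "t0", "/store/part-0-b.carbondata"),
                new Split("0", "t0", "/store/part-0-a.carbondata"));

            // Grouping only by (segmentId, taskId) would merge both block
            // paths into one multi-block split; adding blockPath gives one
            // group per shard, i.e. one task per unique block path.
            Map<List<String>, List<Split>> byShard = splits.stream()
                .collect(Collectors.groupingBy(
                    s -> List.of(s.segmentId(), s.taskId(), s.blockPath())));

            byShard.forEach((key, group) ->
                System.out.println(key + " -> " + group.size() + " split(s)"));
          }
        }
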
    This closes #3474
---
 .../carbondata/core/datamap/DataMapUtil.java       | 22 ++++++++++++++++------
 .../apache/carbondata/core/datamap/Segment.java    |  4 ++--
 .../indexstore/blockletindex/BlockDataMap.java     |  3 +++
 .../datamap/IndexDataMapRebuildRDD.scala           |  4 +++-
 4 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index ca56962..0db5901 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -20,8 +20,10 @@ package org.apache.carbondata.core.datamap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -152,18 +154,26 @@ public class DataMapUtil {
    * Prune the segments from the already pruned blocklets.
    */
   public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
-    Set<Segment> validSegments = new HashSet<>();
+    Map<Segment, Set<String>> validSegments = new HashMap<>();
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      // Clear the old pruned index files if any present
-      blocklet.getSegment().getFilteredIndexShardNames().clear();
       // Set the pruned index file to the segment
       // for further pruning.
       String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
-      blocklet.getSegment().setFilteredIndexShardName(shardName);
-      validSegments.add(blocklet.getSegment());
+      // Add the existing shards to corresponding segments
+      Set<String> existingShards = validSegments.get(blocklet.getSegment());
+      if (existingShards == null) {
+        existingShards = new HashSet<>();
+        validSegments.put(blocklet.getSegment(), existingShards);
+      }
+      existingShards.add(shardName);
+    }
+    // override the shards list in the segments.
+    for (Map.Entry<Segment, Set<String>> entry : validSegments.entrySet()) {
+      entry.getKey().setFilteredIndexShardNames(entry.getValue());
     }
     segments.clear();
-    segments.addAll(validSegments);
+    // add the new segments to the segments list.
+    segments.addAll(validSegments.keySet());
   }
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 532fd74..6384ee9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -274,8 +274,8 @@ public class Segment implements Serializable, Writable {
     return filteredIndexShardNames;
   }
 
-  public void setFilteredIndexShardName(String filteredIndexShardName) {
-    this.filteredIndexShardNames.add(filteredIndexShardName);
+  public void setFilteredIndexShardNames(Set<String> filteredIndexShardNames) {
+    this.filteredIndexShardNames = filteredIndexShardNames;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index be29e63..30f9943 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -916,6 +916,9 @@ public class BlockDataMap extends CoarseGrainDataMap
         if (diff < 0) {
           relativeBlockletId = (short) (diff + blockletCount);
           break;
+        } else if (diff == 0) {
+          relativeBlockletId++;
+          break;
         }
         rowIndex++;
       }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index f42cc8f..8079fa0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -489,11 +489,13 @@ class IndexDataMapRebuildRDD[K, V](
       job.getConfiguration,
       tableInfo.getFactTable.getTableName)
 
+    // make the partitions based on block path so that all the CarbonInputSplits in a
+    // MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard(unique block path).
     format
       .getSplits(job)
       .asScala
       .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(p => (p.getSegmentId, p.taskId))
+      .groupBy(p => (p.getSegmentId, p.taskId, p.getBlockPath))
       .map { group =>
         new CarbonMultiBlockSplit(
           group._2.asJava,

