carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [27/50] [abbrv] carbondata git commit: Adding the Pages support in the Delete Method.
Date Thu, 06 Jul 2017 14:41:52 GMT
Add page support to the delete method.

Correct the size of the vector batch so that it excludes the filtered rows.

Change the page ID from String to Integer.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bbf5dc18
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bbf5dc18
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bbf5dc18

Branch: refs/heads/branch-1.1
Commit: bbf5dc1815e34921b52e9d15c6552e04dcd114d6
Parents: 2c83e02
Author: ravikiran23 <ravikiran.sn042@gmail.com>
Authored: Fri Jun 2 20:31:57 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jun 15 13:26:50 2017 +0530

----------------------------------------------------------------------
 .../BlockletLevelDeleteDeltaDataCache.java      | 28 +++++++++++++-------
 .../core/mutate/DeleteDeltaBlockDetails.java    |  4 +--
 .../core/mutate/DeleteDeltaBlockletDetails.java | 11 ++++++--
 .../carbondata/core/mutate/TupleIdEnum.java     |  3 ++-
 .../data/BlockletDeleteDeltaCacheLoader.java    | 11 +++++---
 .../reader/CarbonDeleteFilesDataReader.java     | 25 ++++++++++-------
 .../impl/DictionaryBasedResultCollector.java    |  3 ++-
 .../DictionaryBasedVectorResultCollector.java   |  7 +++--
 .../collector/impl/RawBasedResultCollector.java |  3 ++-
 ...structureBasedDictionaryResultCollector.java |  3 ++-
 .../RestructureBasedRawResultCollector.java     |  3 ++-
 .../RestructureBasedVectorResultCollector.java  |  3 +++
 .../core/scan/result/AbstractScannedResult.java | 13 ++++++---
 .../SegmentUpdateStatusManager.java             |  2 +-
 .../sql/execution/command/IUDCommands.scala     |  4 ++-
 .../sql/execution/command/IUDCommands.scala     |  4 ++-
 16 files changed, 84 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java b/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
index 5d2e8ce..abad924 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
@@ -17,26 +17,36 @@
 
 package org.apache.carbondata.core.cache.update;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
 import org.roaringbitmap.RoaringBitmap;
 
 /**
  * This class maintains delete delta data cache of each blocklet along with the block timestamp
  */
 public class BlockletLevelDeleteDeltaDataCache {
-  private RoaringBitmap deleteDelataDataCache;
+  private Map<Integer, RoaringBitmap> deleteDelataDataCache =
+      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private String timeStamp;
 
-  public BlockletLevelDeleteDeltaDataCache(int[] deleteDeltaFileData, String timeStamp) {
-    deleteDelataDataCache = RoaringBitmap.bitmapOf(deleteDeltaFileData);
+  public BlockletLevelDeleteDeltaDataCache(Map<Integer, Integer[]> deleteDeltaFileData,
+      String timeStamp) {
+    for (Map.Entry<Integer, Integer[]> entry : deleteDeltaFileData.entrySet()) {
+      int[] dest = new int[entry.getValue().length];
+      int i = 0;
+      for (Integer val : entry.getValue()) {
+        dest[i++] = val.intValue();
+      }
+      deleteDelataDataCache.put(entry.getKey(), RoaringBitmap.bitmapOf(dest));
+    }
     this.timeStamp = timeStamp;
   }
 
-  public boolean contains(int key) {
-    return deleteDelataDataCache.contains(key);
-  }
-
-  public int getSize() {
-    return deleteDelataDataCache.getCardinality();
+  public boolean contains(int key, Integer pageId) {
+    return deleteDelataDataCache.get(pageId).contains(key);
   }
 
   public String getCacheTimeStamp() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
index c4e9ea2..0f66d7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
@@ -80,8 +80,8 @@ public class DeleteDeltaBlockDetails implements Serializable {
     }
   }
 
-  public boolean addBlocklet(String blockletId, String offset) throws Exception {
-    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId);
+  public boolean addBlocklet(String blockletId, String offset, Integer pageId) throws Exception {
+    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId, pageId);
     try {
       blocklet.addDeletedRow(CarbonUpdateUtil.getIntegerValue(offset));
       return addBlockletDetails(blocklet);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
index 5418211..7df5f22 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
@@ -31,6 +31,8 @@ public class DeleteDeltaBlockletDetails implements Serializable {
 
   private static final long serialVersionUID = 1206104914911491724L;
   private String id;
+  private Integer pageId;
+
   private Set<Integer> deletedRows;
 
   /**
@@ -39,9 +41,10 @@ public class DeleteDeltaBlockletDetails implements Serializable {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
 
-  public DeleteDeltaBlockletDetails(String id) {
+  public DeleteDeltaBlockletDetails(String id, Integer pageId) {
     this.id = id;
     deletedRows = new TreeSet<Integer>();
+    this.pageId = pageId;
   }
 
   public boolean addDeletedRows(Set<Integer> rows) {
@@ -60,6 +63,10 @@ public class DeleteDeltaBlockletDetails implements Serializable {
     this.id = id;
   }
 
+  public Integer getPageId() {
+    return pageId;
+  }
+
   public Set<Integer> getDeletedRows() {
     return deletedRows;
   }
@@ -73,7 +80,7 @@ public class DeleteDeltaBlockletDetails implements Serializable {
     }
 
     DeleteDeltaBlockletDetails that = (DeleteDeltaBlockletDetails) obj;
-    return id.equals(that.id);
+    return id.equals(that.id) && pageId == that.pageId;
   }
 
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
index 0c1318c..e8c60b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
@@ -24,7 +24,8 @@ public enum TupleIdEnum {
   SEGMENT_ID(1),
   BLOCK_ID(2),
   BLOCKLET_ID(3),
-  OFFSET(4);
+  PAGE_ID(4),
+  OFFSET(5);
 
   private int index;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
index 6665c5b..309e486 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.mutate.data;
 
+import java.util.Map;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
@@ -35,8 +37,8 @@ public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderInt
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName());
 
-  public BlockletDeleteDeltaCacheLoader(String blockletID,
-       DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) {
+  public BlockletDeleteDeltaCacheLoader(String blockletID, DataRefNode blockletNode,
+      AbsoluteTableIdentifier absoluteIdentifier) {
     this.blockletID = blockletID;
     this.blockletNode = blockletNode;
     this.absoluteIdentifier = absoluteIdentifier;
@@ -49,11 +51,12 @@ public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderInt
   public void loadDeleteDeltaFileDataToCache() {
     SegmentUpdateStatusManager segmentUpdateStatusManager =
         new SegmentUpdateStatusManager(absoluteIdentifier);
-    int[] deleteDeltaFileData = null;
+    Map<Integer, Integer[]> deleteDeltaFileData = null;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null;
     if (null == blockletNode.getDeleteDeltaDataCache()) {
       try {
-        deleteDeltaFileData = segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+        deleteDeltaFileData =
+            segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
         deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
             segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, null));
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index 89219e1..e689566 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.core.reader;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,8 +37,6 @@ import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import org.apache.commons.lang.ArrayUtils;
-
 
 /**
  * This class perform the functionality of reading multiple delete delta files
@@ -80,8 +79,8 @@ public class CarbonDeleteFilesDataReader {
    * @return
    * @throws Exception
    */
-  public int[] getDeleteDataFromAllFiles(List<String> deltaFiles, String blockletId)
-      throws Exception {
+  public Map<Integer, Integer[]> getDeleteDataFromAllFiles(List<String> deltaFiles,
+      String blockletId) throws Exception {
 
     List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
@@ -101,20 +100,26 @@ public class CarbonDeleteFilesDataReader {
       LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
     }
 
-    Set<Integer> result = new TreeSet<Integer>();
+    Map<Integer, Integer[]> pageIdDeleteRowsMap =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (int i = 0; i < taskSubmitList.size(); i++) {
       try {
         List<DeleteDeltaBlockletDetails> blockletDetails =
             taskSubmitList.get(i).get().getBlockletDetails();
-        result.addAll(
-            blockletDetails.get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId)))
-                .getDeletedRows());
+        for (DeleteDeltaBlockletDetails eachBlockletDetails : blockletDetails) {
+          Integer pageId = eachBlockletDetails.getPageId();
+          Set<Integer> rows = blockletDetails
+              .get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId, pageId)))
+              .getDeletedRows();
+          pageIdDeleteRowsMap.put(pageId, rows.toArray(new Integer[rows.size()]));
+        }
+
       } catch (Throwable e) {
         LOGGER.error(e.getMessage());
         throw new Exception(e.getMessage());
       }
     }
-    return ArrayUtils.toPrimitive(result.toArray(new Integer[result.size()]));
+    return pageIdDeleteRowsMap;
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index b784f94..d4d16d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -109,7 +109,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
         scannedResult.incrementCounter();
       }
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(), scannedResult.getCurrentPageCounter())) {
         continue;
       }
       fillMeasureData(scannedResult, row);
@@ -128,6 +128,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
             .equals(queryDimensions[i].getDimension().getColName())) {
           row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
               scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
+                  .getCurrentPageCounter() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
                   .getCurrentRowId(), DataType.STRING);
         } else {
           row[order[i]] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 7a8fe06..3203934 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -144,9 +144,10 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
-      scannedResult.markFilteredRows(
-          columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
+      int filteredRows = scannedResult
+          .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
       scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 
@@ -164,8 +165,6 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       // Or set the row counter.
       scannedResult.setRowCounter(rowCounter + requiredRows);
     }
-    columnarBatch.setActualSize(
-        columnarBatch.getActualSize() + requiredRows - columnarBatch.getRowsFilteredCount());
     columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 0af4957..478dc8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -61,7 +61,8 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       prepareRow(scannedResult, listBasedResult, queryMeasures);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 71045ff..4fa1494 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -81,7 +81,8 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
         scannedResult.incrementCounter();
       }
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       fillMeasureData(scannedResult, row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index aa5802d..2de74fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -159,7 +159,8 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       // re-fill dictionary and no dictionary key arrays for the newly added columns

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index 3df4541..6f45c47 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -109,11 +109,14 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+      int filteredRows = scannedResult
+          .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
       // fill default values for non existing dimensions and measures
       fillDataForNonExistingDimensions();
       fillDataForNonExistingMeasures();
       // fill existing dimensions and measures data
       scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index a1074ea..1dda1aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -284,8 +284,10 @@ public abstract class AbstractScannedResult {
         String data = getBlockletId();
         if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
             .equals(columnVectorInfo.dimension.getColumnName())) {
-          data = data + CarbonCommonConstants.FILE_SEPARATOR +
-              (rowMapping == null ? j : rowMapping[pageCounter][j]);
+          data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
+              + CarbonCommonConstants.FILE_SEPARATOR + (rowMapping == null ?
+              j :
+              rowMapping[pageCounter][j]);
         }
         vector.putBytes(vectorOffset++, offset, data.length(), data.getBytes());
       }
@@ -648,17 +650,20 @@ public abstract class AbstractScannedResult {
    * @param size
    * @param vectorOffset
    */
-  public void markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
+  public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
       int vectorOffset) {
+    int rowsFiltered = 0;
     if (blockletDeleteDeltaCache != null) {
       int len = startRow + size;
       for (int i = startRow; i < len; i++) {
         int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
-        if (blockletDeleteDeltaCache.contains(rowId)) {
+        if (blockletDeleteDeltaCache.contains(rowId, pageCounter)) {
           columnarBatch.markFiltered(vectorOffset);
+          rowsFiltered++;
         }
         vectorOffset++;
       }
     }
+    return rowsFiltered;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index c822935..6fab563 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -254,7 +254,7 @@ public class SegmentUpdateStatusManager {
    * @return
    * @throws Exception
    */
-  public int[] getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
+  public Map<Integer, Integer[]> getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
     List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
     CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
     String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a439c30..a292cde 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -689,7 +689,9 @@ object deleteExecution {
             val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
             val blockletId = CarbonUpdateUtil
               .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            val pageId = Integer.parseInt(CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
               executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 01395ff..0894f23 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -704,7 +704,9 @@ object deleteExecution {
             val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
             val blockletId = CarbonUpdateUtil
               .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            val pageId = Integer.parseInt(CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
               executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING


Mime
View raw message