carbondata-commits mailing list archives

From: jack...@apache.org
Subject: [06/50] [abbrv] carbondata git commit: [CARBONDATA-1731, CARBONDATA-1728] Update, Delete fails incorrectly with error
Date: Sun, 07 Jan 2018 03:05:14 GMT
[CARBONDATA-1731, CARBONDATA-1728] Update, Delete fails incorrectly with error

[QueryExecution] Fetch BlockletId in Executor. Currently the blockletId is not propagated
to the executors properly. Because of this, the implicit column (the tupleId) formed in the
executors can end up with duplicate tupleIds. For example, for a block with two blocklets,
0 and 1, each executor task picks up one blocklet; since the ID is not propagated, every
executor labels its blocklet as ID 0. The solution is to propagate the blocklet IDs from the
driver to the executors.
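
As a rough illustration (the exact tupleId layout is internal to CarbonData; the
helper below is a hypothetical sketch, not project API), the collision looks like
this:

    // Hypothetical: a tupleId is assembled per row from, among other parts,
    // the block id and the blocklet id.
    static String tupleIdFor(String blockId, String blockletId, int page, int row) {
      return blockId + "/" + blockletId + "/" + page + "/" + row;
    }
    // Before the fix, both tasks of a two-blocklet block report blocklet "0",
    // so tupleIdFor("part-0-0", "0", 0, 5) is produced twice.
    // After the fix, the driver ships the real id with each split, giving
    // tupleIdFor("part-0-0", "0", 0, 5) and tupleIdFor("part-0-0", "1", 0, 5).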

This closes #1719


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c8fa590
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c8fa590
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c8fa590

Branch: refs/heads/carbonstore
Commit: 0c8fa5908afd153e28d08009ca53241dbd1a506a
Parents: f635106
Author: anubhav100 <anubhav.tarar@knoldus.in>
Authored: Thu Dec 14 12:54:08 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Dec 27 09:58:35 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/datastore/DataRefNode.java  |  6 ++
 .../core/datastore/block/TableBlockInfo.java    | 53 ++++++++++++---
 .../datastore/impl/btree/BTreeNonLeafNode.java  |  4 ++
 .../impl/btree/BlockBTreeLeafNode.java          |  8 +++
 .../impl/btree/BlockletBTreeLeafNode.java       |  4 ++
 .../core/indexstore/BlockletDetailInfo.java     | 12 ++++
 .../blockletindex/BlockletDataMap.java          |  1 +
 .../BlockletDataRefNodeWrapper.java             |  5 ++
 .../scan/scanner/AbstractBlockletScanner.java   |  2 +-
 .../core/scan/scanner/impl/FilterScanner.java   |  2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 34 +++++-----
 .../carbondata/hadoop/CarbonInputSplit.java     | 54 +++++++++------
 .../hadoop/api/CarbonTableInputFormat.java      | 14 ++--
 .../internal/index/impl/InMemoryBTreeIndex.java | 14 ++--
 .../presto/impl/CarbonLocalInputSplit.java      |  4 +-
 .../iud/TestUpdateAndDeleteWithLargeData.scala  | 69 ++++++++++++++++++++
 .../CarbonProjectForDeleteCommand.scala         |  5 ++
 17 files changed, 228 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 37acb02..13d5f69 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -52,6 +52,12 @@ public interface DataRefNode {
   long nodeNumber();
 
   /**
+   * Method used for retrieving the blockletId.
+   * @return the blockletId related to the data block.
+   */
+  String blockletId();
+
+  /**
    * This method will be used to get the max value of all the columns this can
    * be used in case of filter query
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 910c9bb..69d10ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -64,6 +64,11 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   private String segmentId;
 
+  /**
+   * id of the Blocklet.
+   */
+  private String blockletId;
+
   private String[] locations;
 
   private ColumnarFormatVersion version;
@@ -76,7 +81,7 @@ public class TableBlockInfo implements Distributable, Serializable {
    * map of block location and storage id
    */
   private Map<String, String> blockStorageIdMap =
-          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   /**
    * delete delta files path for this block
@@ -85,9 +90,11 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private BlockletDetailInfo detailInfo;
 
-  public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId,
+      String[] locations, long blockLength, ColumnarFormatVersion version,
+      String[] deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
+    this.blockletId = "0";
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
     this.locations = locations;
@@ -113,11 +120,30 @@ public class TableBlockInfo implements Distributable, Serializable {
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
-    this(filePath, blockOffset, segmentId, locations, blockLength, version, deletedDeltaFilePath);
+    this(filePath, blockOffset, segmentId, locations, blockLength, version,
+        deletedDeltaFilePath);
     this.blockletInfos = blockletInfos;
   }
 
   /**
+   * constructor to initialize the TableBlockInfo with blockletIds
+   *
+   * @param filePath
+   * @param blockletId
+   * @param blockOffset
+   * @param segmentId
+   * @param locations
+   * @param blockLength
+   * @param blockletInfos
+   */
+  public TableBlockInfo(String filePath, String blockletId, long blockOffset, String segmentId,
+      String[] locations, long blockLength, BlockletInfos blockletInfos,
+      ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
+        deletedDeltaFilePath);
+    this.blockletId = blockletId;
+  }
+
+  /**
    * constructor to initialize the TableBlockInfo with blockStorageIdMap
    *
    * @param filePath
@@ -129,11 +155,12 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @param version
    * @param blockStorageIdMap
    */
-  public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
-      Map<String, String> blockStorageIdMap, String[] deletedDeltaFilePath) {
-    this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
-        deletedDeltaFilePath);
+  public TableBlockInfo(String filePath, String blockletId, long blockOffset, String segmentId,
+      String[] locations, long blockLength, BlockletInfos blockletInfos,
+      ColumnarFormatVersion version, Map<String, String> blockStorageIdMap,
+      String[] deletedDeltaFilePath) {
+    this(filePath, blockletId, blockOffset, segmentId, locations, blockLength, blockletInfos,
+        version, deletedDeltaFilePath);
     this.blockStorageIdMap = blockStorageIdMap;
   }
 
@@ -356,4 +383,12 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setDetailInfo(BlockletDetailInfo detailInfo) {
     this.detailInfo = detailInfo;
   }
+
+  public String getBlockletId() {
+    return blockletId;
+  }
+
+  public void setBlockletId(String blockletId) {
+    this.blockletId = blockletId;
+  }
 }
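
A minimal usage sketch of the new constructor (argument values are illustrative,
and blockletInfos is assumed to be a pre-built BlockletInfos):

    TableBlockInfo info = new TableBlockInfo(
        "/store/db/t1/part-0-0.carbondata",  // filePath
        "1",                                 // blockletId, now supplied by the driver
        0L,                                  // blockOffset
        "0",                                 // segmentId
        new String[] { "host1" },            // locations
        1024L,                               // blockLength
        blockletInfos,                       // assumed pre-built BlockletInfos
        ColumnarFormatVersion.V3,            // version
        null);                               // deletedDeltaFilePath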

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index 62fcabf..ccc5e12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -135,6 +135,10 @@ public class BTreeNonLeafNode implements BTreeNode {
     throw new UnsupportedOperationException("Unsupported operation");
   }
 
+  @Override public String blockletId() {
+    throw new UnsupportedOperationException("Unsupported operation");
+  }
+
   /**
    * This method will be used to get the max value of all the columns this can
    * be used in case of filter query

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java
index f248ce0..25817f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java
@@ -60,6 +60,14 @@ public class BlockBTreeLeafNode extends AbstractBTreeLeafNode {
   }
 
   /**
+   * Below method is supposed to return the blocklet ID.
+   * @return the blockletId from the block's BlockletDetailInfo
+   */
+  @Override public String blockletId() {
+    return blockInfo.getTableBlockInfo().getDetailInfo().getBlockletId().toString();
+  }
+
+  /**
    * number of pages in blocklet
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
index cf8a2ad..82c4169 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
@@ -98,6 +98,10 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
     }
   }
 
+  @Override public String blockletId() {
+    return "0";
+  }
+
   /**
    * Below method will be used to get the dimension chunks
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 68dedd8..658a1e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -36,6 +36,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
 
   private short versionNumber;
 
+  private short blockletId;
+
   private int[] dimLens;
 
   private long schemaUpdatedTimeStamp;
@@ -94,6 +96,7 @@ public class BlockletDetailInfo implements Serializable, Writable {
     out.writeInt(rowCount);
     out.writeShort(pagesCount);
     out.writeShort(versionNumber);
+    out.writeShort(blockletId);
     out.writeShort(dimLens.length);
     for (int i = 0; i < dimLens.length; i++) {
       out.writeInt(dimLens[i]);
@@ -106,6 +109,7 @@ public class BlockletDetailInfo implements Serializable, Writable {
     rowCount = in.readInt();
     pagesCount = in.readShort();
     versionNumber = in.readShort();
+    blockletId = in.readShort();
     dimLens = new int[in.readShort()];
     for (int i = 0; i < dimLens.length; i++) {
       dimLens[i] = in.readInt();
@@ -114,4 +118,12 @@ public class BlockletDetailInfo implements Serializable, Writable {
     blockletInfo = new BlockletInfo();
     blockletInfo.readFields(in);
   }
+
+  public Short getBlockletId() {
+    return blockletId;
+  }
+
+  public void setBlockletId(Short blockletId) {
+    this.blockletId = blockletId;
+  }
 }
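
Note that the Writable round trip stays symmetric: the new writeShort/readShort
calls for blockletId sit at the same position in write() and readFields(). A
self-contained miniature of that pattern (a hypothetical class, not the real
BlockletDetailInfo):

    import java.io.*;

    class MiniDetailInfo {
      short versionNumber;
      short blockletId;

      void write(DataOutput out) throws IOException {
        out.writeShort(versionNumber);
        out.writeShort(blockletId);  // new field, written right after versionNumber
      }

      void readFields(DataInput in) throws IOException {
        versionNumber = in.readShort();
        blockletId = in.readShort(); // must mirror write() exactly, or every
                                     // later field is read from the wrong offset
      }
    }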

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index f1188cb..1e54019 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -599,6 +599,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
     detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
     detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+    detailInfo.setBlockletId((short) blockletId);
     detailInfo.setDimLens(columnCardinality);
     detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
     BlockletInfo blockletInfo = new BlockletInfo();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
index b1331bf..7868308 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -53,6 +53,7 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
       BlockletDetailInfo detailInfo = blockInfo.getDetailInfo();
       detailInfo.getBlockletInfo().setNumberOfRows(detailInfo.getRowCount());
       detailInfo.getBlockletInfo().setNumberOfPages(detailInfo.getPagesCount());
+      detailInfo.setBlockletId(blockInfo.getDetailInfo().getBlockletId());
       int[] pageRowCount = new int[detailInfo.getPagesCount()];
       int numberOfPagesCompletelyFilled = detailInfo.getRowCount()
           / CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
@@ -86,6 +87,10 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
     return index;
   }
 
+  @Override public String blockletId() {
+    return blockInfos.get(index).getDetailInfo().getBlockletId().toString();
+  }
+
   @Override public byte[][] getColumnsMaxValue() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 1e4becd..19d6f48 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -76,7 +76,7 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
         totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
     scannedResult.setBlockletId(
         blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().nodeNumber());
+            .getDataBlock().blockletId());
     DimensionRawColumnChunk[] dimensionRawColumnChunks =
         blocksChunkHolder.getDimensionRawDataChunk();
     DimensionColumnDataChunk[][] dimensionColumnDataChunks =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index abfc5f4..53c647c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -175,7 +175,7 @@ public class FilterScanner extends AbstractBlockletScanner {
     AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
     scannedResult.setBlockletId(
         blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().nodeNumber());
+            .getDataBlock().blockletId());
     // valid scanned blocklet
     QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 3e8ede2..ac5028f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -409,7 +409,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
         continue;
       }
-      carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
+      carbonSplits.add(CarbonInputSplit.from(segmentId, "0", fileSplit,
           ColumnarFormatVersion.valueOf(
               CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
     }
@@ -452,9 +452,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     Boolean  isIUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-            getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
     SegmentUpdateStatusManager updateStatusManager =
-            new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
 
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
@@ -476,22 +476,23 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           // In case IUD is not performed in this table avoid searching for
           // invalidated blocks.
           if (CarbonUtil
-                  .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
-                          invalidBlockVOForSegmentId, updateStatusManager)) {
+              .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+                  invalidBlockVOForSegmentId, updateStatusManager)) {
             continue;
           }
           // When iud is done then only get delete delta files for a block
           try {
             deleteDeltaFilePath =
-                    updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
+                updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
           } catch (Exception e) {
             throw new IOException(e);
           }
         }
-        result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
-            tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-            tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-            tableBlockInfo.getVersion(), deleteDeltaFilePath));
+        result.add(new CarbonInputSplit(segmentNo, tableBlockInfo.getBlockletId(),
+            new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(),
+            tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(),
+            tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), tableBlockInfo.getVersion(),
+            deleteDeltaFilePath));
       }
     }
     return result;
@@ -583,7 +584,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, UpdateVO updateDetails,
       SegmentUpdateStatusManager updateStatusManager,
       String segmentId, Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys)
-    throws IOException {
+      throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
 
     // get file location of all files of given segment
@@ -603,7 +604,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void,
T> {
         BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
             carbonInputSplit.getNumberOfBlocklets());
         tableBlockInfoList.add(
-            new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+            new TableBlockInfo(carbonInputSplit.getPath().toString(),
+                carbonInputSplit.getBlockletId(), carbonInputSplit.getStart(),
                 tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
                 carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
                 carbonInputSplit.getBlockStorageIdMap(), carbonInputSplit.getDeleteDeltaFiles()));
@@ -613,9 +615,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   private boolean isValidBlockBasedOnUpdateDetails(
-          Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
-          UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId,
-          Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) {
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId,
+      Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) {
     String taskID = null;
     if (null != carbonInputSplit) {
      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
@@ -741,7 +743,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
                 updateStatusManager);
          for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskMap :
             taskAbstractIndexMap
-            .entrySet()) {
+                .entrySet()) {
           AbstractIndex taskAbstractIndex = taskMap.getValue();
           countOfBlocksInSeg += new BlockLevelTraverser()
               .getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index e89c2d6..0bc7e60 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -55,6 +55,8 @@ public class CarbonInputSplit extends FileSplit
   private String segmentId;
 
   private String bucketId;
+
+  private String blockletId;
   /*
    * Invalid segments that need to be removed in task side index
    */
@@ -71,7 +73,7 @@ public class CarbonInputSplit extends FileSplit
    * map of blocklocation and storage id
    */
   private Map<String, String> blockStorageIdMap =
-          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   private List<UpdateVO> invalidTimestampsList;
 
@@ -88,13 +90,14 @@ public class CarbonInputSplit extends FileSplit
     segmentId = null;
     taskId = "0";
     bucketId = "0";
+    blockletId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
-  private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      ColumnarFormatVersion version, String[] deleteDeltaFiles) {
+  private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
+      String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -103,14 +106,16 @@ public class CarbonInputSplit extends FileSplit
     }
     this.taskId = taskNo;
     this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
+    this.blockletId = blockletId;
     this.invalidSegments = new ArrayList<>();
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
   }
 
-  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
-    this(segmentId, path, start, length, locations, version, deleteDeltaFiles);
+  public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
+      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+      String[] deleteDeltaFiles) {
+    this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -121,6 +126,7 @@ public class CarbonInputSplit extends FileSplit
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
+    blockletId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
@@ -133,6 +139,7 @@ public class CarbonInputSplit extends FileSplit
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
+    blockletId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
@@ -149,18 +156,18 @@ public class CarbonInputSplit extends FileSplit
    * @param version
    * @param blockStorageIdMap
    */
-  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap,
-      String[] deleteDeltaFiles) {
-    this(segmentId, path, start, length, locations, numberOfBlocklets, version, deleteDeltaFiles);
+  public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
+      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+      Map<String, String> blockStorageIdMap, String[] deleteDeltaFiles) {
+    this(segmentId, blockletId, path, start, length, locations, numberOfBlocklets, version,
+        deleteDeltaFiles);
     this.blockStorageIdMap = blockStorageIdMap;
   }
 
-  public static CarbonInputSplit from(String segmentId, FileSplit split,
-      ColumnarFormatVersion version)
-      throws IOException {
-    return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
-        split.getLocations(), version, null);
+  public static CarbonInputSplit from(String segmentId, String blockletId, FileSplit split,
+      ColumnarFormatVersion version) throws IOException {
+    return new CarbonInputSplit(segmentId, blockletId, split.getPath(), split.getStart(),
+        split.getLength(), split.getLocations(), version, null);
   }
 
  public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -170,9 +177,9 @@ public class CarbonInputSplit extends FileSplit
           new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
       try {
         TableBlockInfo blockInfo =
-            new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
-                split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
-                split.getDeleteDeltaFiles());
+            new TableBlockInfo(split.getPath().toString(), split.blockletId, split.getStart(),
+                split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
+                split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
@@ -187,9 +194,10 @@ public class CarbonInputSplit extends FileSplit
         new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
     try {
       TableBlockInfo blockInfo =
-          new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
-              inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
-              blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+          new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.blockletId,
+              inputSplit.getStart(), inputSplit.getSegmentId(), inputSplit.getLocations(),
+              inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
+              inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
       return blockInfo;
     } catch (IOException e) {
@@ -206,6 +214,7 @@ public class CarbonInputSplit extends FileSplit
     this.segmentId = in.readUTF();
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     this.bucketId = in.readUTF();
+    this.blockletId = in.readUTF();
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -228,6 +237,7 @@ public class CarbonInputSplit extends FileSplit
     out.writeUTF(segmentId);
     out.writeShort(version.number());
     out.writeUTF(bucketId);
+    out.writeUTF(blockletId);
     out.writeInt(invalidSegments.size());
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
@@ -281,6 +291,8 @@ public class CarbonInputSplit extends FileSplit
     return bucketId;
   }
 
+  public String getBlockletId() {
+    return blockletId;
+  }
+
   @Override public int compareTo(Distributable o) {
     if (o == null) {
       return -1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 22eca9f..6d975a6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -112,7 +112,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String INPUT_SEGMENT_NUMBERS =
       "mapreduce.input.carboninputformat.segmentnumbers";
   public static final String VALIDATE_INPUT_SEGMENT_IDs =
-            "mapreduce.input.carboninputformat.validsegments";
+      "mapreduce.input.carboninputformat.validsegments";
   // comma separated list of input files
   public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
   public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
@@ -155,7 +155,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       TableInfo output = new TableInfo();
       output.readFields(
           new DataInputStream(
-            new ByteArrayInputStream(ObjectSerializationUtil.decodeStringToBytes(tableInfoStr))));
+              new ByteArrayInputStream(ObjectSerializationUtil.decodeStringToBytes(tableInfoStr))));
       return output;
     }
   }
@@ -681,14 +681,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         // In case IUD is not performed in this table avoid searching for
         // invalidated blocks.
         if (CarbonUtil
-                .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
-                        invalidBlockVOForSegmentId, updateStatusManager)) {
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+                invalidBlockVOForSegmentId, updateStatusManager)) {
           continue;
         }
         // When iud is done then only get delete delta files for a block
         try {
           deleteDeltaFilePath =
-                  updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+              updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -794,8 +794,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     }
     org.apache.carbondata.hadoop.CarbonInputSplit split =
         org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
-            new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
-                blocklet.getLocations()),
+            blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
+                blocklet.getLength(), blocklet.getLocations()),
             ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
     split.setDetailInfo(blocklet.getDetailInfo());
     return split;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index 37796db..a590a5b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -87,10 +87,11 @@ class InMemoryBTreeIndex implements Index {
     for (DataRefNode dataRefNode : dataRefNodes) {
       BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
       TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-      result.add(new CarbonInputSplit(segment.getId(), new Path(tableBlockInfo.getFilePath()),
-          tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-          tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-          tableBlockInfo.getVersion(), null));
+      result.add(new CarbonInputSplit(segment.getId(),
+          tableBlockInfo.getDetailInfo().getBlockletId().toString(),
+          new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(),
+          tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(),
+          tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), tableBlockInfo.getVersion(), null));
     }
     return result;
   }
@@ -140,8 +141,9 @@ class InMemoryBTreeIndex implements Index {
       BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
           carbonInputSplit.getNumberOfBlocklets());
       tableBlockInfoList.add(
-          new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-              segment.getId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
+          new TableBlockInfo(carbonInputSplit.getPath().toString(),
+              carbonInputSplit.getBlockletId(), carbonInputSplit.getStart(), segment.getId(),
+              carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
               blockletInfos, carbonInputSplit.getVersion(),
               carbonInputSplit.getDeleteDeltaFiles()));
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index e209b97..1873b8c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -100,7 +100,7 @@ public class CarbonLocalInputSplit {
       @JsonProperty("version") short version,
       @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
       @JsonProperty("detailInfo") String detailInfo
-      ) {
+  ) {
     this.path = path;
     this.start = start;
     this.length = length;
@@ -115,7 +115,7 @@ public class CarbonLocalInputSplit {
   }
 
  public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
-    CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(),
+    CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(), "0",
         new Path(carbonLocalInputSplit.getPath()), carbonLocalInputSplit.getStart(),
         carbonLocalInputSplit.getLength(), carbonLocalInputSplit.getLocations()
         .toArray(new String[carbonLocalInputSplit.getLocations().size()]),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala
new file mode 100644
index 0000000..980b2b7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala
@@ -0,0 +1,69 @@
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestUpdateAndDeleteWithLargeData extends QueryTest with BeforeAndAfterAll {
+  var df: DataFrame = _
+
+  override def beforeAll {
+    dropTable()
+    buildTestData()
+  }
+
+  private def buildTestData(): Unit = {
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Simulate data and write to table orders
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000)
+      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value))
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary","o_comment")
+      createTable()
+
+  }
+
+  private def dropTable() = {
+    sql("DROP TABLE IF EXISTS orders")
+
+  }
+
+  private def createTable(): Unit = {
+    df.write
+      .format("carbondata")
+      .option("tableName", "orders")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  test("test the update and delete delete functionality for large data") {
+
+    sql(
+      """
+            update ORDERS set (o_comment) = ('yyy')""").show()
+    checkAnswer(sql(
+      """select o_comment from orders limit 2 """),Seq(Row("yyy"),Row("yyy")))
+
+    sql("delete from orders where exists (select 1 from orders)")
+
+    checkAnswer(sql(
+      """
+           SELECT count(*) FROM orders
+           """), Row(0))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c8fa590/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ecc48cf..e54ea9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -82,6 +82,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
         HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
           isUpdateOperation = false)
 
+
+        if (executorErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executorErrors.errorMsg)
+        }
+
         // trigger post event for Delete from table
         val deleteFromTablePostEvent: DeleteFromTablePostEvent =
           DeleteFromTablePostEvent(sparkSession, carbonTable)

