carbondata-commits mailing list archives

From gvram...@apache.org
Subject [14/22] incubator-carbondata git commit: IUD query flow support for update and delete delta files
Date Fri, 06 Jan 2017 13:57:14 GMT
IUD query flow support for update and delete delta files


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/427b202c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/427b202c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/427b202c

Branch: refs/heads/master
Commit: 427b202c04f26c89a56ef12fc3bbff0263198a2b
Parents: f9fb1b9
Author: sujith71955 <sujithchacko.2010@gmail.com>
Authored: Mon Jan 2 11:30:59 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../BlockletLevelDeleteDeltaDataCache.java      |  29 +++
 .../cache/DeleteDeltaDataCache.java             |  29 +++
 .../iuddata/BlockletDeleteDeltaCacheLoader.java |  59 ++++++
 .../iuddata/DeleteDeltaCacheLoaderIntf.java     |  10 +
 .../iuddata/DeleteDeltaDataUtil.java            |  44 +++++
 .../core/carbon/datastore/DataRefNode.java      |  11 ++
 .../datastore/TableSegmentUniqueIdentifier.java |  16 +-
 .../block/SegmentTaskIndexWrapper.java          |  33 +++-
 .../carbon/datastore/block/TableBlockInfo.java  |  53 +++++-
 .../impl/btree/AbstractBTreeLeafNode.java       |  22 +++
 .../datastore/impl/btree/BTreeNonLeafNode.java  |  28 ++-
 .../impl/btree/BlockBTreeLeafNode.java          |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 181 ++++++++++++++++---
 .../carbondata/hadoop/CarbonInputSplit.java     |  34 ++++
 .../hadoop/util/BlockLevelTraverser.java        |  69 +++++++
 .../hadoop/test/util/StoreCreator.java          |  10 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |  45 +++++
 .../carbondata/spark/util/QueryPlanUtil.scala   |  56 ++++++
 .../spark/sql/CarbonDatasourceRelation.scala    |   7 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   7 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   3 +-
 21 files changed, 694 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/BlockletLevelDeleteDeltaDataCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/BlockletLevelDeleteDeltaDataCache.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/BlockletLevelDeleteDeltaDataCache.java
new file mode 100644
index 0000000..9ecd40f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/BlockletLevelDeleteDeltaDataCache.java
@@ -0,0 +1,29 @@
+package org.apache.carbondata.common.iudprocessor.cache;
+
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Blocklet-level cache which holds the deleted row ids of a blocklet as a
+ * RoaringBitmap, together with the timestamp at which the cache was built.
+ */
+public class BlockletLevelDeleteDeltaDataCache {
+  private RoaringBitmap deleteDeltaDataCache;
+  private String timeStamp;
+
+  public BlockletLevelDeleteDeltaDataCache(int[] deleteDeltaFileData, String timeStamp) {
+    deleteDeltaDataCache = RoaringBitmap.bitmapOf(deleteDeltaFileData);
+    this.timeStamp = timeStamp;
+  }
+
+  public boolean contains(int key) {
+    return deleteDeltaDataCache.contains(key);
+  }
+
+  public int getSize() {
+    return deleteDeltaDataCache.getCardinality();
+  }
+
+  public String getCacheTimeStamp() {
+    return timeStamp;
+  }
+}
+

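For context, a minimal usage sketch of the new cache (illustration only, not part of the commit; the row ids and timestamp literals are invented):

  import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;

  public class DeleteDeltaCacheExample {
    public static void main(String[] args) {
      // invented deleted-row ids for a single blocklet
      int[] deletedRowIds = { 3, 17, 42 };
      BlockletLevelDeleteDeltaDataCache cache =
          new BlockletLevelDeleteDeltaDataCache(deletedRowIds, "1483699574000");
      // a scan can skip any row id the bitmap contains
      System.out.println(cache.contains(17)); // true
      System.out.println(cache.getSize());    // 3
    }
  }
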
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/DeleteDeltaDataCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/DeleteDeltaDataCache.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/DeleteDeltaDataCache.java
new file mode 100644
index 0000000..7fd4d04
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/DeleteDeltaDataCache.java
@@ -0,0 +1,29 @@
+package org.apache.carbondata.common.iudprocessor.cache;
+
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Cache which holds the deleted row ids read from the delete delta files as a
+ * RoaringBitmap, together with the timestamp of the last refresh.
+ */
+public class DeleteDeltaDataCache {
+  private RoaringBitmap deleteDeltaDataCache;
+  private String timeStamp;
+
+  public DeleteDeltaDataCache(int[] deleteDeltaFileData, String timeStamp) {
+    deleteDeltaDataCache = RoaringBitmap.bitmapOf(deleteDeltaFileData);
+    this.timeStamp = timeStamp;
+  }
+
+  public boolean contains(int key) {
+    return deleteDeltaDataCache.contains(key);
+  }
+
+  public int getSize() {
+    return deleteDeltaDataCache.getCardinality();
+  }
+
+  public String getCacheTimeStamp() {
+    return timeStamp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockletDeleteDeltaCacheLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockletDeleteDeltaCacheLoader.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockletDeleteDeltaCacheLoader.java
new file mode 100644
index 0000000..81f344c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockletDeleteDeltaCacheLoader.java
@@ -0,0 +1,59 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.DataRefNode;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+
+public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderIntf {
+  private String blockletID;
+  private DataRefNode blockletNode;
+  private AbsoluteTableIdentifier absoluteIdentifier;
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName());
+
+  public BlockletDeleteDeltaCacheLoader(String blockletID,
+                                        DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) {
+    this.blockletID = blockletID;
+    this.blockletNode = blockletNode;
+    this.absoluteIdentifier = absoluteIdentifier;
+  }
+
+  /**
+   * Loads the delete delta cache for the given blocklet id of a particular block,
+   * with the help of SegmentUpdateStatusManager.
+   */
+  public void loadDeleteDeltaFileDataToCache() {
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteIdentifier);
+    int[] deleteDeltaFileData = null;
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null;
+    if (null == blockletNode.getDeleteDeltaDataCache()) {
+      try {
+        deleteDeltaFileData = segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+        deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+            segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, null));
+      } catch (Exception e) {
+        LOGGER.debug("Unable to retrieve delete delta files");
+      }
+    } else {
+      deleteDeltaDataCache = blockletNode.getDeleteDeltaDataCache();
+      // if a cache is already present, validate it using the timestamp
+      String cacheTimeStamp = segmentUpdateStatusManager
+          .getTimestampForRefreshCache(blockletID, deleteDeltaDataCache.getCacheTimeStamp());
+      if (null != cacheTimeStamp) {
+        try {
+          deleteDeltaFileData =
+              segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+          deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+              segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, cacheTimeStamp));
+        } catch (Exception e) {
+          LOGGER.debug("Unable to retrieve delete delta files");
+        }
+      }
+    }
+    blockletNode.setDeleteDeltaDataCache(deleteDeltaDataCache);
+  }
+}

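A hedged sketch of how a scanner might drive the loader (the blocklet id, node, and identifier are assumed to come from the B-tree and table metadata):

  import org.apache.carbondata.common.iudprocessor.iuddata.BlockletDeleteDeltaCacheLoader;
  import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
  import org.apache.carbondata.core.carbon.datastore.DataRefNode;

  public class CacheLoaderExample {
    // loads the cache when the node has none, or rebuilds it when the refresh
    // timestamp reports the cached bitmap as stale; otherwise it is kept as-is
    static void refreshDeleteDeltaCache(String blockletId, DataRefNode node,
        AbsoluteTableIdentifier identifier) {
      new BlockletDeleteDeltaCacheLoader(blockletId, node, identifier)
          .loadDeleteDeltaFileDataToCache();
    }
  }
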
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaCacheLoaderIntf.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaCacheLoaderIntf.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaCacheLoaderIntf.java
new file mode 100644
index 0000000..79a3c14
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaCacheLoaderIntf.java
@@ -0,0 +1,10 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+/**
+ * Interface for loading delete delta file data into a blocklet-level cache.
+ */
+public interface DeleteDeltaCacheLoaderIntf {
+
+  void loadDeleteDeltaFileDataToCache();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaDataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaDataUtil.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaDataUtil.java
new file mode 100644
index 0000000..d7cc020
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaDataUtil.java
@@ -0,0 +1,44 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+
+/**
+ * Utility methods for working with delete delta data.
+ */
+public final class DeleteDeltaDataUtil {
+
+  private DeleteDeltaDataUtil() {
+  }
+
+  public static int transformDataToNumericType(String deleteDeltadata) {
+    // parse the numeric suffix after the last '.'; start at lastIndexOf('.') + 1
+    // so the dot itself is not handed to Integer.parseInt
+    return Integer.parseInt(deleteDeltadata.substring(deleteDeltadata.lastIndexOf('.') + 1));
+  }
+
+  /**
+   * Verifies whether the given segment was updated as part of a
+   * delete/update query.
+   * @param segmentStatusManager
+   * @param tableIdentifier
+   * @param segmentID
+   * @return true if the segment is marked as deleted in the load metadata
+   */
+  public static boolean isSegmentStatusUpdated(SegmentStatusManager segmentStatusManager,
+                                               AbsoluteTableIdentifier tableIdentifier, String segmentID) {
+    CarbonTable table = CarbonMetadata.getInstance()
+        .getCarbonTable(tableIdentifier.getCarbonTableIdentifier().getTableUniqueName());
+    LoadMetadataDetails[] loadMetadataDetails =
+        segmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+    for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+      if (segmentID.equals(loadMetadataDetail.getLoadName()) && "TRUE"
+          .equalsIgnoreCase(loadMetadataDetail.getIsDeleted())) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
\ No newline at end of file

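A worked example of the numeric-suffix parsing above (input name invented; with the `+ 1` the dot is excluded, so the parse succeeds):

  import org.apache.carbondata.common.iudprocessor.iuddata.DeleteDeltaDataUtil;

  public class NumericSuffixExample {
    public static void main(String[] args) {
      // parses whatever follows the last '.' in the given name
      int suffix = DeleteDeltaDataUtil.transformDataToNumericType("blocklet.27");
      System.out.println(suffix); // 27
    }
  }
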
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
index 28131d6..0323d55 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore;
 
 import java.io.IOException;
 
+import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
@@ -114,4 +115,14 @@ public interface DataRefNode {
    * @return measure data chunk
    */
   MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex) throws IOException;
+
+  /**
+   * Sets the blocklet-level delete delta cache on this node.
+   *
+   * @param deleteDeltaDataCache cache of deleted row ids for this node's data
+   */
+  void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache);
+
+  /**
+   * @return the blocklet-level delete delta cache, or null if none has been loaded
+   */
+  BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache();
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
index ffde93f..cacc902 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
@@ -42,6 +42,7 @@ public class TableSegmentUniqueIdentifier {
   Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos;
 
   private String segmentId;
+  private boolean isSegmentUpdated;
 
   /**
    * Constructor to initialize the class instance
@@ -101,10 +102,17 @@ public class TableSegmentUniqueIdentifier {
    */
   public String getUniqueTableSegmentIdentifier() {
     CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
-    return carbonTableIdentifier.getDatabaseName()
-        + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier
-        .getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId;
+            absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+            + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+            + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId;
+  }
+
+  public void setIsSegmentUpdated(boolean isSegmentUpdated) {
+    this.isSegmentUpdated = isSegmentUpdated;
+  }
+
+  public boolean isSegmentUpdated() {
+    return isSegmentUpdated;
   }
 
   /**

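Illustration of the new cache-key layout produced by getUniqueTableSegmentIdentifier (names invented; FILE_SEPARATOR assumed to be "/" and UNDERSCORE "_"):

  // database "default", table "t1" with tableId "2fft", segment "0":
  //   old key: default/2fft/0
  //   new key: default/t1_2fft/0   (table name now included alongside the table id)
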
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
index cd278b5..6a0d31b 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
@@ -21,9 +21,11 @@ package org.apache.carbondata.core.carbon.datastore.block;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.update.UpdateVO;
 
 /**
  * SegmentTaskIndexWrapper class holds the  taskIdToTableSegmentMap
@@ -42,8 +44,10 @@ public class SegmentTaskIndexWrapper implements Cacheable {
   /**
    * Table block meta size.
    */
-  protected long memorySize;
+  protected AtomicLong memorySize = new AtomicLong();
 
+  private Long refreshedTimeStamp;
+  private UpdateVO invalidTaskKey;
   public SegmentTaskIndexWrapper(
       Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap) {
     this.taskIdToTableSegmentMap = taskIdToTableSegmentMap;
@@ -64,7 +68,7 @@ public class SegmentTaskIndexWrapper implements Cacheable {
    * @param memorySize
    */
   public void setMemorySize(long memorySize) {
-    this.memorySize = memorySize;
+    this.memorySize.set(memorySize);
   }
 
   /**
@@ -91,7 +95,7 @@ public class SegmentTaskIndexWrapper implements Cacheable {
    * @return
    */
   @Override public long getMemorySize() {
-    return memorySize;
+    return memorySize.get();
   }
 
   /**
@@ -118,4 +122,27 @@ public class SegmentTaskIndexWrapper implements Cacheable {
     }
   }
 
+  public Long getRefreshedTimeStamp() {
+    return refreshedTimeStamp;
+  }
+
+  public void setRefreshedTimeStamp(Long refreshedTimeStamp) {
+    this.refreshedTimeStamp = refreshedTimeStamp;
+  }
+
+  public void removeEntryFromCacheAndRefresh(String taskId) {
+    AbstractIndex blockEntry = this.getTaskIdToTableSegmentMap().remove(taskId);
+    if (null != blockEntry) {
+      memorySize.set(memorySize.get() - blockEntry.getMemorySize());
+    }
+  }
+
+  public void setLastUpdateVO(UpdateVO invalidTaskKey) {
+    this.invalidTaskKey = invalidTaskKey;
+  }
+
+  public UpdateVO getInvalidTaskKey() {
+    return invalidTaskKey;
+  }
+
 }

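A minimal sketch of how the new refreshedTimeStamp is consulted (this mirrors the isSegmentUpdate check added to CarbonInputFormat further down; accessor names as used in this commit):

  import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
  import org.apache.carbondata.core.update.UpdateVO;

  public class RefreshCheckExample {
    // true when a later IUD operation stamped the segment after the cached wrapper
    // was last refreshed, i.e. the cached task B-trees must be rebuilt
    static boolean needsRefresh(SegmentTaskIndexWrapper wrapper, UpdateVO updateDetails) {
      return null != updateDetails.getLatestUpdateTimestamp()
          && updateDetails.getLatestUpdateTimestamp() > wrapper.getRefreshedTimeStamp();
    }
  }
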
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 802a116..2c377c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -19,10 +19,13 @@
 package org.apache.carbondata.core.carbon.datastore.block;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 
 /**
@@ -64,6 +67,12 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   private BlockletInfos blockletInfos = new BlockletInfos();
 
+  /**
+   * map of block location and storage id
+   */
+  private Map<String, String> blockStorageIdMap =
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, ColumnarFormatVersion version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -86,13 +95,27 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) {
-    this.filePath = FileFactory.getUpdatedFilePath(filePath);
-    this.blockOffset = blockOffset;
-    this.segmentId = segmentId;
-    this.locations = locations;
-    this.blockLength = blockLength;
+    this(filePath, blockOffset, segmentId, locations, blockLength, version);
     this.blockletInfos = blockletInfos;
-    this.version = version;
+  }
+
+  /**
+   * constructor to initialize the TableBlockInfo with blockStorageIdMap
+   *
+   * @param filePath
+   * @param blockOffset
+   * @param segmentId
+   * @param locations
+   * @param blockLength
+   * @param blockletInfos
+   * @param version
+   * @param blockStorageIdMap
+   */
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
+                        long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
+                        Map<String, String> blockStorageIdMap) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version);
+    this.blockStorageIdMap = blockStorageIdMap;
   }
 
   /**
@@ -267,4 +290,22 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setVersion(ColumnarFormatVersion version) {
     this.version = version;
   }
+
+  /**
+   * returns the storage location vs storage id map
+   *
+   * @return
+   */
+  public Map<String, String> getBlockStorageIdMap() {
+    return this.blockStorageIdMap;
+  }
+
+  /**
+   * sets the storage location vs storage id map
+   *
+   * @param blockStorageIdMap
+   */
+  public void setBlockStorageIdMap(Map<String, String> blockStorageIdMap) {
+    this.blockStorageIdMap = blockStorageIdMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
index d889bce..e8b7713 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -26,12 +26,21 @@ import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChun
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 
+import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
+
 /**
  * Leaf node abstract class
  */
 public abstract class AbstractBTreeLeafNode implements BTreeNode {
 
   /**
+   * blocklet-level cache which holds the deleted row ids of this node's data,
+   * loaded from the delete delta files
+   */
+  protected BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+  /**
    * number of keys in a btree
    */
   protected int numberOfKeys;
@@ -219,4 +228,17 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
     // measure chunks
     return null;
   }
+
+  /**
+   * @param deleteDeltaDataCache the delete delta cache to set for this node
+   */
+  public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+    this.deleteDeltaDataCache = deleteDeltaDataCache;
+  }
+
+  /**
+   * @return the delete delta cache of this node
+   */
+  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return deleteDeltaDataCache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
index 5be4ed1..e805919 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
@@ -18,9 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.impl.btree;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
@@ -28,6 +26,9 @@ import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Non-leaf node of a B+tree which keeps the metadata (start key) of the
  * leaf node
@@ -35,6 +36,13 @@ import org.apache.carbondata.core.datastorage.store.FileHolder;
 public class BTreeNonLeafNode implements BTreeNode {
 
   /**
+   * blocklet-level cache which holds the deleted row ids of this node's data,
+   * loaded from the delete delta files
+   */
+  protected BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+  /**
    * Child nodes
    */
   private BTreeNode[] children;
@@ -224,4 +232,18 @@ public class BTreeNonLeafNode implements BTreeNode {
     // node will be used only for searching the leaf node
     throw new UnsupportedOperationException("Unsupported operation");
   }
+
+  /**
+   * @param deleteDeltaDataCache the delete delta cache to set for this node
+   */
+  public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+    this.deleteDeltaDataCache = deleteDeltaDataCache;
+  }
+
+  /**
+   * @return the delete delta cache of this node
+   */
+  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return deleteDeltaDataCache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockBTreeLeafNode.java
index d76ce4c..494a2a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockBTreeLeafNode.java
@@ -45,7 +45,7 @@ public class BlockBTreeLeafNode extends AbstractBTreeLeafNode {
     BlockletMinMaxIndex minMaxIndex = footer.getBlockletIndex().getMinMaxIndex();
     maxKeyOfColumns = minMaxIndex.getMaxValues();
     minKeyOfColumns = minMaxIndex.getMinValues();
-    numberOfKeys = 1;
+    numberOfKeys = (int)footer.getNumberOfRows();
     this.nodeNumber = nodeNumber;
     this.blockInfo = footer.getBlockInfo();
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e4f821..6950e54 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
@@ -48,10 +48,15 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstant
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.update.UpdateVO;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+import org.apache.carbondata.hadoop.util.BlockLevelTraverser;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.util.SchemaReader;
@@ -66,15 +71,7 @@ import org.apache.carbondata.scan.model.QueryModel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -282,12 +279,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        getAbsoluteTableIdentifier(job.getConfiguration());
+            getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager =
+            new SegmentUpdateStatusManager(absoluteTableIdentifier);
     //for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : getSegmentsToAccess(job)) {
       List<DataRefNode> dataRefNodes =
           getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
-              filterResolver, segmentNo, cacheClient);
+              filterResolver, segmentNo, updateStatusManager);
       for (DataRefNode dataRefNode : dataRefNodes) {
         BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
@@ -319,11 +318,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private List<DataRefNode> getDataBlocksOfSegment(JobContext job,
       FilterExpressionProcessor filterExpressionProcessor,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
-      String segmentId, CacheClient cacheClient) throws IOException {
+      String segmentId, SegmentUpdateStatusManager updateStatusManager)
+      throws IndexBuilderException, IOException, CarbonUtilException {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
-        getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient);
+        getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, updateStatusManager);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
 
@@ -358,35 +358,131 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @return list of table block
    * @throws IOException
    */
-  private List<TableBlockInfo> getTableBlockInfo(JobContext job, String segmentId)
+  private List<TableBlockInfo> getTableBlockInfo(JobContext job,
+                                                 TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+                                                 Set<String> taskKeys,
+                                                 List<String> updatedTaskList,
+                                                 UpdateVO updateDetails,
+                                                 SegmentUpdateStatusManager updateStatusManager,
+                                                 String segmentId)
       throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
 
     // get file location of all files of given segment
     JobContext newJob =
         new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
-    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
+    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
+            tableSegmentUniqueIdentifier.getSegmentId() + "");
 
     // identify table blocks
     for (InputSplit inputSplit : getSplitsInternal(newJob)) {
       CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
-      BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
-          carbonInputSplit.getNumberOfBlocklets());
-      tableBlockInfoList.add(
-          new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-              segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-              blockletInfos, carbonInputSplit.getVersion()));
+      // if the block name and the update block name are the same, compare the block's
+      // timestamp with the tableSegmentUniqueIdentifier timestamp; if it is greater,
+      // add the block as a TableBlockInfo object.
+      if (isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit, updateDetails,
+              updateStatusManager, segmentId)) {
+        BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+                carbonInputSplit.getNumberOfBlocklets());
+        tableBlockInfoList.add(
+                new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+                        tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
+                        carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
+                        carbonInputSplit.getBlockStorageIdMap()));
+      }
     }
     return tableBlockInfoList;
   }
 
+  public boolean isValidBlockBasedOnUpdateDetails(Set<String> taskKeys,
+                                                  CarbonInputSplit carbonInputSplit, UpdateVO updateDetails,
+                                                  SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    String taskID = null;
+    if (null != carbonInputSplit) {
+
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+        return false;
+      }
+
+      if (null == taskKeys) {
+        return true;
+      }
+
+      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String blockTimestamp = carbonInputSplit.getPath().getName()
+              .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+                      carbonInputSplit.getPath().getName().lastIndexOf('.'));
+      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+              && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskID)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
+                                                              AbsoluteTableIdentifier absoluteTableIdentifier,
+                                                              String segmentId,
+                                                              SegmentUpdateStatusManager updateStatusManager)
+          throws IOException, IndexBuilderException, CarbonUtilException {
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    Map<String, AbstractIndex> segmentIndexMap = null;
+    Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> cache = CacheProvider.getInstance()
+            .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath());
+    List<String> updatedTaskList = null;
+    boolean isSegmentUpdated = false;
+    Set<String> taskKeys = null;
+
+    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+            new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+    SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    segmentTaskIndexWrapper = cache.getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    if (null != segmentTaskIndexWrapper) {
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+        updatedTaskList =
+                statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
+      }
+    }
+
+    // if segment tree is not loaded, load the segment tree
+    if (segmentIndexMap == null || isSegmentUpdated) {
+      // if the segment is updated, only the TableBlockInfo instances of the updated
+      // blocks have to be retrieved; they are filtered against taskKeys so that a
+      // block whose task B-tree is already loaded is not added again.
+      List<TableBlockInfo> tableBlockInfoList =
+              getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
+                      updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
+                      segmentId);
+      if (!tableBlockInfoList.isEmpty()) {
+        // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+        Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+        segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+        // get Btree blocks for given segment
+        tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+        tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+        segmentTaskIndexWrapper = cache.get(tableSegmentUniqueIdentifier);
+        segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      }
+    }
+    return segmentIndexMap;
+  }
+
   /**
-   * It returns index for each task file.
+   * Returns the task-level index for each task file of the given segment.
    * @param job
    * @param absoluteTableIdentifier
-   * @param segmentId
    * @return
    * @throws IOException
+   * @throws CarbonUtilException
+   * @throws IndexBuilderException
+   * @throws KeyGenException
    */
   private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
@@ -406,16 +502,47 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
       // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
 
-      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
-      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+    Map<String, Long> blockRowCountMapping =
+            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
       // get Btree blocks for given segment
       tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
       segmentTaskIndexWrapper =
           cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    for (String eachValidSeg : validAndInvalidSegments.getValidSegments()) {
+
+      long countOfBlocksInSeg = 0;
+
+      Map<String, AbstractIndex> taskAbstractIndexMap =
+              getSegmentAbstractIndexs(job, absoluteTableIdentifier, eachValidSeg, updateStatusManager);
+
+      for (Map.Entry<String, AbstractIndex> taskMap : taskAbstractIndexMap.entrySet()) {
+
+        AbstractIndex taskAbstractIndex = taskMap.getValue();
+
+        countOfBlocksInSeg += new BlockLevelTraverser()
+                .getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,
+                        updateStatusManager);
+      }
+
+      segmentAndBlockCountMapping.put(eachValidSeg, countOfBlocksInSeg);
+
     }
-    return segmentIndexMap;
+
+    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
+  }
+
+  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+                                  UpdateVO updateDetails) {
+    return null != updateDetails.getLatestUpdateTimestamp()
+            && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+            .getRefreshedTimeStamp();
   }
 
   /**

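The validity check above derives a block's write timestamp from its file name, taking the characters between the last '-' and the extension. A self-contained sketch with an invented file name:

  public class BlockTimestampExample {
    public static void main(String[] args) {
      // invented name; only the trailing "-<timestamp>.<ext>" layout matters here
      String blockFileName = "part-0-0-1483699574000.carbondata";
      String blockTimestamp = blockFileName.substring(
          blockFileName.lastIndexOf('-') + 1, blockFileName.lastIndexOf('.'));
      // blocks older than the segment's updateDeltaStartTimestamp predate the update
      // and are kept only if their task key is not already in the loaded B-tree
      System.out.println(Long.parseLong(blockTimestamp)); // 1483699574000
    }
  }
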
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index a4acd9c..a7aa0a1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -23,13 +23,16 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
@@ -61,6 +64,12 @@ public class CarbonInputSplit extends FileSplit
 
   private ColumnarFormatVersion version;
 
+  /**
+   * map of block location and storage id
+   */
+  private Map<String, String> blockStorageIdMap =
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -86,6 +95,23 @@ public class CarbonInputSplit extends FileSplit
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
+  /**
+   * Constructor to initialize the CarbonInputSplit with blockStorageIdMap
+   * @param segmentId
+   * @param path
+   * @param start
+   * @param length
+   * @param locations
+   * @param numberOfBlocklets
+   * @param version
+   * @param blockStorageIdMap
+   */
+  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
+                          int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap) {
+    this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+    this.blockStorageIdMap = blockStorageIdMap;
+  }
+
   public static CarbonInputSplit from(String segmentId, FileSplit split,
       ColumnarFormatVersion version)
       throws IOException {
@@ -235,4 +261,12 @@ public class CarbonInputSplit extends FileSplit
   @Override public boolean fullScan() {
     return true;
   }
+
+  /**
+   * returns the map of block location and storage id
+   * @return block location to storage id map
+   */
+  public Map<String, String> getBlockStorageIdMap() {
+    return blockStorageIdMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
new file mode 100644
index 0000000..d81aca8
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
@@ -0,0 +1,69 @@
+package org.apache.carbondata.hadoop.util;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.carbon.datastore.DataRefNode;
+import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.carbon.datastore.IndexKey;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+import org.apache.carbondata.scan.filter.FilterUtil;
+
+/**
+ * Traverses the block B-tree of a task index and collects the valid per-block
+ * row counts, skipping blocks invalidated by update/delete operations.
+ */
+public class BlockLevelTraverser {
+
+  /**
+   * Fills blockRowMap with the row count of every valid block of the segment.
+   *
+   * @param abstractIndex
+   * @param blockRowMap
+   * @param segId
+   * @param updateStatusManager
+   * @return number of valid blocks counted
+   * @throws KeyGenException
+   */
+  public long getBlockRowMapping(AbstractIndex abstractIndex, Map<String, Long> blockRowMap,
+      String segId, SegmentUpdateStatusManager updateStatusManager)
+      throws KeyGenException {
+
+    IndexKey searchStartKey =
+          FilterUtil.prepareDefaultStartIndexKey(abstractIndex.getSegmentProperties());
+
+    DataRefNodeFinder blockFinder = new BTreeDataRefNodeFinder(
+        abstractIndex.getSegmentProperties().getEachDimColumnValueSize());
+    DataRefNode currentBlock =
+        blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), searchStartKey);
+
+    long count = 0;
+
+    while (currentBlock != null) {
+
+      String blockName = ((BlockBTreeLeafNode) currentBlock).getTableBlockInfo().getFilePath();
+      blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+      blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+      long rowCount = currentBlock.nodeSize();
+
+      String key = CarbonUpdateUtil.getSegmentBlockNameKey(segId, blockName);
+
+      // if the block is invalid then don't add its count
+      SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+        blockRowMap.put(key, rowCount);
+        count++;
+      }
+      currentBlock = currentBlock.getNextDataRefNode();
+    }
+
+    return count;
+  }
+
+}

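A hedged sketch of consuming the traverser (the index and update status manager are assumed to come from a loaded segment; the exact key layout produced by CarbonUpdateUtil.getSegmentBlockNameKey is not shown in this commit):

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
  import org.apache.carbondata.core.keygenerator.KeyGenException;
  import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
  import org.apache.carbondata.hadoop.util.BlockLevelTraverser;

  public class RowCountExample {
    static Map<String, Long> countRows(AbstractIndex index, String segmentId,
        SegmentUpdateStatusManager updateStatusManager) throws KeyGenException {
      Map<String, Long> blockRowMap = new HashMap<>();
      // fills blockRowMap with <segment/block-name key> -> row count for valid blocks
      long validBlocks = new BlockLevelTraverser()
          .getBlockRowMapping(index, blockRowMap, segmentId, updateStatusManager);
      System.out.println(validBlocks + " valid blocks counted");
      return blockRowMap;
    }
  }
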
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index e983f86..4b1a09b 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -59,9 +59,9 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
 import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
 import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
 import org.apache.carbondata.processing.constants.TableOptionConstant;
@@ -407,10 +407,10 @@ public class StoreCreator {
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
       String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setTimestamp(readCurrentTime());
+    // loadMetadataDetails.setTimestamp(readCurrentTime());
     loadMetadataDetails.setLoadStatus("SUCCESS");
     loadMetadataDetails.setLoadName(String.valueOf(0));
-    loadMetadataDetails.setLoadStartTime(readCurrentTime());
+    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
     String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
new file mode 100644
index 0000000..f4c8cd1
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
+
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+
+case class TransformHolder(rdd: Any, metaData: CarbonMetaData)
+
+object CarbonSparkUtil {
+
+  def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
+    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+        .asScala.map(x => x.getColName)
+    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+        .asScala.map(x => x.getColName)
+    val dictionary =
+      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+        (f.getColName.toLowerCase,
+            f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+                !CarbonUtil.hasComplexDataType(f.getDataType))
+      }
+    CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
new file mode 100644
index 0000000..c55c807
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.hadoop.CarbonInputFormat
+
+
+/**
+ * All the utility functions for carbon plan creation
+ */
+object QueryPlanUtil {
+
+  /**
+   * createCarbonInputFormat from query model
+   */
+  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
+  (CarbonInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    (carbonInputFormat, job)
+  }
+
+  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
+      conf: Configuration) : CarbonInputFormat[V] = {
+    val carbonInputFormat = new CarbonInputFormat[V]()
+    val job: Job = new Job(conf)
+    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    carbonInputFormat
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 6cbc517..427e982 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
@@ -32,7 +34,6 @@ import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.{CarbonOption, _}
 import org.apache.carbondata.spark.merger.TableMeta
 
@@ -301,8 +302,8 @@ case class CarbonRelation(
   private var sizeInBytesLocalValue = 0L
 
   def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+    val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
+      tableMeta.carbonTable.getAbsoluteTableIdentifier).getTableStatusLastModifiedTime
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
         tableMeta.storePath,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 73c44d6..d33d3df 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -335,7 +335,12 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
           case _ => // Unsupport features
         }
 
-        val altertablemodel = AlterTableModel(dbName, tableName, compactionType, alterSql)
+        val altertablemodel = AlterTableModel(dbName,
+          tableName,
+          None,
+          compactionType,
+          Some(System.currentTimeMillis()),
+          alterSql)
         AlterTableCompaction(altertablemodel)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/427b202c/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9cdbf86..cf66bda 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive
 import java.io._
 import java.util.UUID
 
+import org.apache.carbondata.locks.ZookeeperInit
+
 import scala.Array.canBuildFrom
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
@@ -47,7 +49,6 @@ import org.apache.carbondata.core.reader.ThriftReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.lcm.locks.ZookeeperInit
 import org.apache.carbondata.spark.merger.TableMeta
 
 case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])

