carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3337][CARBONDATA-3306] Distributed index server
Date Mon, 20 May 2019 05:48:14 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 24fe230  [CARBONDATA-3337][CARBONDATA-3306] Distributed index server
24fe230 is described below

commit 24fe230a90d3b2b4e6ca9bbb4512300fa6ba6a75
Author: kunal642 <kunalkapoor642@gmail.com>
AuthorDate: Tue Feb 5 16:44:44 2019 +0530

    [CARBONDATA-3337][CARBONDATA-3306] Distributed index server
    
    Implement Distributed Index Server which will cache and prune the datamaps using executors.
    
    Implemented Hadoop RPC framework.
    Implemented DistributedPruneRDD which will prune and cache the datamaps.
    Implemented DistributedShowCacheRDD.
    
    This closes #3177
---
 bin/start-indexserver.sh                           |  56 +++++
 bin/stop-indexserver.sh                            |  26 +++
 .../carbondata/core/cache/CarbonLRUCache.java      |   8 +
 .../core/constants/CarbonCommonConstants.java      |  34 +++
 .../carbondata/core/datamap/DataMapChooser.java    |   2 +-
 .../apache/carbondata/core/datamap/DataMapJob.java |   4 +-
 .../core/datamap/DataMapStoreManager.java          | 124 +++++++---
 .../carbondata/core/datamap/DataMapUtil.java       | 175 ++++++++++----
 .../core/datamap/DistributableDataMapFormat.java   | 254 ++++++++++++++++++---
 .../apache/carbondata/core/datamap/Segment.java    |  79 ++++++-
 .../carbondata/core/datamap/TableDataMap.java      |  47 +---
 .../core/datamap/dev/DataMapFactory.java           |  13 +-
 .../carbondata/core/indexstore/Blocklet.java       |  24 +-
 .../core/indexstore/BlockletDetailsFetcher.java    |   2 +
 .../core/indexstore/ExtendedBlocklet.java          |  53 +++++
 .../carbondata/core/indexstore/PartitionSpec.java  |  52 ++++-
 .../TableBlockIndexUniqueIdentifierWrapper.java    |   7 +-
 .../BlockletDataMapDistributable.java              |  14 ++
 .../blockletindex/BlockletDataMapFactory.java      | 140 ++++++++----
 .../carbondata/core/metadata/SegmentFileStore.java |   5 +-
 .../schema/table/AggregationDataMapSchema.java     |   5 +-
 .../core/metadata/schema/table/CarbonTable.java    |  16 +-
 .../carbondata/core/util/BlockletDataMapUtil.java  |   2 +-
 .../carbondata/core/util/CarbonProperties.java     |  52 +++++
 .../apache/carbondata/core/util/SessionParams.java |   1 +
 .../apache/carbondata/hadoop/CarbonInputSplit.java |   4 +
 .../bloom/BloomCoarseGrainDataMapFactory.java      |  18 +-
 .../datamap/lucene/LuceneDataMapFactoryBase.java   |  17 +-
 dev/findbugs-exclude.xml                           |   8 +
 .../hadoop/api/CarbonFileInputFormat.java          |   4 +-
 .../carbondata/hadoop/api/CarbonInputFormat.java   | 212 +++++++++++------
 .../hadoop/api/CarbonTableInputFormat.java         | 136 ++++++-----
 .../hadoop/util/CarbonInputFormatUtil.java         |   2 +-
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala |   4 +-
 .../testsuite/datamap/CGDataMapTestCase.scala      |   8 -
 .../testsuite/datamap/DataMapWriterSuite.scala     |   2 -
 .../testsuite/datamap/FGDataMapTestCase.scala      |   8 +-
 .../testsuite/datamap/TestDataMapStatus.scala      |   2 -
 .../iud/TestInsertAndOtherCommandConcurrent.scala  |   2 -
 .../carbondata/spark/util/CarbonScalaUtil.scala    |  11 +-
 .../apache/spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../carbondata/datamap/IndexDataMapProvider.java   |   2 +-
 .../carbondata/indexserver/DataMapJobs.scala       |  80 +++++++
 .../indexserver/DistributedPruneRDD.scala}         |  88 ++++---
 .../indexserver/DistributedRDDUtils.scala          | 164 +++++++++++++
 .../indexserver/DistributedShowCacheRDD.scala      |  58 +++++
 .../carbondata/indexserver/IndexServer.scala       | 178 +++++++++++++++
 .../indexserver/InvalidateSegmentCacheRDD.scala    |  54 +++++
 .../spark/rdd/CarbonDataRDDFactory.scala           |   7 +
 .../org/apache/spark/sql/CarbonCountStar.scala     |   1 -
 .../sql/events/MergeBloomIndexEventListener.scala  |   5 -
 .../sql/execution/command/cache/CacheUtil.scala    |  15 +-
 .../command/cache/CarbonDropCacheCommand.scala     |  21 +-
 .../command/cache/CarbonShowCacheCommand.scala     |  56 ++++-
 .../mutation/CarbonProjectForDeleteCommand.scala   |   5 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |   4 +
 .../command/mutation/DeleteExecution.scala         |  20 +-
 .../execution/command/CarbonHiveCommands.scala     |   5 +
 58 files changed, 1917 insertions(+), 481 deletions(-)

diff --git a/bin/start-indexserver.sh b/bin/start-indexserver.sh
new file mode 100755
index 0000000..e34516c
--- /dev/null
+++ b/bin/start-indexserver.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Shell script for starting the Distributed Index Server
+
+# Enter posix mode for bash
+set -o posix
+
+if [ -z "${SPARK_HOME}" ]; then
+  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+# NOTE: This exact class name is also hard-coded in bin/stop-indexserver.sh.
+# Any changes need to be reflected there.
+CLASS="org.apache.carbondata.indexserver.IndexServer"
+
+function usage {
+  echo "Usage: ./sbin/start-indexserver [options] [index server options]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|======="
+  pattern+="\|--help"
+
+  "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "Thrift server options:"
+  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  usage
+  exit 0
+fi
+
+export SUBMIT_USAGE_FUNCTION=usage
+
+exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "DistributedIndexServer" "$@"
diff --git a/bin/stop-indexserver.sh b/bin/stop-indexserver.sh
new file mode 100755
index 0000000..e024090
--- /dev/null
+++ b/bin/stop-indexserver.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the Distributed Index Server on the machine this script is executed on.
+
+if [ -z "${SPARK_HOME}" ]; then
+  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.carbondata.indexserver.IndexServer 1
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 3371d0d..2e2e368 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -217,6 +217,7 @@ public final class CarbonLRUCache {
     } else {
       synchronized (lruCacheMap) {
         addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
+        currentSize = currentSize + requiredSize;
       }
       columnKeyAddedSuccessfully = true;
     }
@@ -358,4 +359,11 @@ public final class CarbonLRUCache {
     long mSizeMB = Runtime.getRuntime().maxMemory() / BYTE_CONVERSION_CONSTANT;
     return mSizeMB * CarbonCommonConstants.CARBON_LRU_CACHE_PERCENT_OVER_MAX_SIZE;
   }
+
+  /**
+   * @return current size of the cache in memory.
+   */
+  public long getCurrentSize() {
+    return currentSize;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 43544cb..8ed9c1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2129,4 +2129,38 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512";
 
+  /**
+   * The IP on which Index Server will be started.
+   */
+  @CarbonProperty
+  public static final String CARBON_INDEX_SERVER_IP = "carbon.index.server.ip";
+
+  /**
+   * The Port to be used to start Index Server.
+   */
+  @CarbonProperty
+  public static final String CARBON_INDEX_SERVER_PORT = "carbon.index.server.port";
+
+  /**
+   * Whether to use index server for caching and pruning or not.
+   * This property can be used for
+   * 1. the whole application(carbon.properties).
+   * 2. the whole session(set carbon.enable.index.server)
+   * 3. a specific table for one session (set carbon.enable.index.server.<dbName>.<tableName>)
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server";
+
+  /**
+   * Property is used to enable/disable fallback for indexserver.
+   * Used for testing purposes only.
+   */
+  public static final String CARBON_DISABLE_INDEX_SERVER_FALLBACK =
+      "carbon.disable.index.server.fallback";
+
+  public static final String CARBON_INDEX_SERVER_WORKER_THREADS =
+      "carbon.index.server.max.worker.threads";
+
+  public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT =
+      500;
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 3b6537c..239401e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -120,7 +120,7 @@ public class DataMapChooser {
     return chooseDataMap(DataMapLevel.CG, resolverIntf);
   }
 
-  private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
+  DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
     if (resolverIntf != null) {
       Expression expression = resolverIntf.getFilterExpression();
       List<TableDataMap> datamaps = level == DataMapLevel.CG ? cgDataMaps : fgDataMaps;
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
index 57a739d..9eafe7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
@@ -34,7 +33,6 @@ public interface DataMapJob extends Serializable {
 
   void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
 
-  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
-      FilterResolverIntf filter);
+  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);
 
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a797b11..81b1fb2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -454,10 +454,11 @@ public final class DataMapStoreManager {
 
   /**
    * Clear the invalid segments from all the datamaps of the table
-   * @param carbonTable
-   * @param segments
+   *
+   * @param carbonTable table for which the operation has to be performed.
+   * @param segments segments which have to be cleared from cache.
    */
-  public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments)
+  public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments)
       throws IOException {
     getDefaultDataMap(carbonTable).clear(segments);
     List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
@@ -467,6 +468,30 @@ public final class DataMapStoreManager {
 
   }
 
+  public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
+      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
+      throws IOException {
+    List<String> toBeCleanedSegments = new ArrayList<>();
+    for (Segment filteredSegment : filteredSegmentToAccess) {
+      boolean refreshNeeded = getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment,
+          updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(filteredSegment.getSegmentNo());
+      }
+    }
+    return toBeCleanedSegments;
+  }
+
+  public void refreshSegmentCacheIfRequired(CarbonTable carbonTable,
+      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
+      throws IOException {
+    List<String> toBeCleanedSegments =
+        getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess);
+    if (toBeCleanedSegments.size() > 0) {
+      clearInvalidSegments(carbonTable, toBeCleanedSegments);
+    }
+  }
+
   /**
    * Clear the datamap/datamaps of a table from memory
    *
@@ -483,29 +508,44 @@ public final class DataMapStoreManager {
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
-    if (tableIndices == null) {
-      String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
-      if (keyUsingTablePath != null) {
-        tableUniqueName = keyUsingTablePath;
-        tableIndices = allDataMaps.get(tableUniqueName);
-      }
-    }
-    if (launchJob && tableIndices != null) {
-      CarbonTable carbonTable = getCarbonTable(identifier);
+    CarbonTable carbonTable = getCarbonTable(identifier);
+    if (launchJob && CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
       if (null != carbonTable) {
         try {
-          DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+          DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME);
         } catch (IOException e) {
           LOGGER.error("clear dataMap job failed", e);
           // ignoring the exception
         }
       }
+    } else {
+      List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+      if (tableIndices == null) {
+        String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
+        if (keyUsingTablePath != null) {
+          tableUniqueName = keyUsingTablePath;
+        }
+      }
+      if (launchJob && null != carbonTable) {
+        try {
+          DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME);
+        } catch (IOException e) {
+          LOGGER.error("clear dataMap job failed", e);
+          // ignoring the exception
+        }
+      }
+      // Remove the carbon table from the meta cache if launchJob is false, as this would then be
+      // called on the executor side.
+      if (!launchJob) {
+        CarbonMetadata.getInstance()
+            .removeTable(identifier.getDatabaseName(), identifier.getTableName());
+      }
+      segmentRefreshMap.remove(identifier.uniqueName());
+      clearDataMaps(tableUniqueName);
+      allDataMaps.remove(tableUniqueName);
+      tablePathMap.remove(tableUniqueName);
     }
-    segmentRefreshMap.remove(identifier.uniqueName());
-    clearDataMaps(tableUniqueName);
-    allDataMaps.remove(tableUniqueName);
-    tablePathMap.remove(tableUniqueName);
   }
 
   /**
@@ -554,29 +594,41 @@ public final class DataMapStoreManager {
    *
    * @param identifier Table identifier
    */
-  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+  public void deleteDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
     CarbonTable carbonTable = getCarbonTable(identifier);
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
-    if (tableIndices != null) {
-      int i = 0;
-      for (TableDataMap tableDataMap : tableIndices) {
-        if (carbonTable != null && tableDataMap != null && dataMapName
-            .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
-          try {
-            DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
-            tableDataMap.clear();
-          } catch (IOException e) {
-            LOGGER.error("clear dataMap job failed", e);
-            // ignoring the exception
+    if (CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
+      try {
+        DataMapUtil
+            .executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME, dataMapName);
+      } catch (IOException e) {
+        LOGGER.error("clear dataMap job failed", e);
+        // ignoring the exception
+      }
+    } else {
+      List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+      if (tableIndices != null) {
+        int i = 0;
+        for (TableDataMap tableDataMap : tableIndices) {
+          if (carbonTable != null && tableDataMap != null && dataMapName
+              .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
+            try {
+              DataMapUtil
+                  .executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME, dataMapName);
+              tableDataMap.clear();
+            } catch (IOException e) {
+              LOGGER.error("clear dataMap job failed", e);
+              // ignoring the exception
+            }
+            tableDataMap.deleteDatamapData();
+            tableIndices.remove(i);
+            break;
           }
-          tableDataMap.deleteDatamapData();
-          tableIndices.remove(i);
-          break;
+          i++;
         }
-        i++;
+        allDataMaps.put(tableUniqueName, tableIndices);
       }
-      allDataMaps.put(tableUniqueName, tableIndices);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index bea1cca..95c69c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -18,10 +18,14 @@
 package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -30,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
@@ -38,6 +43,12 @@ public class DataMapUtil {
 
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
 
+  public static final String EMBEDDED_JOB_NAME =
+      "org.apache.carbondata.indexserver.EmbeddedDataMapJob";
+
+  public static final String DISTRIBUTED_JOB_NAME =
+      "org.apache.carbondata.indexserver.DistributedDataMapJob";
+
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(DataMapUtil.class.getName());
 
@@ -91,43 +102,110 @@ public class DataMapUtil {
    * @param carbonTable
    * @throws IOException
    */
-  public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
+  private static void executeClearDataMapJob(DataMapJob dataMapJob,
+      CarbonTable carbonTable, String dataMapToClear) throws IOException {
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+            getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
+    List<String> invalidSegment = new ArrayList<>();
+    for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
+      invalidSegment.add(segment.getSegmentNo());
+    }
+    DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
+        validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
+        dataMapToClear);
+    dataMapJob.execute(dataMapFormat);
+  }
+
+  public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
       throws IOException {
-    String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
-    DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
+    executeClearDataMapJob(carbonTable, jobClassName, "");
+  }
+
+  static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName,
+      String dataMapToClear) throws IOException {
+    DataMapJob dataMapJob = (DataMapJob) createDataMapJob(jobClassName);
     if (dataMapJob == null) {
       return;
     }
-    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
-    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
-    List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
-    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
-    DataMapExprWrapper dataMapExprWrapper = null;
-    if (DataMapStoreManager.getInstance().getAllDataMap(carbonTable).size() > 0) {
-      DataMapChooser dataMapChooser = new DataMapChooser(carbonTable);
-      dataMapExprWrapper = dataMapChooser.getAllDataMapsForClear(carbonTable);
-    } else {
-      return;
+    executeClearDataMapJob(dataMapJob, carbonTable, dataMapToClear);
+  }
+
+  public static DataMapJob getEmbeddedJob() {
+    DataMapJob dataMapJob = (DataMapJob) DataMapUtil.createDataMapJob(EMBEDDED_JOB_NAME);
+    if (dataMapJob == null) {
+      throw new ExceptionInInitializerError("Unable to create EmbeddedDataMapJob");
     }
-    DistributableDataMapFormat dataMapFormat =
-        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, null,
-            className, true);
-    dataMapJob.execute(dataMapFormat, null);
+    return dataMapJob;
   }
 
-  private static DistributableDataMapFormat createDataMapJob(CarbonTable carbonTable,
-      DataMapExprWrapper dataMapExprWrapper, List<Segment> validsegments,
-      List<Segment> invalidSegments, List<PartitionSpec> partitionsToPrune, String clsName,
-      boolean isJobToClearDataMaps) {
-    try {
-      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
-      return (DistributableDataMapFormat) cons
-          .newInstance(carbonTable, dataMapExprWrapper, validsegments, invalidSegments,
-              partitionsToPrune, isJobToClearDataMaps);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  /**
+   * Prune the segments from the already pruned blocklets.
+   */
+  public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
+    Set<Segment> validSegments = new HashSet<>();
+    for (ExtendedBlocklet blocklet : prunedBlocklets) {
+      // Clear the old pruned index files, if any are present
+      blocklet.getSegment().getFilteredIndexShardNames().clear();
+      // Set the pruned index file to the segment
+      // for further pruning.
+      String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
+      blocklet.getSegment().setFilteredIndexShardName(shardName);
+      validSegments.add(blocklet.getSegment());
     }
+    segments.clear();
+    segments.addAll(validSegments);
+  }
+
+  static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
+      FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws IOException {
+    pruneSegments(segmentsToLoad, blocklets);
+    List<ExtendedBlocklet> cgDataMaps = pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
+        partitions, blocklets,
+        DataMapLevel.CG);
+    pruneSegments(segmentsToLoad, cgDataMaps);
+    return pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
+        partitions, cgDataMaps,
+        DataMapLevel.FG);
+  }
+
+  static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
+      FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, DataMapLevel dataMapLevel)
+      throws IOException {
+    DataMapExprWrapper dataMapExprWrapper =
+        new DataMapChooser(table).chooseDataMap(dataMapLevel, filterResolverIntf);
+    if (dataMapExprWrapper != null) {
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      // Prune segments from already pruned blocklets
+      for (DataMapDistributableWrapper wrapper : dataMapExprWrapper
+          .toDistributable(segmentsToLoad)) {
+        TableDataMap dataMap = DataMapStoreManager.getInstance()
+            .getDataMap(table, wrapper.getDistributable().getDataMapSchema());
+        List<DataMap> dataMaps = dataMap.getTableDataMaps(wrapper.getDistributable());
+        List<ExtendedBlocklet> prunnedBlocklet = new ArrayList<>();
+        if (table.isTransactionalTable()) {
+          prunnedBlocklet.addAll(dataMap.prune(dataMaps, wrapper.getDistributable(),
+              dataMapExprWrapper.getFilterResolverIntf(wrapper.getUniqueId()), partitions));
+        } else {
+          prunnedBlocklet
+              .addAll(dataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf),
+                  partitions));
+        }
+        // For all blocklets initialize the detail info so that it can be serialized to the driver.
+        for (ExtendedBlocklet blocklet : prunnedBlocklet) {
+          blocklet.getDetailInfo();
+          blocklet.setDataMapUniqueId(wrapper.getUniqueId());
+        }
+        extendedBlocklets.addAll(prunnedBlocklet);
+      }
+      return dataMapExprWrapper.pruneBlocklets(extendedBlocklets);
+    }
+    // For all blocklets initialize the detail info so that it can be serialized to the driver.
+    for (ExtendedBlocklet blocklet : blocklets) {
+      blocklet.getDetailInfo();
+    }
+    return blocklets;
   }
 
   /**
@@ -136,23 +214,36 @@ public class DataMapUtil {
    * @return list of Extended blocklets after pruning
    */
   public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
-      FilterResolverIntf resolver, List<Segment> validSegments,
-      DataMapExprWrapper dataMapExprWrapper, DataMapJob dataMapJob,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
-    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration());
-    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
+      FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
+      List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
+      List<String> segmentsToBeRefreshed) throws IOException {
+    return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
+        invalidSegments, level, false, segmentsToBeRefreshed);
+  }
+
+  /**
+   * Gets the datamapJob and calls execute on that job; this will be launched for
+   * distributed CG or FG pruning.
+   * @return list of Extended blocklets after pruning
+   */
+  public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
+      FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
+      List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
+      Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
+    List<String> invalidSegmentNo = new ArrayList<>();
+    for (Segment segment : invalidSegments) {
+      invalidSegmentNo.add(segment.getSegmentNo());
+    }
+    invalidSegmentNo.addAll(segmentsToBeRefreshed);
     DistributableDataMapFormat dataMapFormat =
-        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
-            partitionsToPrune, className, false);
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat, resolver);
+        new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
+            partitionsToPrune, false, level, isFallbackJob);
+    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
     // Apply expression on the blocklets.
-    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     return prunedBlocklets;
   }
 
-  private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+  public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
       CarbonTable carbonTable, Configuration configuration) throws IOException {
     SegmentStatusManager ssm =
         new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 4c23008..f76cfec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -16,62 +16,102 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.log4j.Logger;
 
 /**
  * Input format for datamaps, it makes the datamap pruning distributable.
  */
-public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> implements
-    Serializable {
+public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet>
+    implements Serializable, Writable {
+
+  private static final transient Logger LOGGER =
+      LogServiceFactory.getLogService(DistributableDataMapFormat.class.getName());
+
+  private static final long serialVersionUID = 9189779090091151248L;
 
   private CarbonTable table;
 
-  private DataMapExprWrapper dataMapExprWrapper;
+  private FilterResolverIntf filterResolverIntf;
 
   private List<Segment> validSegments;
 
-  private List<Segment> invalidSegments;
+  private List<String> invalidSegments;
 
   private List<PartitionSpec> partitions;
 
-  private  DataMapDistributableWrapper distributable;
-
   private boolean isJobToClearDataMaps = false;
 
-  DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper,
-      List<Segment> validSegments, List<Segment> invalidSegments, List<PartitionSpec> partitions,
-      boolean isJobToClearDataMaps) {
+  private DataMapLevel dataMapLevel;
+
+  private boolean isFallbackJob = false;
+
+  private String dataMapToClear = "";
+
+  private ReadCommittedScope readCommittedScope;
+
+  DistributableDataMapFormat() {
+
+  }
+
+  DistributableDataMapFormat(CarbonTable table,
+      List<Segment> validSegments, List<String> invalidSegments, boolean isJobToClearDataMaps,
+      String dataMapToClear) throws IOException {
+    this(table, null, validSegments, invalidSegments, null,
+        isJobToClearDataMaps, null, false);
+    this.dataMapToClear = dataMapToClear;
+  }
+
+  DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
+      List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
+      boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
+      throws IOException {
     this.table = table;
-    this.dataMapExprWrapper = dataMapExprWrapper;
+    this.filterResolverIntf = filterResolverIntf;
     this.validSegments = validSegments;
+    if (!validSegments.isEmpty()) {
+      this.readCommittedScope = validSegments.get(0).getReadCommittedScope();
+    }
     this.invalidSegments = invalidSegments;
     this.partitions = partitions;
     this.isJobToClearDataMaps = isJobToClearDataMaps;
+    this.dataMapLevel = dataMapLevel;
+    this.isFallbackJob = isFallbackJob;
   }
 
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-    List<DataMapDistributableWrapper> distributables =
-        dataMapExprWrapper.toDistributable(validSegments);
+    List<DataMapDistributableWrapper> distributables;
+    distributables =
+        DataMapChooser.getDefaultDataMap(table, filterResolverIntf).toDistributable(validSegments);
     List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
     inputSplits.addAll(distributables);
     return inputSplits;
@@ -85,33 +125,67 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       private ExtendedBlocklet currBlocklet;
       private List<DataMap> dataMaps;
 
-      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
-        distributable = (DataMapDistributableWrapper) inputSplit;
-        // clear the segmentMap and from cache in executor when there are invalid segments
-        if (invalidSegments.size() > 0) {
-          DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments);
-        }
-        TableDataMap tableDataMap = DataMapStoreManager.getInstance()
-            .getDataMap(table, distributable.getDistributable().getDataMapSchema());
+        DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
+        distributable.getDistributable().getSegment().setCacheable(!isFallbackJob);
+        distributable.getDistributable().getSegment().setReadCommittedScope(readCommittedScope);
+        List<Segment> segmentsToLoad = new ArrayList<>();
+        segmentsToLoad.add(distributable.getDistributable().getSegment());
         if (isJobToClearDataMaps) {
-          // if job is to clear datamaps just clear datamaps from cache and return
-          DataMapStoreManager.getInstance()
-              .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
-          // clear the segment properties cache from executor
-          SegmentPropertiesAndSchemaHolder.getInstance()
-              .invalidate(table.getAbsoluteTableIdentifier());
-          blockletIterator = Collections.emptyIterator();
+          if (StringUtils.isNotEmpty(dataMapToClear)) {
+            List<TableDataMap> dataMaps =
+                DataMapStoreManager.getInstance().getAllDataMap(table);
+            int i = 0;
+            for (TableDataMap tableDataMap : dataMaps) {
+              if (tableDataMap != null && dataMapToClear
+                  .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
+                tableDataMap.deleteSegmentDatamapData(
+                    ((DataMapDistributableWrapper) inputSplit).getDistributable().getSegment()
+                        .getSegmentNo());
+                tableDataMap.clear();
+                dataMaps.remove(i);
+                break;
+              }
+              i++;
+            }
+            DataMapStoreManager.getInstance().getAllDataMaps().put(table.getTableUniqueName(),
+                dataMaps);
+          } else {
+            // if job is to clear datamaps just clear datamaps from cache and return
+            DataMapStoreManager.getInstance()
+                .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
+            // clear the segment properties cache from executor
+            SegmentPropertiesAndSchemaHolder.getInstance()
+                .invalidate(table.getAbsoluteTableIdentifier());
+          }
+          List<ExtendedBlocklet> list = new ArrayList<ExtendedBlocklet>();
+          list.add(new ExtendedBlocklet());
+          blockletIterator = list.iterator();
           return;
+        } else if (invalidSegments.size() > 0) {
+          // clear the segmentMap and from cache in executor when there are invalid segments
+          DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments);
         }
-        dataMaps = tableDataMap.getTableDataMaps(distributable.getDistributable());
-        List<ExtendedBlocklet> blocklets = tableDataMap
-            .prune(dataMaps,
-                distributable.getDistributable(),
-                dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
-        for (ExtendedBlocklet blocklet : blocklets) {
-          blocklet.getDetailInfo();
-          blocklet.setDataMapUniqueId(distributable.getUniqueId());
+        List<ExtendedBlocklet> blocklets = new ArrayList<>();
+        if (dataMapLevel == null) {
+          TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
+              .getDataMap(table, distributable.getDistributable().getDataMapSchema());
+          dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
+          if (table.isTransactionalTable()) {
+            blocklets = defaultDataMap.prune(dataMaps, distributable.getDistributable(),
+                filterResolverIntf, partitions);
+          } else {
+            blocklets = defaultDataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf),
+                partitions);
+          }
+          blocklets = DataMapUtil
+              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets);
+        } else {
+          blocklets = DataMapUtil
+              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets,
+                  dataMapLevel);
         }
         blockletIterator = blocklets.iterator();
       }
@@ -154,4 +228,110 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     };
   }
 
+  public CarbonTable getCarbonTable() {
+    return table;
+  }
+
+  /**
+   * Serializes this format so that it can be rebuilt by {@link #readFields(DataInput)}.
+   * Fields are written in this order: table, invalid segment ids, isJobToClearDataMaps,
+   * isFallbackJob, optional dataMapLevel, valid segments, optional partitions, optional
+   * filter resolver, dataMapToClear. Every optional field is preceded by a boolean
+   * presence flag.
+   *
+   * @param out stream to write the fields to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    table.write(out);
+    out.writeInt(invalidSegments.size());
+    for (String invalidSegment : invalidSegments) {
+      out.writeUTF(invalidSegment);
+    }
+    out.writeBoolean(isJobToClearDataMaps);
+    out.writeBoolean(isFallbackJob);
+    if (dataMapLevel == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeUTF(dataMapLevel.name());
+    }
+    out.writeInt(validSegments.size());
+    for (Segment segment : validSegments) {
+      segment.write(out);
+    }
+    if (partitions == null) {
+      out.writeBoolean(false);
+    } else {
+      // readFields() reads a boolean before the partition count, so the presence flag
+      // must be written here as well; without it the stream is misaligned.
+      out.writeBoolean(true);
+      out.writeInt(partitions.size());
+      for (PartitionSpec partitionSpec : partitions) {
+        partitionSpec.write(out);
+      }
+    }
+    if (filterResolverIntf != null) {
+      out.writeBoolean(true);
+      // the resolver is sent as the length-prefixed bytes of its string form
+      byte[] filterResolverBytes = ObjectSerializationUtil.convertObjectToString(filterResolverIntf)
+          .getBytes(Charset.defaultCharset());
+      out.writeInt(filterResolverBytes.length);
+      out.write(filterResolverBytes);
+    } else {
+      out.writeBoolean(false);
+    }
+    // the field defaults to "" but a constructor may assign a null name; writeUTF would
+    // throw NullPointerException on null, so null is sent as the empty string
+    out.writeUTF(dataMapToClear == null ? "" : dataMapToClear);
+  }
+
+  /**
+   * Restores the state of this format from {@code in}. Reads, in order: the table, the
+   * invalid segment ids, the clear-datamaps and fallback flags, an optional datamap level,
+   * the valid segments, an optional partition list, an optional serialized filter resolver
+   * and the name of the datamap to clear.
+   *
+   * @param in stream to read the fields from
+   * @throws IOException if reading from the stream fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.table = new CarbonTable();
+    this.table.readFields(in);
+    final int invalidCount = in.readInt();
+    this.invalidSegments = new ArrayList<>(invalidCount);
+    for (int read = 0; read < invalidCount; read++) {
+      this.invalidSegments.add(in.readUTF());
+    }
+    this.isJobToClearDataMaps = in.readBoolean();
+    this.isFallbackJob = in.readBoolean();
+    final boolean hasDataMapLevel = in.readBoolean();
+    if (hasDataMapLevel) {
+      this.dataMapLevel = DataMapLevel.valueOf(in.readUTF());
+    }
+    final int validCount = in.readInt();
+    this.validSegments = new ArrayList<>(validCount);
+    // every deserialized segment shares the scope built from the table read above
+    initReadCommittedScope();
+    for (int read = 0; read < validCount; read++) {
+      Segment validSegment = new Segment();
+      validSegment.setReadCommittedScope(this.readCommittedScope);
+      validSegment.readFields(in);
+      this.validSegments.add(validSegment);
+    }
+    final boolean hasPartitions = in.readBoolean();
+    if (hasPartitions) {
+      final int partitionCount = in.readInt();
+      this.partitions = new ArrayList<>(partitionCount);
+      for (int read = 0; read < partitionCount; read++) {
+        PartitionSpec spec = new PartitionSpec();
+        spec.readFields(in);
+        this.partitions.add(spec);
+      }
+    }
+    final boolean hasFilter = in.readBoolean();
+    if (hasFilter) {
+      // length-prefixed bytes, decoded with the default charset before deserialization
+      final int filterLength = in.readInt();
+      byte[] serializedFilter = new byte[filterLength];
+      in.readFully(serializedFilter, 0, serializedFilter.length);
+      String encodedFilter = new String(serializedFilter, Charset.defaultCharset());
+      this.filterResolverIntf =
+          (FilterResolverIntf) ObjectSerializationUtil.convertStringToObject(encodedFilter);
+    }
+    this.dataMapToClear = in.readUTF();
+  }
+
+  /**
+   * Creates a {@link TableStatusReadCommittedScope} for the current table, using the
+   * configuration from {@link FileFactory}, unless a scope has already been set.
+   */
+  private void initReadCommittedScope() throws IOException {
+    if (readCommittedScope != null) {
+      return;
+    }
+    this.readCommittedScope = new TableStatusReadCommittedScope(
+        table.getAbsoluteTableIdentifier(), FileFactory.getConfiguration());
+  }
+
+  /**
+   * @return Whether the job is fallback or not.
+   */
+  public boolean isFallbackJob() {
+    return isFallbackJob;
+  }
+
+  /**
+   * @return Whether the job is to clear cached datamaps or not.
+   */
+  public boolean isJobToClearDataMaps() {
+    return isJobToClearDataMaps;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 4797b53..9370be8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -25,6 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -37,7 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 /**
  * Represents one load of carbondata
  */
-public class Segment implements Serializable {
+public class Segment implements Serializable, Writable {
 
   private static final long serialVersionUID = 7044555408162234064L;
 
@@ -55,15 +58,26 @@ public class Segment implements Serializable {
    * transactional isolation level which only allows snapshot read of the
    * data and make non committed data invisible to the reader.
    */
-  private ReadCommittedScope readCommittedScope;
+  private transient ReadCommittedScope readCommittedScope;
 
   /**
    * keeps all the details about segments
    */
-  private LoadMetadataDetails loadMetadataDetails;
+  private transient LoadMetadataDetails loadMetadataDetails;
 
   private String segmentString;
 
+  private long indexSize = 0;
+
+  /**
+   * Whether to cache the segment data maps in executors or not.
+   */
+  private boolean isCacheable = true;
+
+  public Segment() {
+
+  }
+
   public Segment(String segmentNo) {
     this.segmentNo = segmentNo;
   }
@@ -120,6 +134,9 @@ public class Segment implements Serializable {
     this.segmentFileName = segmentFileName;
     this.readCommittedScope = readCommittedScope;
     this.loadMetadataDetails = loadMetadataDetails;
+    if (loadMetadataDetails.getIndexSize() != null) {
+      this.indexSize = Long.parseLong(loadMetadataDetails.getIndexSize());
+    }
     if (segmentFileName != null) {
       segmentString = segmentNo + "#" + segmentFileName;
     } else {
@@ -155,6 +172,10 @@ public class Segment implements Serializable {
     this.readCommittedScope = readCommittedScope;
   }
 
+  public ReadCommittedScope getReadCommittedScope() {
+    return readCommittedScope;
+  }
+
   public static List<Segment> toSegmentList(String[] segmentIds,
       ReadCommittedScope readCommittedScope) {
     List<Segment> list = new ArrayList<>(segmentIds.length);
@@ -257,4 +278,56 @@ public class Segment implements Serializable {
   public LoadMetadataDetails getLoadMetadataDetails() {
     return loadMetadataDetails;
   }
+
+  public long getIndexSize() {
+    return indexSize;
+  }
+
+  public void setIndexSize(long indexSize) {
+    this.indexSize = indexSize;
+  }
+
+  public boolean isCacheable() {
+    return isCacheable;
+  }
+
+  public void setCacheable(boolean cacheable) {
+    isCacheable = cacheable;
+  }
+
+  /**
+   * Writes segmentNo, the optional segmentFileName, the filtered index shard names, the
+   * optional segmentString and indexSize to {@code out}. Optional values are preceded by a
+   * boolean presence flag.
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeUTF(segmentNo);
+    if (segmentFileName != null) {
+      out.writeBoolean(true);
+      out.writeUTF(segmentFileName);
+    } else {
+      out.writeBoolean(false);
+    }
+    out.writeInt(filteredIndexShardNames.size());
+    for (String shardName : filteredIndexShardNames) {
+      out.writeUTF(shardName);
+    }
+    boolean hasSegmentString = segmentString != null;
+    out.writeBoolean(hasSegmentString);
+    if (hasSegmentString) {
+      out.writeUTF(segmentString);
+    }
+    out.writeLong(indexSize);
+  }
+
+  /**
+   * Reads segmentNo, the optional segmentFileName, the filtered index shard names, the
+   * optional segmentString and indexSize from {@code in}. The shard name set is replaced by
+   * a new one; an optional value is assigned only when its presence flag is true.
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    this.segmentNo = in.readUTF();
+    boolean hasSegmentFileName = in.readBoolean();
+    if (hasSegmentFileName) {
+      this.segmentFileName = in.readUTF();
+    }
+    filteredIndexShardNames = new HashSet<>();
+    for (int remaining = in.readInt(); remaining > 0; remaining--) {
+      filteredIndexShardNames.add(in.readUTF());
+    }
+    boolean hasSegmentString = in.readBoolean();
+    if (hasSegmentString) {
+      this.segmentString = in.readUTF();
+    }
+    this.indexSize = in.readLong();
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 4375abb..bc87298 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -356,14 +356,7 @@ public final class TableDataMap extends OperationEventListener {
   public List<DataMapDistributable> toDistributable(List<Segment> segments) throws IOException {
     List<DataMapDistributable> distributables = new ArrayList<>();
     for (Segment segment : segments) {
-      List<DataMapDistributable> list =
-          dataMapFactory.toDistributable(segment);
-      for (DataMapDistributable distributable : list) {
-        distributable.setDataMapSchema(dataMapSchema);
-        distributable.setSegment(segment);
-        distributable.setTablePath(identifier.getTablePath());
-      }
-      distributables.addAll(list);
+      distributables.addAll(dataMapFactory.toDistributable(segment));
     }
     return distributables;
   }
@@ -420,10 +413,10 @@ public final class TableDataMap extends OperationEventListener {
 
   /**
    * Clear only the datamaps of the segments
-   * @param segments
+   * @param segmentIds list of segmentIds to be cleared from cache.
    */
-  public void clear(List<Segment> segments) {
-    for (Segment segment: segments) {
+  public void clear(List<String> segmentIds) {
+    for (String segment: segmentIds) {
       dataMapFactory.clear(segment);
     }
   }
@@ -452,6 +445,13 @@ public final class TableDataMap extends OperationEventListener {
     dataMapFactory.deleteDatamapData();
   }
 
+  /**
+   * delete datamap data for a segment if any
+   */
+  public void deleteSegmentDatamapData(String segmentNo) throws IOException {
+    dataMapFactory.deleteSegmentDatamapData(segmentNo);
+  }
+
   public DataMapSchema getDataMapSchema() {
     return dataMapSchema;
   }
@@ -465,31 +465,6 @@ public final class TableDataMap extends OperationEventListener {
   }
 
   /**
-   * Method to prune the segments based on task min/max values
-   *
-   * @param segments
-   * @param filterExp
-   * @return
-   * @throws IOException
-   */
-  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
-      throws IOException {
-    List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Segment segment : segments) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
-      for (DataMap dataMap : dataMaps) {
-        if (dataMap.isScanRequired(filterExp)) {
-          // If any one task in a given segment contains the data that means the segment need to
-          // be scanned and we need to validate further data maps in the same segment
-          prunedSegments.add(segment);
-          break;
-        }
-      }
-    }
-    return prunedSegments;
-  }
-
-  /**
    * Prune the datamap of the given segments and return the Map of blocklet path and row count
    *
    * @param segments
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index b32a482..3fa7be6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -111,9 +111,11 @@ public abstract class DataMapFactory<T extends DataMap> {
   public abstract void fireEvent(Event event);
 
   /**
-   * Clears datamap of the segment
+   * Clear all datamaps for a segment from memory
    */
-  public abstract void clear(Segment segment);
+  public void clear(String segmentNo) {
+
+  }
 
   /**
    * Clear all datamaps from memory
@@ -141,6 +143,13 @@ public abstract class DataMapFactory<T extends DataMap> {
   public abstract void deleteDatamapData();
 
   /**
+   * delete datamap data for the given segment, if any
+   */
+  public void deleteSegmentDatamapData(String segmentNo) throws IOException {
+
+  }
+
+  /**
    * This function should return true is the input operation enum will make the datamap become stale
    */
   public abstract boolean willBecomeStale(TableOperation operation);
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 3270d08..9aeb6c4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -21,7 +21,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.hadoop.io.Writable;
 
 /**
  * Blocklet
@@ -67,14 +67,28 @@ public class Blocklet implements Writable,Serializable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(filePath);
-    out.writeUTF(blockletId);
+    if (filePath == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeUTF(filePath);
+    }
+    if (blockletId == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeUTF(blockletId);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    filePath = in.readUTF();
-    blockletId = in.readUTF();
+    if (in.readBoolean()) {
+      filePath = in.readUTF();
+    }
+    if (in.readBoolean()) {
+      blockletId = in.readUTF();
+    }
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index ae01e9e..5eace3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -60,4 +60,6 @@ public interface BlockletDetailsFetcher {
    * clears the datamap from cache and segmentMap from executor
    */
   void clear();
+
+  String getCacheSize() throws IOException ;
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 3d6cedd..1de1ab5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
@@ -34,6 +36,10 @@ public class ExtendedBlocklet extends Blocklet {
 
   private CarbonInputSplit inputSplit;
 
+  public ExtendedBlocklet() {
+
+  }
+
   public ExtendedBlocklet(String filePath, String blockletId,
       boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) {
     super(filePath, blockletId, compareBlockletIdForObjectMatching);
@@ -144,4 +150,51 @@ public class ExtendedBlocklet extends Blocklet {
     this.inputSplit.setColumnSchema(columnSchema);
   }
 
+
+
+  /**
+   * Writes the parent fields, then the optional dataMapUniqueId and the optional input
+   * split. When an input split is present it is followed by the optional array returned by
+   * getLocations(). Each optional part is preceded by a boolean presence flag.
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    super.write(out);
+    boolean hasUniqueId = dataMapUniqueId != null;
+    out.writeBoolean(hasUniqueId);
+    if (hasUniqueId) {
+      out.writeUTF(dataMapUniqueId);
+    }
+    if (inputSplit == null) {
+      out.writeBoolean(false);
+      return;
+    }
+    out.writeBoolean(true);
+    inputSplit.write(out);
+    String[] hosts = getLocations();
+    if (hosts == null) {
+      out.writeBoolean(false);
+      return;
+    }
+    out.writeBoolean(true);
+    out.writeInt(hosts.length);
+    for (String host : hosts) {
+      out.writeUTF(host);
+    }
+  }
+
+  /**
+   * Reads the parent fields, then the optional dataMapUniqueId and the optional input
+   * split. When an input split is present, a new CarbonInputSplit is created, populated
+   * from {@code in} and, if locations follow, given those locations.
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    if (in.readBoolean()) {
+      dataMapUniqueId = in.readUTF();
+    }
+    if (!in.readBoolean()) {
+      // no input split was written, so no locations follow either
+      return;
+    }
+    inputSplit = new CarbonInputSplit();
+    inputSplit.readFields(in);
+    if (!in.readBoolean()) {
+      return;
+    }
+    String[] hosts = new String[in.readInt()];
+    for (int idx = 0; idx < hosts.length; idx++) {
+      hosts[idx] = in.readUTF();
+    }
+    inputSplit.setLocation(hosts);
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
index 87c875e..0d989cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
@@ -16,19 +16,24 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 
 /**
  * Holds partition information.
  */
-public class PartitionSpec implements Serializable {
+public class PartitionSpec implements Serializable, Writable {
 
   private static final long serialVersionUID = 4828007433384867678L;
 
@@ -43,6 +48,10 @@ public class PartitionSpec implements Serializable {
 
   private String uuid;
 
+  public PartitionSpec() {
+
+  }
+
   public PartitionSpec(List<String> partitions, String location) {
     this.partitions = partitions;
     this.locationPath = new Path(FileFactory.getUpdatedFilePath(location));
@@ -89,4 +98,45 @@ public class PartitionSpec implements Serializable {
     return "PartitionSpec{" + "partitions=" + partitions + ", locationPath=" + locationPath
         + ", location='" + location + '\'' + '}';
   }
+
+  /**
+   * Writes the partitions list, the uuid and the location to {@code out}. Each of the three
+   * values is preceded by a boolean telling whether it is present.
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    boolean hasPartitions = partitions != null;
+    out.writeBoolean(hasPartitions);
+    if (hasPartitions) {
+      out.writeInt(partitions.size());
+      for (String partitionValue : partitions) {
+        out.writeUTF(partitionValue);
+      }
+    }
+    boolean hasUuid = uuid != null;
+    out.writeBoolean(hasUuid);
+    if (hasUuid) {
+      out.writeUTF(uuid);
+    }
+    boolean hasLocation = location != null;
+    out.writeBoolean(hasLocation);
+    if (hasLocation) {
+      out.writeUTF(location);
+    }
+  }
+
+  /**
+   * Reads the partitions list, the uuid and the location from {@code in}. Each value is
+   * assigned only when the boolean that precedes it is true. locationPath is not assigned
+   * here.
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    boolean hasPartitions = in.readBoolean();
+    if (hasPartitions) {
+      final int partitionCount = in.readInt();
+      partitions = new ArrayList<>(partitionCount);
+      for (int read = 0; read < partitionCount; read++) {
+        partitions.add(in.readUTF());
+      }
+    }
+    boolean hasUuid = in.readBoolean();
+    if (hasUuid) {
+      uuid = in.readUTF();
+    }
+    boolean hasLocation = in.readBoolean();
+    if (hasLocation) {
+      location = in.readUTF();
+    }
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
index b125197..88554fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -53,7 +53,7 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
     this.configuration = FileFactory.getConfiguration();
   }
 
-  public TableBlockIndexUniqueIdentifierWrapper(
+  private TableBlockIndexUniqueIdentifierWrapper(
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
       Configuration configuration) {
     this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
@@ -65,9 +65,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
   // Kindly do not remove
   public TableBlockIndexUniqueIdentifierWrapper(
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
-      boolean addTableBlockToUnsafeAndLRUCache) {
-    this(tableBlockIndexUniqueIdentifier, carbonTable);
-    this.configuration = FileFactory.getConfiguration();
+      Configuration configuration, boolean addTableBlockToUnsafeAndLRUCache) {
+    this(tableBlockIndexUniqueIdentifier, carbonTable, configuration);
     this.addTableBlockToUnsafeAndLRUCache = addTableBlockToUnsafeAndLRUCache;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 7cdf77d..bb91eb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -32,8 +32,14 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
+  private String segmentPath;
+
   private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
 
+  public BlockletDataMapDistributable() {
+
+  }
+
   public BlockletDataMapDistributable(String indexFilePath) {
     this.filePath = indexFilePath;
   }
@@ -50,4 +56,12 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifiers) {
     this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifiers;
   }
+
+  public String getSegmentPath() {
+    return segmentPath;
+  }
+
+  public void setSegmentPath(String segmentPath) {
+    this.segmentPath = segmentPath;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 93be06e..e4a3ad8 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
@@ -33,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -54,10 +54,7 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 
 /**
  * Table map for blocklet
@@ -134,7 +131,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
         tableBlockIndexUniqueIdentifierWrappers.add(
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
-                this.getCarbonTable(), segment.getConfiguration()));
+                this.getCarbonTable(), segment.getConfiguration(), segment.isCacheable()));
       }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
@@ -257,29 +254,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
     try {
-      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-          getTableBlockIndexUniqueIdentifiers(segment);
-      CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
-      int identifierCounter = 0;
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
-          tableBlockIndexUniqueIdentifiers) {
-        String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
-        String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
-        carbonIndexFiles[identifierCounter++] = FileFactory
-            .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
-      }
-      for (int i = 0; i < carbonIndexFiles.length; i++) {
-        Path path = new Path(carbonIndexFiles[i].getPath());
-        FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
-        LocatedFileStatus fileStatus = iter.next();
-        String[] location = fileStatus.getBlockLocations()[0].getHosts();
-        BlockletDataMapDistributable distributable =
-            new BlockletDataMapDistributable(path.toString());
-        distributable.setLocations(location);
-        distributables.add(distributable);
-      }
-    } catch (IOException e) {
+      BlockletDataMapDistributable distributable = new BlockletDataMapDistributable();
+      distributable.setSegment(segment);
+      distributable.setDataMapSchema(DATA_MAP_SCHEMA);
+      distributable.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath(),
+          segment.getSegmentNo()));
+      distributables.add(new DataMapDistributableWrapper(UUID.randomUUID().toString(),
+          distributable).getDistributable());
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
     return distributables;
@@ -291,8 +273,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public void clear(Segment segment) {
-    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+  public void clear(String segment) {
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
@@ -315,22 +297,95 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   public synchronized void clear() {
     if (segmentMap.size() > 0) {
       for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
-        clear(new Segment(segmentId, null, null));
+        clear(segmentId);
       }
     }
   }
 
-  @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  @Override public String getCacheSize() throws IOException {
+    long sum = 0L;
+    int numOfIndexFiles = 0;
+    for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> entry : segmentMap.entrySet()) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()) {
+        sum += cache.get(new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+            getCarbonTable())).getMemorySize();
+        numOfIndexFiles++;
+      }
+    }
+    return numOfIndexFiles + ":" + sum;
+  }
+
+  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
-    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
-    Path indexPath = new Path(mapDistributable.getFilePath());
+    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper;
     String segmentNo = mapDistributable.getSegment().getSegmentNo();
+    if (mapDistributable.getSegmentPath() != null) {
+      identifiersWrapper = getTableBlockIndexUniqueIdentifier(distributable);
+    } else {
+      identifiersWrapper =
+          getTableBlockIndexUniqueIdentifier(mapDistributable.getFilePath(), segmentNo);
+    }
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
+    try {
+      List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiersWrapper);
+      for (BlockletDataMapIndexWrapper wrapper : wrappers) {
+        dataMaps.addAll(wrapper.getDataMaps());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return dataMaps;
+  }
+
+  private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
+      DataMapDistributable distributable) throws IOException {
+    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        segmentMap.get(distributable.getSegment().getSegmentNo());
+    if (tableBlockIndexUniqueIdentifiers == null) {
+      Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet();
+      for (String indexFile : indexFiles) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
+        String indexFileName;
+        String mergeIndexName;
+        if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+          indexFileName = carbonFile.getName();
+          mergeIndexName = null;
+        } else {
+          indexFileName = carbonFile.getName();
+          mergeIndexName = carbonFile.getName();
+        }
+        String parentPath = carbonFile.getParentFile().getAbsolutePath();
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier =
+            new TableBlockIndexUniqueIdentifier(parentPath, indexFileName, mergeIndexName,
+                distributable.getSegment().getSegmentNo());
+        identifiersWrapper.add(
+            new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                this.getCarbonTable()));
+        tableBlockIndexUniqueIdentifiers = new HashSet<>();
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      }
+    } else {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        identifiersWrapper.add(
+            new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                getCarbonTable()));
+      }
+    }
+    return identifiersWrapper;
+  }
+
+  private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
+      String indexFilePath, String segmentId) throws IOException {
+    Path indexPath = new Path(indexFilePath);
+    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
     if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
       String parent = indexPath.getParent().toString();
       identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
-          new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo),
+          new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentId),
           this.getCarbonTable()));
     } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
@@ -340,19 +395,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       for (String indexFile : indexFiles) {
         identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
             new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
-                segmentNo), this.getCarbonTable()));
+                segmentId), this.getCarbonTable()));
       }
     }
-    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
-    try {
-      List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiersWrapper);
-      for (BlockletDataMapIndexWrapper wrapper : wrappers) {
-        dataMaps.addAll(wrapper.getDataMaps());
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return dataMaps;
+    return identifiersWrapper;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 224b230..69e5dc3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -342,13 +342,10 @@ public class SegmentFileStore {
    */
   public static void clearBlockDataMapCache(CarbonTable carbonTable, String segmentId) {
     TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
-    Segment segment = new Segment(segmentId);
-    List<Segment> segments = new ArrayList<>();
-    segments.add(segment);
     LOGGER.info(
         "clearing cache while updating segment file entry in table status file for segmentId: "
             + segmentId);
-    defaultDataMap.clear(segments);
+    defaultDataMap.getDataMapFactory().clear(segmentId);
   }
 
   private static CarbonFile[] getSegmentFiles(String segmentPath) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index c8bb5ad..dcc3336 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum;
  */
 public class AggregationDataMapSchema extends DataMapSchema {
 
+  private static final long serialVersionUID = 5900935117929888412L;
   /**
    * map of parent column name to set of child column column without
    * aggregation function
@@ -63,7 +64,9 @@ public class AggregationDataMapSchema extends DataMapSchema {
    */
   private int ordinal = Integer.MAX_VALUE;
 
-  private Set aggExpToColumnMapping;
+  // Don't remove transient; otherwise serialization of carbonTable will fail when using
+  // Java serialization in Spark.
+  private transient Set aggExpToColumnMapping;
 
   AggregationDataMapSchema(String dataMapName, String className) {
     super(dataMapName, className);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f9ba6f5..0f1f628 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.metadata.schema.table;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -71,7 +73,7 @@ import org.apache.log4j.Logger;
 /**
  * Mapping class for Carbon actual table
  */
-public class CarbonTable implements Serializable {
+public class CarbonTable implements Serializable, Writable {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbonTable.class.getName());
@@ -184,7 +186,7 @@ public class CarbonTable implements Serializable {
    */
   private boolean isTransactionalTable = true;
 
-  private CarbonTable() {
+  public CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
@@ -1372,4 +1374,14 @@ public class CarbonTable implements Serializable {
       return SortScopeOptions.getSortScope(sortScope);
     }
   }
+
+  @Override public void write(DataOutput out) throws IOException {
+    tableInfo.write(out);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    tableInfo = new TableInfo();
+    tableInfo.readFields(in);
+    updateTableByTableInfo(this, tableInfo);
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 68aad72..c90c3dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -228,7 +228,7 @@ public class BlockletDataMapUtil {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
     String mergeFilePath =
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getIndexFileName();
+            .getMergeIndexFileName();
     segmentIndexFileStore.readMergeFile(mergeFilePath);
     List<String> indexFiles =
         segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e26f3d8..f1aade9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1607,4 +1607,56 @@ public final class CarbonProperties {
       }
     }
   }
+
+  /**
+   * Check whether the Distributed Pruning is enabled by the user or not.
+   */
+  public boolean isDistributedPruningEnabled(String dbName, String tableName) {
+    // Check if user has enabled/disabled the use of index server for the current session using
+    // the set command
+    String configuredValue = getSessionPropertyValue(
+        CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER + "." + dbName + "." + tableName);
+    if (configuredValue == null) {
+      // if not set in session properties then check carbon.properties for the same.
+      configuredValue = getProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER);
+    }
+    boolean isServerEnabledByUser = Boolean.parseBoolean(configuredValue);
+    if (isServerEnabledByUser) {
+      LOGGER.info("Distributed Index server is enabled for " + dbName + "." + tableName);
+    }
+    return isServerEnabledByUser;
+  }
+
+  public String getIndexServerIP() {
+    return carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, "");
+  }
+
+  public int getIndexServerPort() {
+    String configuredPort =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_PORT);
+    try {
+      return Integer.parseInt(configuredPort);
+    } catch (NumberFormatException e) {
+      LOGGER.error("Configured port for index server is not a valid number", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Whether fallback is disabled by the user or not.
+   */
+  public boolean isFallBackDisabled() {
+    return Boolean.parseBoolean(carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_DISABLE_INDEX_SERVER_FALLBACK, "false"));
+  }
+
+  public int getNumberOfHandlersForIndexServer() {
+    String configuredValue =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS);
+    if (configuredValue != null) {
+      return Integer.parseInt(configuredValue);
+    }
+    return CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT;
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index d9aa214..fbae502 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -143,6 +143,7 @@ public class SessionParams implements Serializable, Cloneable {
       case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
       case ENABLE_AUTO_LOAD_MERGE:
       case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR:
+      case CARBON_ENABLE_INDEX_SERVER:
         isValid = CarbonUtil.validateBoolean(value);
         if (!isValid) {
           throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 406456f..931b41b 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -625,4 +625,8 @@ public class CarbonInputSplit extends FileSplit
     }
     return this.location;
   }
+
+  public void setLocation(String[] location) {
+    this.location = location;
+  }
 }
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 11b216e..03599a9 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -313,6 +313,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
           filteredShards.contains(new File(shardPath).getName())) {
         DataMapDistributable bloomDataMapDistributable =
             new BloomDataMapDistributable(shardPath, filteredShards);
+        bloomDataMapDistributable.setSegment(segment);
+        bloomDataMapDistributable.setDataMapSchema(getDataMapSchema());
         dataMapDistributableList.add(bloomDataMapDistributable);
       }
     }
@@ -325,8 +327,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   }
 
   @Override
-  public void clear(Segment segment) {
-    Set<String> shards = segmentMap.remove(segment.getSegmentNo());
+  public void clear(String segment) {
+    Set<String> shards = segmentMap.remove(segment);
     if (shards != null) {
       for (String shard : shards) {
         for (CarbonColumn carbonColumn : dataMapMeta.getIndexedColumns()) {
@@ -341,15 +343,19 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     if (segmentMap.size() > 0) {
       List<String> segments = new ArrayList<>(segmentMap.keySet());
       for (String segmentId : segments) {
-        clear(new Segment(segmentId, null, null));
+        clear(segmentId);
       }
     }
   }
 
   @Override
   public void deleteDatamapData(Segment segment) throws IOException {
+    deleteSegmentDatamapData(segment.getSegmentNo());
+  }
+
+  @Override
+  public void deleteSegmentDatamapData(String segmentId) throws IOException {
     try {
-      String segmentId = segment.getSegmentNo();
       String datamapPath = CarbonTablePath
           .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
       if (FileFactory.isFileExist(datamapPath)) {
@@ -357,9 +363,9 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
             FileFactory.getFileType(datamapPath));
         CarbonUtil.deleteFoldersAndFilesSilent(file);
       }
-      clear(segment);
+      clear(segmentId);
     } catch (InterruptedException ex) {
-      throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo());
+      throw new IOException("Failed to delete datamap for segment_" + segmentId);
     }
   }
 
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 3ae390d..33f30b6 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -221,6 +221,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
         DataMapDistributable luceneDataMapDistributable =
             new LuceneDataMapDistributable(tableIdentifier.getTablePath(),
                 indexDir.getAbsolutePath());
+        luceneDataMapDistributable.setSegment(segment);
+        luceneDataMapDistributable.setDataMapSchema(getDataMapSchema());
         lstDataMapDistribute.add(luceneDataMapDistributable);
       }
       return lstDataMapDistribute;
@@ -234,6 +236,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
       DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
           CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
           indexDir.getAbsolutePath());
+      luceneDataMapDistributable.setSegment(segment);
+      luceneDataMapDistributable.setDataMapSchema(getDataMapSchema());
       lstDataMapDistribute.add(luceneDataMapDistributable);
     }
     return lstDataMapDistribute;
@@ -245,14 +249,6 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   }
 
   /**
-   * Clears datamap of the segment
-   */
-  @Override
-  public void clear(Segment segment) {
-
-  }
-
-  /**
    * Clear all datamaps from memory
    */
   @Override
@@ -262,8 +258,11 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
 
   @Override
   public void deleteDatamapData(Segment segment) throws IOException {
+    deleteSegmentDatamapData(segment.getSegmentNo());
+  }
+
+  @Override public void deleteSegmentDatamapData(String segmentId) throws IOException {
     try {
-      String segmentId = segment.getSegmentNo();
       String datamapPath = CarbonTablePath
           .getDataMapStorePath(tableIdentifier.getTablePath(), segmentId, dataMapName);
       if (FileFactory.isFileExist(datamapPath)) {
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index 63b6bd5..cac0eb4 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -59,6 +59,10 @@
     <Bug pattern="STCAL_INVOKE_ON_STATIC_DATE_FORMAT_INSTANCE"/>
   </Match>
   <Match>
+    <Class name="org.apache.carbondata.core.datamap.DistributableDataMapFormat"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+  </Match>
+  <Match>
     <!--
     Returning a reference to a mutable object value stored in one of the object's fields exposes
     the internal representation of the object.  If instances are accessed by untrusted code,
@@ -102,4 +106,8 @@
   <Match>
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
   </Match>
+  <Match>
+    <Class name="org.apache.carbondata.core.datamap.Segment"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+  </Match>
 </FindBugsFilter>
\ No newline at end of file
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index d81b02c..6051d4f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -225,8 +225,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions, validSegments,
+            partitionInfo, oldPartitionIdList, new ArrayList<Segment>(), new ArrayList<String>());
     numBlocks = dataBlocksOfSegment.size();
     result.addAll(dataBlocksOfSegment);
     return result;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 90532fb..e31b5b1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapJob;
+import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.datamap.Segment;
@@ -47,6 +48,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -55,6 +57,7 @@ import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.stats.QueryStatistic;
@@ -130,6 +133,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   protected int hitedStreamFiles = 0;
   protected int numBlocks = 0;
 
+  private CarbonTable carbonTable;
+
+
   public int getNumSegments() {
     return numSegments;
   }
@@ -178,8 +184,25 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  public abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
-      throws IOException;
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration)
+      throws IOException {
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable carbonTable;
+      if (tableInfo != null) {
+        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        carbonTable = SchemaReader.readCarbonTableFromStore(
+            getAbsoluteTableIdentifier(configuration));
+      }
+      this.carbonTable = carbonTable;
+      return carbonTable;
+    } else {
+      return this.carbonTable;
+    }
+  }
 
   public static void setTablePath(Configuration configuration, String tablePath) {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
@@ -384,6 +407,33 @@ m filterExpression
    */
   @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
 
+  List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
+      FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
+      List<Segment> validSegments, List<Segment> invalidSegments,
+      List<String> segmentsToBeRefreshed) throws IOException {
+    try {
+      DataMapJob dataMapJob =
+          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
+      if (dataMapJob == null) {
+        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
+      }
+      return DataMapUtil
+          .executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
+              invalidSegments, null, segmentsToBeRefreshed);
+    } catch (Exception e) {
+      // If fallback has been disabled (e.g. for testing), rethrow the exception directly.
+      if (CarbonProperties.getInstance().isFallBackDisabled()) {
+        throw e;
+      }
+      LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
+          + "back to embedded mode", e);
+      return DataMapUtil
+          .executeDataMapJob(table, filterResolverIntf,
+              DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null,
+              true, segmentsToBeRefreshed);
+    }
+  }
+
   protected Expression getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
@@ -402,7 +452,9 @@ m filterExpression
    */
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
       Expression expression, BitSet matchedPartitions, List<Segment> segmentIds,
-      PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException {
+      PartitionInfo partitionInfo, List<Integer> oldPartitionIdList,
+      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed)
+      throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
@@ -411,7 +463,8 @@ m filterExpression
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets =
-        getPrunedBlocklets(job, carbonTable, expression, segmentIds);
+        getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
+            segmentsToBeRefreshed);
     List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
     int partitionIndex = 0;
     List<Integer> partitionIdList = new ArrayList<>();
@@ -466,7 +519,8 @@ m filterExpression
    * First pruned with default blocklet datamap, then pruned with CG and FG datamaps
    */
   private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
-      Expression expression, List<Segment> segmentIds) throws IOException {
+      Expression expression, List<Segment> segmentIds, List<Segment> invalidSegments,
+      List<String> segmentsToBeRefreshed) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
     final DataMapFilter filter = new DataMapFilter(carbonTable, expression);
     ExplainCollector.setFilterStatement(expression == null ? "none" : expression.getStatement());
@@ -480,57 +534,83 @@ m filterExpression
     List<ExtendedBlocklet> prunedBlocklets = null;
     // This is to log the event, so user will know what is happening by seeing logs.
     LOG.info("Started block pruning ...");
-    prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
+    boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName());
+    if (isDistributedPruningEnabled) {
+      try {
+        prunedBlocklets =
+            getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
+                invalidSegments, segmentsToBeRefreshed);
+      } catch (Exception e) {
+        // Check if fallback is disabled then directly throw exception otherwise try driver
+        // pruning.
+        if (CarbonProperties.getInstance().isFallBackDisabled()) {
+          throw e;
+        }
+        prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
+      }
+    } else {
+      prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
 
-    if (ExplainCollector.enabled()) {
-      ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
-    }
+      if (ExplainCollector.enabled()) {
+        ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
+      }
 
-    if (prunedBlocklets.size() == 0) {
-      return prunedBlocklets;
-    }
+      if (prunedBlocklets.size() == 0) {
+        return prunedBlocklets;
+      }
 
-    DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
+      DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
 
-    // Get the available CG datamaps and prune further.
-    DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());
-    if (cgDataMapExprWrapper != null) {
-      // Prune segments from already pruned blocklets
-      pruneSegments(segmentIds, prunedBlocklets);
-      List<ExtendedBlocklet> cgPrunedBlocklets;
-      // Again prune with CG datamap.
-      if (distributedCG && dataMapJob != null) {
-        cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, filter.getResolver(),
-            segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune);
-      } else {
-        cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
-      }
-      // since index datamap prune in segment scope,
-      // the result need to intersect with previous pruned result
-      prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets, cgPrunedBlocklets);
-      ExplainCollector.recordCGDataMapPruning(
-          DataMapWrapperSimpleInfo.fromDataMapWrapper(cgDataMapExprWrapper),
-          prunedBlocklets.size(), getBlockCount(prunedBlocklets));
-    }
+      // Get the available CG datamaps and prune further.
+      DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());
 
-    if (prunedBlocklets.size() == 0) {
-      return prunedBlocklets;
-    }
-    // Now try to prune with FG DataMap.
-    if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
-      DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
-      if (fgDataMapExprWrapper != null) {
+      if (cgDataMapExprWrapper != null) {
         // Prune segments from already pruned blocklets
-        pruneSegments(segmentIds, prunedBlocklets);
-        List<ExtendedBlocklet> fgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
-            filter.getResolver(), segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune);
-        // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
-        // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
-        prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets,
-            fgPrunedBlocklets);
-        ExplainCollector.recordFGDataMapPruning(
-            DataMapWrapperSimpleInfo.fromDataMapWrapper(fgDataMapExprWrapper),
-            prunedBlocklets.size(), getBlockCount(prunedBlocklets));
+        DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
+        List<ExtendedBlocklet> cgPrunedBlocklets;
+        // Again prune with CG datamap.
+        if (distributedCG && dataMapJob != null) {
+          cgPrunedBlocklets = DataMapUtil
+              .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
+                  segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>());
+        } else {
+          cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+        }
+        // since index datamap prune in segment scope,
+        // the result needs to be intersected with the previously pruned result
+        prunedBlocklets =
+            intersectFilteredBlocklets(carbonTable, prunedBlocklets, cgPrunedBlocklets);
+        if (ExplainCollector.enabled()) {
+          ExplainCollector.recordCGDataMapPruning(
+              DataMapWrapperSimpleInfo.fromDataMapWrapper(cgDataMapExprWrapper),
+              prunedBlocklets.size(), getBlockCount(prunedBlocklets));
+        }
+      }
+
+      if (prunedBlocklets.size() == 0) {
+        return prunedBlocklets;
+      }
+      // Now try to prune with FG DataMap.
+      if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
+        DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
+        List<ExtendedBlocklet> fgPrunedBlocklets;
+        if (fgDataMapExprWrapper != null) {
+          // Prune segments from already pruned blocklets
+          DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
+          // Again prune with FG datamap.
+          fgPrunedBlocklets = DataMapUtil
+              .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
+                  segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
+                  new ArrayList<String>());
+          // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
+          // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
+          prunedBlocklets =
+              intersectFilteredBlocklets(carbonTable, prunedBlocklets, fgPrunedBlocklets);
+          ExplainCollector.recordFGDataMapPruning(
+              DataMapWrapperSimpleInfo.fromDataMapWrapper(fgDataMapExprWrapper),
+              prunedBlocklets.size(), getBlockCount(prunedBlocklets));
+        }
       }
     }
     LOG.info("Finished block pruning ...");
@@ -555,33 +635,15 @@ m filterExpression
     return prunedBlocklets;
   }
 
-  /**
-   * Prune the segments from the already pruned blocklets.
-   * @param segments
-   * @param prunedBlocklets
-   */
-  private void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
-    List<Segment> toBeRemovedSegments = new ArrayList<>();
-    for (Segment segment : segments) {
-      boolean found = false;
-      // Clear the old pruned index files if any present
-      segment.getFilteredIndexShardNames().clear();
-      // Check the segment exist in any of the pruned blocklets.
-      for (ExtendedBlocklet blocklet : prunedBlocklets) {
-        if (blocklet.getSegment().toString().equals(segment.toString())) {
-          found = true;
-          // Set the pruned index file to the segment for further pruning.
-          String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
-          segment.setFilteredIndexShardName(shardName);
-        }
-      }
-      // Add to remove segments list if not present in pruned blocklets.
-      if (!found) {
-        toBeRemovedSegments.add(segment);
+
+  static List<InputSplit> convertToCarbonInputSplit(List<ExtendedBlocklet> extendedBlocklets) {
+    List<InputSplit> resultFilteredBlocks = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : extendedBlocklets) {
+      if (blocklet != null) {
+        resultFilteredBlocks.add(blocklet.getInputSplit());
       }
     }
-    // Remove all segments which are already pruned from pruned blocklets
-    segments.removeAll(toBeRemovedSegments);
+    return resultFilteredBlocks;
   }
 
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a7ca290..458c95e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -36,10 +36,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
-import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -57,10 +55,10 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.stream.StreamFile;
 import org.apache.carbondata.core.stream.StreamPruner;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,28 +96,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private ReadCommittedScope readCommittedScope;
 
   /**
-   * Get the cached CarbonTable or create it by TableInfo in `configuration`
-   */
-  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
-    if (carbonTable == null) {
-      // carbon table should be created either from deserialized table info (schema saved in
-      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
-      TableInfo tableInfo = getTableInfo(configuration);
-      CarbonTable carbonTable;
-      if (tableInfo != null) {
-        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
-      } else {
-        carbonTable = SchemaReader.readCarbonTableFromStore(
-            getAbsoluteTableIdentifier(configuration));
-      }
-      this.carbonTable = carbonTable;
-      return carbonTable;
-    } else {
-      return this.carbonTable;
-    }
-  }
-
-  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
    * are used to get table path to read.
@@ -131,7 +107,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+    carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
@@ -140,7 +116,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
-    List<Segment> invalidSegments = new ArrayList<>();
+    List<String> invalidSegmentIds = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
@@ -177,10 +153,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         setSegmentsToAccess(job.getConfiguration(),filteredNormalSegments);
       }
       // remove entry in the segment index if there are invalid segments
-      invalidSegments.addAll(segments.getInvalidSegments());
-      if (invalidSegments.size() > 0) {
+      for (Segment segment : segments.getInvalidSegments()) {
+        invalidSegmentIds.add(segment.getSegmentNo());
+      }
+      if (invalidSegmentIds.size() > 0) {
         DataMapStoreManager.getInstance()
-            .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments);
+            .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+                invalidSegmentIds);
       }
     }
     List<Segment> validAndInProgressSegments = new ArrayList<>(segments.getValidSegments());
@@ -191,8 +170,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> filteredSegmentToAccess =
         getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false,
             readCommittedScope);
-    // Clean the updated segments from memory if the update happens on segments
-    refreshSegmentCacheIfRequired(job, carbonTable, updateStatusManager, filteredSegmentToAccess);
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
@@ -216,7 +193,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // do block filtering and get split
     List<InputSplit> splits =
         getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, partitionInfo,
-            null, updateStatusManager);
+            null, updateStatusManager, segments.getInvalidSegments());
     // add all splits of streaming
     List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable);
     if (!splitsOfStreaming.isEmpty()) {
@@ -234,32 +211,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * @param filteredSegmentToAccess
    * @throws IOException
    */
-  public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTable,
-      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
-      throws IOException {
-    List<Segment> toBeCleanedSegments = new ArrayList<>();
-    for (Segment filteredSegment : filteredSegmentToAccess) {
-      boolean refreshNeeded =
-          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-              .isRefreshNeeded(filteredSegment,
-                  updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
-      if (refreshNeeded) {
-        toBeCleanedSegments.add(filteredSegment);
-      }
-    }
-    // Clean segments if refresh is needed
-    for (Segment segment : filteredSegmentToAccess) {
-      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-          .isRefreshNeeded(segment.getSegmentNo())) {
-        toBeCleanedSegments.add(segment);
-      }
-    }
-    if (toBeCleanedSegments.size() > 0) {
-      DataMapStoreManager.getInstance()
-          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
-              toBeCleanedSegments);
-    }
-  }
+
 
   /**
    * Below method will be used to get the filter segments when query is fired on pre Aggregate
@@ -435,7 +387,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
       // do block filtering and get split
       List<InputSplit> splits = getSplits(job, filter, segmentList, matchedPartitions,
-          partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable));
+          partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable),
+          new ArrayList<Segment>());
       return splits;
     } catch (IOException e) {
       throw new RuntimeException("Can't get splits of the target segment ", e);
@@ -479,8 +432,20 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    */
   private List<InputSplit> getSplits(JobContext job, Expression expression,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList, SegmentUpdateStatusManager updateStatusManager)
-      throws IOException {
+      List<Integer> oldPartitionIdList, SegmentUpdateStatusManager updateStatusManager,
+      List<Segment> invalidSegments) throws IOException {
+
+    List<String> segmentsToBeRefreshed = new ArrayList<>();
+    if (!CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName())) {
+      // Clean the updated segments from memory if the update happens on segments
+      DataMapStoreManager.getInstance().refreshSegmentCacheIfRequired(carbonTable,
+          updateStatusManager,
+          validSegments);
+    } else {
+      segmentsToBeRefreshed = DataMapStoreManager.getInstance()
+          .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments);
+    }
 
     numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
@@ -491,8 +456,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions, validSegments,
+            partitionInfo, oldPartitionIdList, invalidSegments, segmentsToBeRefreshed);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 
@@ -550,7 +515,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     ExplainCollector.remove();
 
     AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
-    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
 
     ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
     LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
@@ -572,26 +536,42 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     For NonTransactional table, one of the reason for a segment refresh is below scenario.
     SDK is written one set of files with UUID, with same UUID it can write again.
     So, latest files content should reflect the new count by refreshing the segment */
-    List<Segment> toBeCleanedSegments = new ArrayList<>();
+    List<String> toBeCleanedSegments = new ArrayList<>();
     for (Segment eachSegment : filteredSegment) {
       boolean refreshNeeded = DataMapStoreManager.getInstance()
           .getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
           .isRefreshNeeded(eachSegment,
               updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo()));
       if (refreshNeeded) {
-        toBeCleanedSegments.add(eachSegment);
+        toBeCleanedSegments.add(eachSegment.getSegmentNo());
       }
     }
-    // remove entry in the segment index if there are invalid segments
-    toBeCleanedSegments.addAll(allSegments.getInvalidSegments());
+    for (Segment segment : allSegments.getInvalidSegments()) {
+      // remove entry in the segment index if there are invalid segments
+      toBeCleanedSegments.add(segment.getSegmentNo());
+    }
     if (toBeCleanedSegments.size() > 0) {
       DataMapStoreManager.getInstance()
           .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
               toBeCleanedSegments);
     }
     if (isIUDTable || isUpdateFlow) {
-      Map<String, Long> blockletToRowCountMap =
-          defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap);
+      Map<String, Long> blockletToRowCountMap = new HashMap<>();
+      if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
+          table.getTableName())) {
+        List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
+            getDistributedSplit(table, null, partitions, filteredSegment,
+                allSegments.getInvalidSegments(), toBeCleanedSegments));
+        for (InputSplit extendedBlocklet : extendedBlocklets) {
+          CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
+              (long) blocklet.getDetailInfo().getRowCount());
+        }
+      } else {
+        TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+        blockletToRowCountMap.putAll(
+            defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
+      }
       // key is (segmentId + "," + blockletPath) and value is the row count of that blocklet
       for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
         String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2);
@@ -620,7 +600,19 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         }
       }
     } else {
-      long totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
+      long totalRowCount = 0L;
+      if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
+          table.getTableName())) {
+        List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
+            getDistributedSplit(table, null, partitions, filteredSegment,
+                allSegments.getInvalidSegments(), new ArrayList<String>()));
+        for (InputSplit extendedBlocklet : extendedBlocklets) {
+          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getDetailInfo().getRowCount();
+        }
+      } else {
+        TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+        totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
+      }
       blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
     }
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index ccc0594..4d3f73a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -163,7 +163,7 @@ public class CarbonInputFormatUtil {
    * @throws IOException
    */
   public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
-    String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    String className = "org.apache.carbondata.indexserver.EmbeddedDataMapJob";
     DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(className));
   }
 
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 92a49dd..ac0ca8b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -20,18 +20,18 @@ package org.apache.carbondata.spark.testsuite.allqueries
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.DataMap
 import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
+import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap, BlockletDataMapRowIndexes}
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
-import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 69334a0..97aaa23 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -114,14 +114,6 @@ class CGDataMapFactory(
     }.toList.asJava
   }
 
-
-  /**
-   * Clears datamap of the segment
-   */
-  override def clear(segment: Segment): Unit = {
-
-  }
-
   /**
    * Clear all datamaps from memory
    */
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 449ffa0..f777908 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -47,8 +47,6 @@ class C2DataMapFactory(
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segment: Segment): Unit = {}
-
   override def clear(): Unit = {}
 
   override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index ff77820..0be4970 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -98,6 +98,8 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     val files = file.listFiles()
     files.map { f =>
       val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d.setSegment(segment)
+      d.setDataMapSchema(getDataMapSchema)
       d
     }.toList.asJava
   }
@@ -111,12 +113,6 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   }
 
   /**
-   * Clears datamap of the segment
-   */
-  override def clear(segment: Segment): Unit = {
-  }
-
-  /**
    * Clear all datamaps from memory
    */
   override def clear(): Unit = {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 683098e..bfc67cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -226,8 +226,6 @@ class TestDataMapFactory(
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: Segment): Unit = {}
-
   override def clear(): Unit = {}
 
   override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index bbe1368..1bc4411 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -306,8 +306,6 @@ class WaitingDataMapFactory(
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: Segment): Unit = {}
-
   override def clear(): Unit = {}
 
   override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 4b29e77..b0af2ea 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,7 +58,7 @@ import org.apache.carbondata.streaming.parser.FieldConverter
 
 object CarbonScalaUtil {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def getString(value: Any,
       serializationNullFormat: String,
@@ -700,4 +700,11 @@ object CarbonScalaUtil {
     newColumnSchemas
   }
 
+  /** Runs `f` once and returns its result paired with the elapsed wall-clock milliseconds. */
+  def logTime[T](f: => T): (T, Long) = {
+    val startedAt = System.currentTimeMillis()
+    val result = f
+    (result, System.currentTimeMillis() - startedAt)
+  }
+
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 656d8eb..0861d2b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -95,7 +95,7 @@ object DistributionUtil {
     while (iface.hasMoreElements) {
       addresses = iface.nextElement().getInterfaceAddresses.asScala.toList ++ addresses
     }
-    val inets = addresses.map(_.getAddress.getHostAddress)
+    val inets = addresses.map(_.getAddress.getHostName)
     inets
   }
 
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 487148d..8e1f22a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -93,7 +93,7 @@ public class IndexDataMapProvider extends DataMapProvider {
     if (mainTable == null) {
       throw new UnsupportedOperationException("Table need to be specified in index datamaps");
     }
-    DataMapStoreManager.getInstance().clearDataMap(
+    DataMapStoreManager.getInstance().deleteDataMap(
         mainTable.getAbsoluteTableIdentifier(), getDataMapSchema().getDataMapName());
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
new file mode 100644
index 0000000..b03beca
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.log4j.Logger
+import org.apache.spark.util.SizeEstimator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
+
+/**
+ * Spark job to execute datamap job and prune all the datamaps distributable. This job will prune
+ * and cache the appropriate datamaps in executor LRUCache.
+ */
+class DistributedDataMapJob extends AbstractDataMapJob {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Sends the request to the index server through its RPC client and returns the blocklets.
+  override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
+    if (LOGGER.isDebugEnabled) {
+      val estimatedBytes = SizeEstimator.estimate(dataMapFormat)
+      LOGGER.debug(s"Size of message sent to Index Server: $estimatedBytes")
+    }
+    val (blocklets, elapsedMillis) =
+      logTime(IndexServer.getClient.getSplits(dataMapFormat).toList.asJava)
+    LOGGER.info(s"Time taken to get response from server: $elapsedMillis ms")
+    blocklets
+  }
+}
+
+/**
+ * Spark job to execute datamap job and prune all the datamaps distributable. This job will just
+ * prune the datamaps but will not cache in executors.
+ */
+class EmbeddedDataMapJob extends AbstractDataMapJob {
+  // Prunes in-process by calling IndexServer.getSplits directly instead of the RPC client.
+  override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
+    IndexServer.getSplits(dataMapFormat).toList.asJava
+  }
+
+}
+
+/** Job that asks the index server, through its RPC client, to invalidate its cache. */
+class DistributedClearCacheJob extends AbstractDataMapJob {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Always returns an empty blocklet list; the call is made only for its effect on the server.
+  override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
+    if (LOGGER.isDebugEnabled) {
+      val estimatedBytes = SizeEstimator.estimate(dataMapFormat)
+      LOGGER.debug(s"Size of message sent to Index Server: $estimatedBytes")
+    }
+    // invalidateCache returns Unit, so only the elapsed time of the call is kept.
+    val (_, elapsedMillis) = logTime(IndexServer.getClient.invalidateCache(dataMapFormat))
+    LOGGER.info(s"Time taken to get response from server: $elapsedMillis ms")
+    new util.ArrayList[ExtendedBlocklet]()
+  }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
similarity index 51%
rename from integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
rename to integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index 968739b..fd59e2b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -14,71 +14,73 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.spark.rdd
+
+package org.apache.carbondata.indexserver
 
 import java.text.SimpleDateFormat
-import java.util
 import java.util.Date
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.sql.hive.DistributionUtil
 
-import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-/**
- * Spark job to execute datamap job and prune all the datamaps distributable
- */
-class SparkDataMapJob extends AbstractDataMapJob {
+private[indexserver] class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+  extends Partition {
 
-  override def execute(dataMapFormat: DistributableDataMapFormat,
-      filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
-    new DataMapPruneRDD(SparkSQLUtil.getSparkSession, dataMapFormat, filter).collect()
-      .toList.asJava
-  }
-}
-
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) extends Partition {
   override def index: Int = idx
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
-/**
- * RDD to prune the datamaps across spark cluster
- * @param ss
- * @param dataMapFormat
- */
-class DataMapPruneRDD(
-    @transient private val ss: SparkSession,
-    dataMapFormat: DistributableDataMapFormat,
-    resolverIntf: FilterResolverIntf)
-  extends CarbonRDD[(ExtendedBlocklet)](ss, Nil) {
+private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession,
+    dataMapFormat: DistributableDataMapFormat)
+  extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+
+  val executorsList: Set[String] = DistributionUtil.getNodeList(ss.sparkContext).toSet
+
+  @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
+    .getName)
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
+  }
+
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[ExtendedBlocklet] = {
+      context: TaskContext): Iterator[(String, ExtendedBlocklet)] = {
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
+    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    } else {
+      0L
+    }
     context.addTaskCompletionListener(_ => {
-      reader.close()
+      if (reader != null) {
+        reader.close()
+      }
     })
-    val iter = new Iterator[ExtendedBlocklet] {
+    val iter: Iterator[(String, ExtendedBlocklet)] = new Iterator[(String, ExtendedBlocklet)] {
 
       private var havePair = false
       private var finished = false
@@ -94,12 +96,13 @@ class DataMapPruneRDD(
         !finished
       }
 
-      override def next(): ExtendedBlocklet = {
+      override def next(): (String, ExtendedBlocklet) = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val value = reader.getCurrentValue
+        val executorIP = SparkEnv.get.blockManager.blockManagerId.host
+        val value = (executorIP + "_" + cacheSize.toString, reader.getCurrentValue)
         value
       }
     }
@@ -108,7 +111,18 @@ class DataMapPruneRDD(
 
   override protected def internalGetPartitions: Array[Partition] = {
     val job = Job.getInstance(FileFactory.getConfiguration)
-    val splits = dataMapFormat.getSplits(job)
-    splits.asScala.zipWithIndex.map(f => new DataMapRDDPartition(id, f._2, f._1)).toArray
+    val splits = dataMapFormat.getSplits(job).asScala
+    if (dataMapFormat.isFallbackJob || splits.isEmpty) {
+      splits.zipWithIndex.map {
+        f => new DataMapRDDPartition(id, f._2, f._1)
+      }.toArray
+    } else {
+      val (response, time) = CarbonScalaUtil.logTime {
+        DistributedRDDUtils.getExecutors(splits.toArray, executorsList, dataMapFormat
+          .getCarbonTable.getTableUniqueName, id)
+      }
+      LOGGER.debug(s"Time taken to assign executors to ${splits.length} is $time ms")
+      response.toArray
+    }
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
new file mode 100644
index 0000000..c381f80
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.spark.Partition
+
+import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+
+object DistributedRDDUtils {
+  // Maps "<tableUniqueName>_<segmentNo>" to the executor that was assigned to that segment.
+  val segmentToExecutorMapping: java.util.Map[String, String] =
+    new ConcurrentHashMap[String, String]()
+
+  // Maps an executor to the size tracked for it (reported cache size or assigned index sizes).
+  val executorToCacheSizeMapping: java.util.Map[String, Long] =
+    new ConcurrentHashMap[String, Long]()
+
+  /**
+   * Assigns an executor to every split and wraps each split in a DataMapRDDPartition.
+   */
+  def getExecutors(segment: Array[InputSplit], executorsList : Set[String],
+      tableUniqueName: String, rddId: Int): Seq[Partition] = {
+    def indexSize(split: InputSplit): Long =
+      split.asInstanceOf[DataMapDistributableWrapper].getDistributable.getSegment.getIndexSize
+    // partition (not span): a zero-sized (legacy) split is separated wherever it occurs.
+    val (segments, legacySegments) = segment.partition(indexSize(_) > 0)
+    // sort the remaining splits in decreasing order of index size (largest first).
+    val sortedPartitions = segments.sortWith(indexSize(_) > indexSize(_))
+    // executors that are still tracked in the size map but are no longer in executorsList.
+    val invalidExecutors = executorToCacheSizeMapping.keySet().asScala.diff(executorsList)
+    if (invalidExecutors.nonEmpty) {
+      invalidateExecutors(invalidExecutors.toSeq)
+    }
+    (convertToPartition(legacySegments, tableUniqueName, executorsList) ++
+     convertToPartition(sortedPartitions, tableUniqueName, executorsList)).zipWithIndex.map {
+      case (dataMapDistributable, index) =>
+        new DataMapRDDPartition(rddId, index, dataMapDistributable)
+    }
+  }
+
+  // Pins each split to the executor chosen by assignExecutor and returns the same splits.
+  private def convertToPartition(segments: Seq[InputSplit], tableUniqueName: String,
+      executorList: Set[String]): Seq[InputSplit] = {
+    segments.map { split =>
+      val distributable: DataMapDistributable =
+        split.asInstanceOf[DataMapDistributableWrapper].getDistributable
+      // bump a zero index size to 1 before an executor is chosen for the segment.
+      if (distributable.getSegment.getIndexSize == 0L) distributable.getSegment.setIndexSize(1L)
+      val executor = assignExecutor(tableUniqueName, distributable.getSegment, executorList)
+      distributable.setLocations(Array(executor))
+      split
+    }
+  }
+
+  /**
+   * Update the cache size returned by the executors to the driver mapping.
+   */
+  def updateExecutorCacheSize(cacheSizes: Set[String]): Unit = {
+    synchronized {
+      // each entry has the form 127.0.0.1_10024: the text before the last '_' is the
+      // executor IP and the number after it is the cache size that executor is holding.
+      // The reported size overwrites whatever was tracked for that executor before.
+      for (entry <- cacheSizes) {
+        val separatorIdx = entry.lastIndexOf('_')
+        val executorIP = entry.substring(0, separatorIdx)
+        val reportedSize = entry.substring(separatorIdx + 1)
+        executorToCacheSizeMapping.put(executorIP, reportedSize.toLong)
+      }
+    }
+  }
+
+  def invalidateCache(tableUniqueName: String): Unit = {
+    // keys are "<tableUniqueName>_<segmentNo>" and table names may contain '_', so compare
+    // everything before the LAST '_' rather than only the first '_'-separated token.
+    // NOTE(review): assumes segment numbers never contain '_' - confirm against Segment.
+    segmentToExecutorMapping.keySet().asScala
+      .filter(key => key.take(key.lastIndexOf('_')).equalsIgnoreCase(tableUniqueName))
+      .foreach(key => segmentToExecutorMapping.remove(key))
+  }
+
+  /**
+   * Invalidate the dead executors from the mapping and assign the segments to some other
+   * executor, so that the query can load the segments to the new assigned executor.
+   */
+  def invalidateExecutors(invalidExecutors: Seq[String]): Unit = synchronized {
+    // Forget all dead executors first (the size map is keyed by executor, not by segment key)
+    // so none of them can be picked again; remember their sizes to hand them over below.
+    val orphanedSizes = scala.collection.mutable.Map(
+      invalidExecutors.map(dead => dead -> executorToCacheSizeMapping.remove(dead)): _*)
+    for ((key, value) <- segmentToExecutorMapping.asScala if invalidExecutors.contains(value)) {
+      if (executorToCacheSizeMapping.isEmpty) {
+        // no live executor is tracked: drop the mapping so assignExecutor picks one later.
+        segmentToExecutorMapping.remove(key)
+      } else {
+        val reassignedExecutor = getLeastLoadedExecutor
+        segmentToExecutorMapping.put(key, reassignedExecutor)
+        // the first re-homed segment carries the dead executor's whole tracked size.
+        executorToCacheSizeMapping.put(reassignedExecutor, executorToCacheSizeMapping
+          .get(reassignedExecutor) + orphanedSizes.remove(value).getOrElse(0L))
+      }
+    }
+  }
+
+  /**
+   * Orders the tracked executors by the size recorded for each and returns the smallest one.
+   * Fails with NoSuchElementException when no executor is tracked yet.
+   * @return the executor with the least recorded size
+   */
+  private def getLeastLoadedExecutor: String = {
+    executorToCacheSizeMapping.asScala.toSeq.sortBy { case (_, size) => size }.head._1
+  }
+
+  /**
+   * Assign a executor for the current segment. If a executor was previously assigned to the
+   * segment then the same would be returned.
+   *
+   * @return
+   */
+  def assignExecutor(tableName: String, segment: Segment, validExecutors: Set[String]): String = {
+    val cacheKey = s"${ tableName }_${ segment.getSegmentNo }"
+    // a segment that already has an executor keeps it.
+    Option(segmentToExecutorMapping.get(cacheKey)).getOrElse {
+      // give priority to an executor that has no recorded size yet, otherwise take the
+      // executor which has handled the least size.
+      val unassignedExecutors = validExecutors
+        .diff(executorToCacheSizeMapping.asScala.keys.toSet)
+      val newExecutor = if (unassignedExecutors.nonEmpty) {
+        // keep only the part before ':' of the executor entry.
+        unassignedExecutors.head.split(":")(0)
+      } else {
+        getLeastLoadedExecutor
+      }
+      // an executor without an entry reads back as 0 here.
+      // The index size is added as a Long: narrowing it with toInt would wrap around for
+      // sizes above Int.MaxValue and corrupt the tracked load.
+      val existingExecutorSize = executorToCacheSizeMapping.get(newExecutor)
+      executorToCacheSizeMapping.put(newExecutor, existingExecutorSize + segment.getIndexSize)
+      segmentToExecutorMapping.put(cacheKey, newExecutor)
+      newExecutor
+    }
+  }
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
new file mode 100644
index 0000000..0069d86
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.DistributionUtil
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+/**
+ * RDD with one partition per entry of the node list; each task lists "table:cacheSize" strings.
+ */
+class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName: String)
+  extends CarbonRDD[String](ss, Nil) {
+
+  val executorsList: Array[String] = DistributionUtil.getNodeList(ss.sparkContext)
+
+  override protected def internalGetPartitions: Array[Partition] = {
+    executorsList.zipWithIndex.map {
+      case (executor, idx) =>
+        // create a dummy split for each executor to accumulate the cache size.
+        val dummySplit = new CarbonInputSplit()
+        dummySplit.setLocation(Array(executor))
+        new DataMapRDDPartition(id, idx, dummySplit)
+    }
+  }
+
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
+    val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
+    dataMaps.collect {
+      // an empty requested tableName selects every table; otherwise match the table by name.
+      case (table, tableDataMaps) if tableName.isEmpty || tableName.equalsIgnoreCase(table) =>
+        // one "<table>:<cacheSize>" entry per datamap of the table.
+        tableDataMaps.asScala.map { dataMap =>
+          s"$table:${ dataMap.getBlockletDetailsFetcher.getCacheSize }"
+        }
+    }.flatten.toIterator
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
new file mode 100644
index 0000000..194f4cb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.net.InetSocketAddress
+import java.security.PrivilegedAction
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.util.CarbonProperties
+
+@ProtocolInfo(protocolName = "Server", protocolVersion = 1)
+@KerberosInfo(serverPrincipal = "spark.carbon.indexserver.principal",
+  clientPrincipal = "spark.carbon.indexserver.principal")
+trait ServerInterface {
+  /**
+   * Used to prune and cache the datamaps for the table.
+   */
+  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
+
+  /**
+   * Invalidate the cache for the provided table.
+   */
+  def invalidateCache(request: DistributableDataMapFormat): Unit
+
+  /**
+   * Get the cache size for the specified table.
+   */
+  def showCache(tableName: String) : Array[String]
+
+  /**
+   * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
+   */
+  def invalidateSegmentCache(databaseName: String,
+      tableName: String,
+      segmentIds: Array[String]): Unit
+}
+
+/**
+ * An instance of a distributed Index Server which will be used for:
+ * 1. Pruning the datamaps in a distributed way by using the executors.
+ * 2. Caching the pruned datamaps on the executor side to be reused in the next query.
+ * 3. Getting the size of the datamaps cached in the executors.
+ * 4. Clearing the datamaps for a table or for the specified invalid segments.
+ *
+ * Start using ./bin/start-indexserver.sh
+ * Stop using ./bin/stop-indexserver.sh
+ */
+object IndexServer extends ServerInterface {
+
+  val isDistributedPruning: Boolean =
+    CarbonProperties.getInstance().isDistributedPruningEnabled("", "")
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  private val serverIp: String = CarbonProperties.getInstance().getIndexServerIP
+
+  private lazy val serverPort: Int = CarbonProperties.getInstance().getIndexServerPort
+
+  private val numHandlers: Int = CarbonProperties.getInstance().getNumberOfHandlersForIndexServer
+
+  /**
+   * Getting sparkSession from ActiveSession because in case of embedded mode the session would
+   * have already been created whereas in case of distributed mode the session would be
+   * created by the main method after some validations.
+   */
+  private lazy val sparkSession: SparkSession = SparkSQLUtil.getSparkSession
+
+  private def doAs[T](f: => T): T = {
+    UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[T] {
+      override def run(): T = {
+        f
+      }
+    })
+  }
+
+  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
+    val (cacheSizes, blocklets) = new DistributedPruneRDD(sparkSession, request).collect().unzip
+    DistributedRDDUtils.updateExecutorCacheSize(cacheSizes.toSet)
+    blocklets
+  }
+
+  override def invalidateCache(request: DistributableDataMapFormat): Unit = doAs {
+    val cacheSizes = new DistributedPruneRDD(sparkSession, request).collect().map(_._1).toSet
+    DistributedRDDUtils.updateExecutorCacheSize(cacheSizes)
+    // only a clear-datamaps job also drops the driver-side segment to executor mapping.
+    if (request.isJobToClearDataMaps) DistributedRDDUtils
+      .invalidateCache(request.getCarbonTable.getTableUniqueName)
+  }
+
+  override def invalidateSegmentCache(databaseName: String, tableName: String,
+      segmentIds: Array[String]): Unit = doAs {
+    new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
+      .collect()
+  }
+
+  override def showCache(tableName: String = ""): Array[String] = doAs {
+    new DistributedShowCacheRDD(sparkSession, tableName).collect()
+  }
+
+  def main(args: Array[String]): Unit = {
+    // refuse to start without a configured server IP or with the index server disabled.
+    if (serverIp.isEmpty) {
+      throw new RuntimeException(s"Please set the server IP to use Index Cache Server")
+    }
+    if (!isDistributedPruning) {
+      throw new RuntimeException(
+        s"Please set ${ CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER }" +
+        s" as true to use index server")
+    }
+    createCarbonSession()
+    LOGGER.info("Starting Index Cache Server")
+    // build the Hadoop RPC server that exposes ServerInterface on serverIp:serverPort.
+    val rpcServer: RPC.Server = new RPC.Builder(new Configuration()).setInstance(this)
+      .setBindAddress(serverIp).setPort(serverPort).setNumHandlers(numHandlers)
+      .setProtocol(classOf[ServerInterface]).build
+    rpcServer.start()
+    SecurityUtil.login(sparkSession.asInstanceOf[CarbonSession].sessionState.newHadoopConf(),
+      "spark.carbon.indexserver.keytab", "spark.carbon.indexserver.principal")
+    // stop the RPC server together with the Spark application.
+    sparkSession.sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        LOGGER.info("Spark Application has ended. Stopping the Index Server")
+        rpcServer.stop()
+      }
+    })
+    LOGGER.info(s"Index cache server running on ${ rpcServer.getPort } port")
+  }
+
+  private def createCarbonSession(): SparkSession = {
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder().config(new SparkConf())
+      .appName("DistributedIndexServer")
+      .enableHiveSupport()
+      .getOrCreateCarbonSession(CarbonProperties.getStorePath)
+    SparkSession.setActiveSession(spark)
+    SparkSession.setDefaultSession(spark)
+    spark
+  }
+
+  /**
+   * @return Return a new Client to communicate with the Index Server.
+   */
+  def getClient: ServerInterface = {
+    // each call builds a new proxy aimed at serverIp:serverPort for the current login user.
+    RPC.getProxy(classOf[ServerInterface],
+      RPC.getProtocolVersion(classOf[ServerInterface]),
+      new InetSocketAddress(serverIp, serverPort), UserGroupInformation.getLoginUser,
+      FileFactory.getConfiguration, NetUtils.getDefaultSocketFactory(FileFactory.getConfiguration))
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
new file mode 100644
index 0000000..1aa8cd9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+/** Clears `invalidSegmentIds` of a table from DataMapStoreManager, one task per listed node. */
+class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databaseName: String,
+    tableName: String, invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {
+  val carbonTable: CarbonTable = CarbonEnv
+    .getCarbonTable(TableIdentifier(tableName, Some(databaseName)))(ss)
+  val executorsList: Array[String] = DistributionUtil.getNodeList(ss.sparkContext)
+
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
+    // Drop the invalid segments from the DataMapStoreManager of the JVM running this task.
+    DataMapStoreManager.getInstance().clearInvalidSegments(carbonTable, invalidSegmentIds.asJava)
+    Iterator.empty
+  }
+
+  override protected def internalGetPartitions: Array[Partition] = {
+    // With nothing to invalidate, create no partitions so that no tasks are scheduled.
+    if (invalidSegmentIds.isEmpty) Array.empty[Partition]
+    else executorsList.zipWithIndex.map { case (executor, idx) =>
+      // dummy split located at this node so that a clearing task is created for it
+      val dummySplit = new CarbonInputSplit()
+      dummySplit.setLocation(Array(executor))
+      new DataMapRDDPartition(id, idx, dummySplit)
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8a04887..82c893f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -66,6 +66,7 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -253,6 +254,12 @@ object CarbonDataRDDFactory {
                 skipCompactionTables.asJava)
             }
           }
+          // Remove compacted segments from executor cache.
+          if (CarbonProperties.getInstance().isDistributedPruningEnabled(
+              carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
+            IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
+              carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+          }
           // giving the user his error for telling in the beeline if his triggered table
           // compaction is failed.
           if (!triggeredCompactionStatus) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index cfceea4..e4c1620 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 8fb3cc1..038f726 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -74,9 +74,4 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
     }
   }
 
-
-  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
-    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
-  }
-
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 18402e9..158bd1f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -26,7 +26,10 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
+import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
 
@@ -41,7 +44,17 @@ object CacheUtil {
   def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier).asScala.flatMap {
+      val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
+        .getValidAndInvalidSegments()
+      // Fire a job to clear the invalid segments cached in the executors.
+      if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
+        carbonTable.getTableName)) {
+        val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
+          .map(_.getSegmentNo).toArray
+        IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
+          .getTableName, invalidSegmentIds)
+      }
+      validAndInvalidSegmentsInfo.getValidSegments.asScala.flatMap {
         segment =>
           segment.getCommittedIndexFile.keySet().asScala
       }.map { indexFile =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index a0bb43e..909c196 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapUtil
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{DropTableCacheEvent, OperationContext, OperationListenerBus}
 
 case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall: Boolean = false)
@@ -48,16 +50,19 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
 
     val cache = CacheProvider.getInstance().getCarbonCache
     if (cache != null) {
-
       // Get all Index files for the specified table.
-      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
-
-      // Extract dictionary keys for the table and create cache keys from those
-      val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
+      if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
+        carbonTable.getTableName)) {
+        DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
+      } else {
+        val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
+        // Extract dictionary keys for the table and create cache keys from those
+        val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
 
-      // Remove elements from cache
-      val keysToRemove = allIndexFiles ++ dictKeys
-      cache.removeAll(keysToRemove.asJava)
+        // Remove elements from cache
+        val keysToRemove = allIndexFiles ++ dictKeys
+        cache.removeAll(keysToRemove.asJava)
+      }
     }
     LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 3b85313..6f6ddea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -32,8 +32,11 @@ import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
 import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, ShowTableCacheEvent}
+import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.spark.util.CommonUtil.bytesToDisplaySize
 
 
@@ -41,7 +44,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     internalCall: Boolean = false)
   extends MetadataCommand {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val LOGGER = LogServiceFactory.getLogService(classOf[CarbonShowCacheCommand].getName)
 
   override def output: Seq[AttributeReference] = {
     if (tableIdentifier.isEmpty) {
@@ -171,14 +174,16 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
 
     // Get all Index files for the specified table in cache
-    val indexFilesInCache: List[String] = allIndexFiles.filter {
-      indexFile =>
-        cache.get(indexFile) != null
+    val (indexFilesLength, size) = if (CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(carbonTable.getDatabaseName, carbonTable.getTableName)) {
+      getTableCache(carbonTable.getTableUniqueName)
+    } else {
+      val memorySizeForEachIndexFile: List[Long] = allIndexFiles.collect {
+        case indexFile if cache.get(indexFile) != null =>
+          cache.get(indexFile).getMemorySize
+      }
+      (memorySizeForEachIndexFile.length, memorySizeForEachIndexFile.sum)
     }
-    val sizeOfIndexFilesInCache: Long = indexFilesInCache.map {
-      indexFile =>
-        cache.get(indexFile).getMemorySize
-    }.sum
 
     // Extract dictionary keys for the table and create cache keys from those
     val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
@@ -194,17 +199,46 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       case (name, (provider, indexSize, dmSize)) =>
         Row(name, indexSize, dmSize, provider)
     }.toSeq
-
-    var comments = indexFilesInCache.size + "/" + allIndexFiles.size + " index files cached"
+    var comments = indexFilesLength + "/" + allIndexFiles.size + " index files cached"
     if (!carbonTable.isTransactionalTable) {
       comments += " (external table)"
     }
     Seq(
-      Row("Index", sizeOfIndexFilesInCache, comments),
+      Row("Index", size, comments),
       Row("Dictionary", sizeOfDictInCache, "")
     ) ++ otherDatamapsResults
   }
 
+  // Cache details reported by the Index Server as (table name, index file count, cache size),
+  // one tuple per table name; requested on first use.
+  private lazy val cacheResult: Seq[(String, Int, Long)] = {
+    // "<database>_<table>" of the requested table, or "" when no table was specified.
+    val tableUniqueName = tableIdentifier.fold("") { identifier =>
+      val dbName = identifier.database
+        .getOrElse(SparkSession.getActiveSession.get.catalog.currentDatabase)
+      s"${ dbName }_${ identifier.table }"
+    }
+    val (result, time) = CarbonScalaUtil.logTime {
+      // Each entry is split on ':'; field 0 is the grouping key, fields 1 and 2 are summed.
+      val entries = IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
+      entries.groupBy(_.head).map { case (name, rows) =>
+        // Field 2 (size) is parsed ahead of field 1 (count), then both are accumulated.
+        val (fileCount, cacheSize) = rows.foldLeft((0, 0L)) { case ((count, size), row) =>
+          val newSize = size + row(2).toLong
+          (count + row(1).toInt, newSize)
+        }
+        (name, fileCount, cacheSize)
+      }
+    }
+    LOGGER.info(s"Time taken to get cache results from Index Server is $time ms")
+    result.toList
+  }
+
+  // Index file count and cache size reported for `tableName`; (0, 0L) when it has no entry.
+  private def getTableCache(tableName: String): (Int, Long) =
+    cacheResult.collectFirst { case (`tableName`, count, size) => (count, size) }
+      .getOrElse((0, 0L))
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     if (tableIdentifier.isEmpty) {
       /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 709260e..8b8fe0d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -28,7 +28,9 @@ import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 
 /**
@@ -98,7 +100,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      DeleteExecution.deleteDeltaExecution(
+      val deletedSegments = DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
@@ -110,6 +112,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
         isUpdateOperation = false)
 
+      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)
 
       if (executorErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executorErrors.errorMsg)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 705ba4b..9fbf745 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
+import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 
 private[sql] case class CarbonProjectForUpdateCommand(
@@ -153,6 +154,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
             currentTime,
             executionErrors,
             segmentsToBeDeleted)
+
+          DeleteExecution.clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)
+
         } else {
           throw new ConcurrentOperationException(carbonTable, "compaction", "update")
         }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 7337496..e14f465 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -37,13 +37,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -314,6 +316,22 @@ object DeleteExecution {
     segmentsTobeDeleted
   }
 
+  // Best-effort eviction from the Index Server cache: no RPC for an empty list, failures logged.
+  def clearDistributedSegmentCache(carbonTable: CarbonTable,
+      segmentsToBeCleared: Seq[Segment]): Unit = {
+    if (segmentsToBeCleared.nonEmpty && CarbonProperties.getInstance()
+        .isDistributedPruningEnabled(carbonTable.getDatabaseName, carbonTable.getTableName)) {
+      try {
+        IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName,
+          carbonTable.getTableName, segmentsToBeCleared.map(_.getSegmentNo).toArray)
+      } catch {
+        case e: Exception => // pass the cause to the logger instead of discarding it
+          LOGGER.warn(s"Clearing of invalid segments for ${
+            carbonTable.getTableUniqueName} has failed", e)
+      }
+    }
+  }
+
   private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
   (CarbonTableInputFormat[Array[Object]], Job) = {
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 5cc5bc8..d1d3992 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -123,6 +123,11 @@ object CarbonSetCommand {
           "property should be in \" carbon.table.load.sort.scope.<database_name>" +
           ".<table_name>=<sort_sope> \" format.")
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER)) {
+      val keySplits = key.split("\\.")
+      if (keySplits.length == 6 || keySplits.length == 4) {
+        sessionParams.addProperty(key.toString, value)
+      }
     }
     else if (isCarbonProperty) {
       sessionParams.addProperty(key, value)


Mime
View raw message