carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3665] Support TimeBased Cache expiration using ExpiringMap
Date Mon, 16 Mar 2020 09:46:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eaf33a  [CARBONDATA-3665] Support TimeBased Cache expiration using ExpiringMap
3eaf33a is described below

commit 3eaf33a6a9b4673027965be213bc0f54efb23d46
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Tue Feb 11 18:20:38 2020 +0530

    [CARBONDATA-3665] Support TimeBased Cache expiration using ExpiringMap
    
    Why is this PR needed?
    
    Currently, Carbon uses an LRU-based cache mechanism: the least-recently-used entry is removed from the cache when it is full. There is no time-based cache expiration supported in Carbon. In the cloud, not all VMs may have enough memory to cache everything we could cache.
    In that case, we can clear the cache after a specified duration. This can be achieved by using one of the available caching libraries.
    
    One such caching library is ExpiringMap, which provides flexible and powerful caching features. Please refer to the ExpiringMap documentation for more info.
    
    What changes were proposed in this PR?
    
    1. Replaced LinkedHashMap with ExpiringMap
    2. Added a table property that lets the user specify the cache expiration duration in seconds, after which the cache entries for that table are cleared.
    Newly added carbon table property:
    index_cache_expiration_seconds, which takes a long value (in seconds).
    For example:
    index_cache_expiration_seconds="300" -> After 5 minutes, cache will be cleared.
    
    Does this PR introduce any user interface change?
    Yes. (table property is added)
    
    Is any new testcase added?
    Yes
    
    This closes #3653
---
 core/pom.xml                                       |   5 +
 .../carbondata/core/cache/CarbonLRUCache.java      |  87 +++++++----------
 .../core/constants/CarbonCommonConstants.java      |  16 ++++
 .../carbondata/core/datamap/TableDataMap.java      |   8 +-
 .../carbondata/core/datamap/dev/DataMap.java       |   3 +-
 .../core/indexstore/BlockletDataMapIndexStore.java |   9 +-
 .../indexstore/blockletindex/BlockDataMap.java     |   4 +-
 .../blockletindex/BlockletDataMapFactory.java      |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java    |  12 +++
 .../carbondata/core/cache/CarbonLRUCacheTest.java  |  10 +-
 .../blockletindex/TestBlockletDataMapFactory.java  |   5 +
 docs/ddl-of-carbondata.md                          |  14 +++
 .../carbondata/hadoop/CacheAccessClient.java       | 105 ---------------------
 .../datamap/bloom/BloomCacheKeyValue.java          |  12 +++
 .../datamap/bloom/BloomCoarseGrainDataMap.java     |   8 +-
 .../datamap/bloom/BloomDataMapCache.java           |   7 +-
 .../datamap/lucene/LuceneFineGrainDataMap.java     |   3 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  25 +++++
 .../spark/sql/catalyst/CarbonParserUtil.scala      |   3 +
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   3 +
 .../org/apache/spark/util/AlterTableUtil.scala     |  10 +-
 .../testsuite/datamap/CGDataMapTestCase.scala      |   3 +-
 .../testsuite/datamap/FGDataMapTestCase.scala      |   3 +-
 .../sql/commands/TestCarbonShowCacheCommand.scala  |  95 +++++++++++++++++++
 24 files changed, 270 insertions(+), 182 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 8b389f5..5d5c1f4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -60,6 +60,11 @@
       <version>${snappy.version}</version>
     </dependency>
     <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>expiringmap</artifactId>
+      <version>0.5.9</version>
+    </dependency>
+    <dependency>
       <groupId>com.github.luben</groupId>
       <artifactId>zstd-jni</artifactId>
       <version>1.3.2-2</version>
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 2e2e368..3b19425 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -18,15 +18,17 @@
 package org.apache.carbondata.core.cache;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import net.jodah.expiringmap.ExpirationPolicy;
+import net.jodah.expiringmap.ExpiringMap;
 import org.apache.log4j.Logger;
 
 /**
@@ -46,7 +48,7 @@ public final class CarbonLRUCache {
    * Map that will contain key as table unique name and value as cache Holder
    * object
    */
-  private Map<String, Cacheable> lruCacheMap;
+  private ExpiringMap<String, Cacheable> expiringMap;
   /**
    * lruCacheSize
    */
@@ -95,9 +97,12 @@ public final class CarbonLRUCache {
    * initialize lru cache
    */
   private void initCache() {
-    lruCacheMap =
-        new LinkedHashMap<String, Cacheable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE, 1.0f,
-            true);
+    // Cache entries can have individual variable expiration times and policies by adding
+    // variableExpiration to the map. ExpirationPolicy.ACCESSED means the expiration can occur based
+    // on last access time
+    expiringMap =
+        ExpiringMap.builder().expirationPolicy(ExpirationPolicy.ACCESSED).variableExpiration()
+            .build();
   }
 
   /**
@@ -108,7 +113,7 @@ public final class CarbonLRUCache {
     List<String> toBeDeletedKeys =
         new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     long removedSize = 0;
-    for (Entry<String, Cacheable> entry : lruCacheMap.entrySet()) {
+    for (Entry<String, Cacheable> entry : expiringMap.entrySet()) {
       String key = entry.getKey();
       Cacheable cacheInfo = entry.getValue();
       long memorySize = cacheInfo.getMemorySize();
@@ -156,7 +161,7 @@ public final class CarbonLRUCache {
    * @param key
    */
   public void remove(String key) {
-    synchronized (lruCacheMap) {
+    synchronized (expiringMap) {
       removeKey(key);
     }
   }
@@ -165,7 +170,7 @@ public final class CarbonLRUCache {
    * @param keys
    */
   public void removeAll(List<String> keys) {
-    synchronized (lruCacheMap) {
+    synchronized (expiringMap) {
       for (String key : keys) {
         removeKey(key);
       }
@@ -178,11 +183,11 @@ public final class CarbonLRUCache {
    * @param key
    */
   private void removeKey(String key) {
-    Cacheable cacheable = lruCacheMap.get(key);
+    Cacheable cacheable = expiringMap.get(key);
     if (null != cacheable) {
       long memorySize = cacheable.getMemorySize();
       cacheable.invalidate();
-      lruCacheMap.remove(key);
+      expiringMap.remove(key);
       currentSize = currentSize - memorySize;
       LOGGER.info("Removed entry from InMemory lru cache :: " + key);
     }
@@ -195,17 +200,18 @@ public final class CarbonLRUCache {
    * @param columnIdentifier
    * @param cacheInfo
    */
-  public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize) {
+  public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize,
+      long expiration_time) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Required size for entry " + columnIdentifier + " :: " + requiredSize
           + " Current cache size :: " + currentSize);
     }
     boolean columnKeyAddedSuccessfully = false;
     if (isLRUCacheSizeConfigured()) {
-      synchronized (lruCacheMap) {
+      synchronized (expiringMap) {
         if (freeMemorySizeForAddingCache(requiredSize)) {
           currentSize = currentSize + requiredSize;
-          addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
+          addEntryToLRUCacheMap(columnIdentifier, cacheInfo, expiration_time);
           columnKeyAddedSuccessfully = true;
         } else {
           LOGGER.error(
@@ -215,8 +221,8 @@ public final class CarbonLRUCache {
         }
       }
     } else {
-      synchronized (lruCacheMap) {
-        addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
+      synchronized (expiringMap) {
+        addEntryToLRUCacheMap(columnIdentifier, cacheInfo, expiration_time);
         currentSize = currentSize + requiredSize;
       }
       columnKeyAddedSuccessfully = true;
@@ -225,44 +231,17 @@ public final class CarbonLRUCache {
   }
 
   /**
-   * This method will check if required size is available in the memory
-   * @param columnIdentifier
-   * @param requiredSize
-   * @return
-   */
-  public boolean tryPut(String columnIdentifier, long requiredSize) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("checking Required size for entry " + columnIdentifier + " :: " + requiredSize
-          + " Current cache size :: " + currentSize);
-    }
-    boolean columnKeyCanBeAdded = false;
-    if (isLRUCacheSizeConfigured()) {
-      synchronized (lruCacheMap) {
-        if (freeMemorySizeForAddingCache(requiredSize)) {
-          columnKeyCanBeAdded = true;
-        } else {
-          LOGGER.error(
-              "Size check failed.Size not available. Entry cannot be added to lru cache :: "
-                  + columnIdentifier + " .Required Size = " + requiredSize + " Size available " + (
-                  lruCacheMemorySize - currentSize));
-        }
-      }
-    } else {
-      columnKeyCanBeAdded = true;
-    }
-    return columnKeyCanBeAdded;
-  }
-
-  /**
    * The method will add the cache entry to LRU cache map
    *
    * @param columnIdentifier
    * @param cacheInfo
    */
-  private void addEntryToLRUCacheMap(String columnIdentifier, Cacheable cacheInfo) {
-    if (null == lruCacheMap.get(columnIdentifier)) {
-      lruCacheMap.put(columnIdentifier, cacheInfo);
-    }
+  private void addEntryToLRUCacheMap(String columnIdentifier, Cacheable cacheInfo,
+      long expirationTimeSeconds) {
+    if (null == expiringMap.get(columnIdentifier) && expirationTimeSeconds != 0L) {
+      expiringMap.put(columnIdentifier, cacheInfo, ExpirationPolicy.ACCESSED, expirationTimeSeconds,
+          TimeUnit.SECONDS);
+    } else expiringMap.putIfAbsent(columnIdentifier, cacheInfo);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Added entry to InMemory lru cache :: " + columnIdentifier);
     }
@@ -317,8 +296,8 @@ public final class CarbonLRUCache {
    * @return
    */
   public Cacheable get(String key) {
-    synchronized (lruCacheMap) {
-      return lruCacheMap.get(key);
+    synchronized (expiringMap) {
+      return expiringMap.get(key);
     }
   }
 
@@ -326,16 +305,16 @@ public final class CarbonLRUCache {
    * This method will empty the level cache
    */
   public void clear() {
-    synchronized (lruCacheMap) {
-      for (Cacheable cachebleObj : lruCacheMap.values()) {
+    synchronized (expiringMap) {
+      for (Cacheable cachebleObj : expiringMap.values()) {
         cachebleObj.invalidate();
       }
-      lruCacheMap.clear();
+      expiringMap.clear();
     }
   }
 
   public Map<String, Cacheable> getCacheMap() {
-    return lruCacheMap;
+    return expiringMap;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c1d7d81..baf5a37 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2406,4 +2406,20 @@ public final class CarbonCommonConstants {
   public static final String BUCKET_COLUMNS = "bucket_columns";
   public static final String BUCKET_NUMBER = "bucket_number";
 
+  /**
+   * Table property name for table level cache expiration. Carbon maintains index cache in driver
+   * side and the cache will be expired after seconds indicated by this table property.
+   * Cache entries can have individual variable expiration times and policies by providing
+   * variableExpiration policy to ExpirationMap
+   */
+  public static final String INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS =
+      "index_cache_expiration_seconds";
+
+  /**
+   * By default, the index cache is not expired by time, thus the cache size is controlled by
+   * setting the maximum size to 'INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS'
+   */
+  public static final int INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT = Integer.MAX_VALUE;
+
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index f83d486..036118f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -203,7 +203,8 @@ public final class TableDataMap extends OperationEventListener {
                 false);
         for (DataMap dataMap : dataMaps.get(segment)) {
           pruneBlocklets.addAll(
-              dataMap.prune(filter.getResolver(), segmentProperties, partitions, filterExecuter));
+              dataMap.prune(filter.getResolver(), segmentProperties, partitions, filterExecuter,
+                  this.table));
         }
       } else {
         Expression expression = filter.getExpression();
@@ -332,7 +333,7 @@ public final class TableDataMap extends OperationEventListener {
               for (int i = segmentDataMapGroup.getFromIndex();
                    i <= segmentDataMapGroup.getToIndex(); i++) {
                 List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
-                    filter.getResolver(), segmentProperties, partitions, filterExecuter);
+                    filter.getResolver(), segmentProperties, partitions, filterExecuter, table);
                 pruneBlocklets.addAll(addSegmentId(
                     blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
                     segment));
@@ -447,7 +448,8 @@ public final class TableDataMap extends OperationEventListener {
             null, table.getMinMaxCacheColumns(segmentProperties),
             false);
     for (DataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions, filterExecuter));
+      blocklets
+          .addAll(dataMap.prune(filterExp, segmentProperties, partitions, filterExecuter, table));
     }
     BlockletSerializer serializer = new BlockletSerializer();
     String writePath =
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index af36277..b41a355 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -47,7 +47,8 @@ public interface DataMap<T extends Blocklet> {
    * It returns the list of blocklets where these filters can exist.
    */
   List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, FilterExecuter filterExecuter) throws IOException;
+      List<PartitionSpec> partitions, FilterExecuter filterExecuter, CarbonTable table)
+      throws IOException;
 
   /**
    * Prune the datamap with filter expression and partition information. It returns the list of
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 143a423..97eeab8 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
@@ -146,8 +147,9 @@ public class BlockletDataMapIndexStore
               new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
         }
         if (identifierWrapper.isAddTableBlockToUnsafeAndLRUCache()) {
+          long expiration_time = CarbonUtil.getExpiration_time(identifierWrapper.getCarbonTable());
           lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
-              blockletDataMapIndexWrapper.getMemorySize());
+              blockletDataMapIndexWrapper.getMemorySize(), expiration_time);
         }
       } catch (Throwable e) {
         // clear all the memory used by datamaps loaded
@@ -257,10 +259,13 @@ public class BlockletDataMapIndexStore
         for (BlockDataMap blockletDataMap : dataMaps) {
           blockletDataMap.convertToUnsafeDMStore();
         }
+        // get cacheExpirationTime for table from tableProperties
+        long expiration_time =
+            CarbonUtil.getExpiration_time(tableBlockIndexUniqueIdentifierWrapper.getCarbonTable());
         // Locking is not required here because in LRU cache map add method is synchronized to add
         // only one entry at a time and if a key already exists it will not overwrite the entry
         lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
-            .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize());
+            .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize(), expiration_time);
       } catch (Throwable e) {
         // clear all the memory acquired by data map in case of any failure
         for (DataMap blockletDataMap : dataMaps) {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 5437093..47a2ca3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -723,12 +723,12 @@ public class BlockDataMap extends CoarseGrainDataMap
   public List<Blocklet> prune(Expression expression, SegmentProperties properties,
       List<PartitionSpec> partitions, CarbonTable carbonTable, FilterExecuter filterExecuter) {
     return prune(new DataMapFilter(properties, carbonTable, expression).getResolver(), properties,
-        partitions, filterExecuter);
+        partitions, filterExecuter, carbonTable);
   }
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, FilterExecuter filterExecuter) {
+      List<PartitionSpec> partitions, FilterExecuter filterExecuter, CarbonTable table) {
     if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 930d031..b0f0214 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -498,7 +498,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     for (CoarseGrainDataMap dataMap : dataMaps) {
       blocklets.addAll(dataMap
           .prune((FilterResolverIntf) null, getSegmentProperties(segment, partitions), partitions,
-              null));
+              null, this.getCarbonTable()));
     }
     return blocklets;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 4949f18..41902b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3363,4 +3363,16 @@ public final class CarbonUtil {
     sortColumns.addAll(nonVarCharDims);
     return sortColumns;
   }
+
+  /**
+   * get cache expiration time from carbonTable table properties
+   */
+  public static long getExpiration_time(CarbonTable carbonTable) {
+    String cacheExpirationTime = carbonTable.getTableInfo().getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS);
+    if (null == cacheExpirationTime) {
+      return CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT;
+    }
+    return Integer.parseInt(cacheExpirationTime);
+  }
 }
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/CarbonLRUCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/CarbonLRUCacheTest.java
index a30184b..c5676bd 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/CarbonLRUCacheTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/CarbonLRUCacheTest.java
@@ -42,17 +42,17 @@ public class CarbonLRUCacheTest {
   }
 
   @Test public void testPut() {
-    boolean result = carbonLRUCache.put("Column1", cacheable, 10L);
+    boolean result = carbonLRUCache.put("Column1", cacheable, 10L, 5);
     assertTrue(result);
   }
 
   @Test public void testPutWhenSizeIsNotAvailable() {
-    boolean result = carbonLRUCache.put("Column2", cacheable, 11111110L);
+    boolean result = carbonLRUCache.put("Column2", cacheable, 11111110L, 5);
     assertFalse(result);
   }
 
   @Test public void testPutWhenKeysHaveToBeRemoved() {
-    boolean result = carbonLRUCache.put("Column3", cacheable, 2097153L);
+    boolean result = carbonLRUCache.put("Column3", cacheable, 2097153L, 5);
     assertTrue(result);
   }
 
@@ -64,8 +64,8 @@ public class CarbonLRUCacheTest {
   @Test public void testBiggerThanMaxSizeConfiguration() {
     CarbonLRUCache carbonLRUCacheForConfig =
             new CarbonLRUCache("prop2", "200000");//200GB
-    assertTrue(carbonLRUCacheForConfig.put("Column1", cacheable, 10L));
-    assertFalse(carbonLRUCacheForConfig.put("Column2", cacheable, 107374182400L));//100GB
+    assertTrue(carbonLRUCacheForConfig.put("Column1", cacheable, 10L, 5));
+    assertFalse(carbonLRUCacheForConfig.put("Column2", cacheable, 107374182400L, 5));//100GB
   }
 
   @AfterClass public static void cleanUp() {
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index 0f314b8..c5812bc 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 
 import mockit.Deencapsulation;
@@ -65,10 +66,13 @@ public class TestBlockletDataMapFactory {
 
   private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache;
 
+  private TableSchema factTable;
+
   @Before public void setUp()
       throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
       InstantiationException {
     tableInfo = new TableInfo();
+    factTable = new TableSchema();
     Constructor<?> constructor =
         Class.forName("org.apache.carbondata.core.metadata.schema.table.CarbonTable")
             .getDeclaredConstructors()[0];
@@ -78,6 +82,7 @@ public class TestBlockletDataMapFactory {
         .from("/opt/store/default/carbon_table/", "default", "carbon_table",
             UUID.randomUUID().toString());
     Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
+    Deencapsulation.setField(tableInfo, "factTable", factTable);
     Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
     new MockUp<CarbonTable>() {
       @Mock
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3416426..84b18f3 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -36,6 +36,7 @@ CarbonData DDL statements are documented here,which includes:
   * [Bad Records Path](#bad-records-path) 
   * [Load Minimum Input File Size](#load-minimum-data-size)
   * [Range Column](#range-column)
+  * [Index Cache Expiration Time In Seconds](#index-cache-expiration-time-in-seconds)
 
 * [CREATE TABLE AS SELECT](#create-table-as-select)
 * [CREATE EXTERNAL TABLE](#create-external-table)
@@ -108,6 +109,7 @@ CarbonData DDL statements are documented here,which includes:
 | [BUCKET_COLUMNS](#bucketing)                                  | Columns which are to be placed in buckets                    |
 | [LOAD_MIN_SIZE_INMB](#load-minimum-data-size)                | Minimum input data size per node for data loading          |
 | [Range Column](#range-column)                                | partition input data by range                              |
+| [INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS](#index-cache-expiration-time-in-seconds)| Table level time-based cache expiration in seconds |
 
  Following are the guidelines for TBLPROPERTIES, CarbonData's additional table options can be set via carbon.properties.
 
@@ -503,6 +505,18 @@ CarbonData DDL statements are documented here,which includes:
      ```
      TBLPROPERTIES('RANGE_COLUMN'='col1')
      ```
+   - ##### Index Cache Expiration Time In Seconds
+     Carbon maintains the index cache on the driver side, and cache entries expire after the number of seconds indicated by this table property.
+     
+     ```
+     TBLPROPERTIES('index_cache_expiration_seconds'='1')
+     ```
+     After creation of table or on already created tables use the alter table command to configure the cache expiration time.
+     
+     Syntax:
+     
+     ```
+      ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('index_cache_expiration_seconds'='3')
 
 ## CREATE TABLE AS SELECT
   This function allows user to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
deleted file mode 100644
index 377afc2..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * CacheClient : Class used to request the segments cache
- */
-public class CacheAccessClient<K, V> {
-  /**
-   * List of segments
-   */
-  private Set<K> segmentSet = new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private Cache<K, V> cache;
-
-  public CacheAccessClient(Cache<K, V> cache) {
-    this.cache = cache;
-  }
-
-  /**
-   * This method will return the value for the given key. It will not check and load
-   * the data for the given key
-   *
-   * @param key
-   * @return
-   */
-  public V getIfPresent(K key) {
-    V value = cache.getIfPresent(key);
-    if (value != null) {
-      segmentSet.add(key);
-    }
-    return value;
-  }
-
-  /**
-   * This method will get the value for the given key. If value does not exist
-   * for the given key, it will check and load the value.
-   *
-   * @param key
-   * @return
-   */
-  public V get(K key) {
-    V value = cache.get(key);
-    if (value != null) {
-      segmentSet.add(key);
-    }
-    return value;
-  }
-
-  /**
-   * the method is used to clear access count of the unused segments cacheable object
-   */
-  public void close() {
-    List<K> segmentArrayList = new ArrayList<>(segmentSet.size());
-    segmentArrayList.addAll(segmentSet);
-    cache.clearAccessCount(segmentArrayList);
-    cache = null;
-  }
-
-  /**
-   * This method will remove the cache for a given key
-   *
-   * @param keys
-   */
-  public void invalidateAll(List<K> keys) {
-    for (K key : keys) {
-      cache.invalidate(key);
-    }
-  }
-
-  /**
-   * This method will clear the access count for a given list of segments
-   *
-   * @param segmentList
-   */
-  public void clearAccessCount(List<K> segmentList) {
-    cache.clearAccessCount(segmentList);
-    // remove from segment set so that access count is not decremented again during close operation
-    segmentSet.removeAll(segmentList);
-  }
-
-}
diff --git a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
index 6bffb92..559df00 100644
--- a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
+++ b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -35,12 +35,19 @@ public class BloomCacheKeyValue {
     private static final long serialVersionUID = -1478238084352505372L;
     private String shardPath;
     private String indexColumn;
+    private long expirationTime;
 
     public CacheKey(String shardPath, String indexColumn) {
       this.shardPath = shardPath;
       this.indexColumn = indexColumn;
     }
 
+    public CacheKey(String shardPath, String indexColumn, long expirationTime) {
+      this.shardPath = shardPath;
+      this.indexColumn = indexColumn;
+      this.expirationTime = expirationTime;
+    }
+
     public String getShardPath() {
       return shardPath;
     }
@@ -71,6 +78,11 @@ public class BloomCacheKeyValue {
     public int hashCode() {
       return Objects.hash(shardPath, indexColumn);
     }
+
+    public long getExpirationTime() {
+      return expirationTime;
+    }
+
   }
 
   public static class CacheValue implements Cacheable {
diff --git a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index d55e681..9dbc718 100644
--- a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -133,7 +133,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, FilterExecuter filterExecuter) {
+      List<PartitionSpec> partitions, FilterExecuter filterExecuter, CarbonTable carbonTable) {
     Set<Blocklet> hitBlocklets = null;
     if (filterExp == null) {
       // null is different from empty here. Empty means after pruning, no blocklet need to scan.
@@ -151,8 +151,10 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
       }
-      BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey(
-          this.indexPath.toString(), bloomQueryModel.columnName);
+      Long expiration_time = CarbonUtil.getExpiration_time(carbonTable);
+      BloomCacheKeyValue.CacheKey cacheKey =
+          new BloomCacheKeyValue.CacheKey(this.indexPath.toString(), bloomQueryModel.columnName,
+              expiration_time);
       BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
       List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
       for (CarbonBloomFilter bloomFilter : bloomIndexList) {
diff --git a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
index e08147c..65ce7be 100644
--- a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
+++ b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
@@ -47,12 +47,15 @@ public class BloomDataMapCache
 
   @Override
   public BloomCacheKeyValue.CacheValue get(BloomCacheKeyValue.CacheKey key) {
-    BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key);
+    BloomCacheKeyValue.CacheKey cacheKey =
+        new BloomCacheKeyValue.CacheKey(key.getShardPath(), key.getIndexColumn());
+    BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(cacheKey);
     if (cacheValue == null) {
       List<CarbonBloomFilter> bloomFilters =
               BloomIndexFileStore.loadBloomFilterFromFile(key.getShardPath(), key.getIndexColumn());
       cacheValue = new BloomCacheKeyValue.CacheValue(bloomFilters);
-      lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize());
+      lruCache.put(cacheKey.toString(), cacheValue, cacheValue.getMemorySize(),
+          key.getExpirationTime());
     }
     return cacheValue;
   }
diff --git a/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 0b1acaf..e9dcab1 100644
--- a/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.MatchExpression;
@@ -203,7 +204,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
   @Override
   public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp,
       SegmentProperties segmentProperties, List<PartitionSpec> partitions,
-      FilterExecuter filterExecuter) throws IOException {
+      FilterExecuter filterExecuter, CarbonTable carbonTable) throws IOException {
 
     // convert filter expr into lucene list query
     List<String> fields = new ArrayList<String>();
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index fc7dc36..5528184 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -417,6 +417,31 @@ object CommonUtil {
   }
 
   /**
+   * Validates the user-specified cache expiration time: an Int number of seconds, at least 1.
+   *
+   * @param tableProperties table property specified by user
+   * @param propertyName property name
+   */
+  def validateCacheExpiration(tableProperties: Map[String, String], propertyName: String): Unit = {
+    var expirationTime: java.lang.Integer = 0
+    if (tableProperties.get(propertyName).isDefined) {
+      val value = tableProperties(propertyName)
+      val exceptionMsg = s"Invalid $propertyName value found: " +
+                         s"$value, only duration from 1 second to INT_MAX_VALUE is supported."
+      try {
+        expirationTime = java.lang.Integer.parseInt(value)
+      } catch {
+        case _: NumberFormatException =>
+          throw new MalformedCarbonCommandException(exceptionMsg)
+      }
+      if (expirationTime <= 0) {
+        throw new MalformedCarbonCommandException(exceptionMsg)
+      }
+      tableProperties.put(propertyName, value)
+    }
+  }
+
+  /**
    * This method will validate the table page size
    *
    * @param tableProperties table property specified by user
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index a8ddd69..4ff66e0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -402,6 +402,9 @@ object CarbonParserUtil {
     // validate load_min_size_inmb property
     CommonUtil.validateLoadMinSize(tableProperties,
       CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
+    // validate cache expiration time
+    CommonUtil.validateCacheExpiration(tableProperties,
+      CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)
 
     TableModel(
       ifNotExistPresent,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index adda8db..37d9ed8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -638,6 +638,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
         // validate the tableBlockSize from table properties
         CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
+        // validate cache expiration time
+        CommonUtil.validateCacheExpiration(tableProperties,
+          CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)
         // validate for supported table properties
         validateTableProperties(tableProperties)
         // validate column_meta_cache proeperty if defined
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 9d2002a..b4a2265 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -457,6 +457,13 @@ object AlterTableUtil {
       // validate the Compaction Level Threshold
       validateCompactionLevelThresholdProperties(carbonTable, lowerCasePropertiesMap)
 
+      val cacheExpiration = lowerCasePropertiesMap.get(CarbonCommonConstants
+        .INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)
+      if (cacheExpiration.isDefined) {
+        CommonUtil.validateCacheExpiration(lowerCasePropertiesMap, CarbonCommonConstants
+          .INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)
+      }
+
       // if SORT_COLUMN is changed, it will move them to the head of column list
       // Make an schemaEvolution entry as we changed the schema with different column order with
       // alter set sort columns
@@ -562,7 +569,8 @@ object AlterTableUtil {
       "SORT_SCOPE",
       "SORT_COLUMNS",
       "GLOBAL_SORT_PARTITIONS",
-      "LONG_STRING_COLUMNS")
+      "LONG_STRING_COLUMNS",
+      "INDEX_CACHE_EXPIRATION_SECONDS")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index e8e4a17..628b0c8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -200,7 +200,8 @@ class CGDataMap extends CoarseGrainDataMap {
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
       partitions: java.util.List[PartitionSpec],
-      filterExecuter: FilterExecuter): java.util.List[Blocklet] = {
+      filterExecuter: FilterExecuter,
+      carbonTable: CarbonTable): java.util.List[Blocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b52f7e2..abfeafd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -195,7 +195,8 @@ class FGDataMap extends FineGrainDataMap {
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
       partitions: java.util.List[PartitionSpec],
-      filterExecuter: FilterExecuter): java.util.List[FineGrainBlocklet] = {
+      filterExecuter: FilterExecuter,
+      carbonTable: CarbonTable): java.util.List[FineGrainBlocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index e4a6081..b959d6c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -172,6 +173,7 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS empTable")
     sql("DROP TABLE IF EXISTS employeeTable")
     sql("DROP TABLE IF EXISTS extTable")
+    sql("drop table if exists carbonTable")
   }
 
   test("show cache") {
@@ -288,4 +290,97 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     assert(showCache(0).get(2).toString.equalsIgnoreCase("5/5 index files cached"))
     sql("drop table if exists partitionTable")
   }
+
+  test("test cache expiration using expiringMap") {
+    sql("drop table if exists carbonTable")
+    sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata tblproperties('index_cache_expiration_seconds'='1')")
+    sql("insert into carbonTable select 1, 'ab', 'vf'")
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
+    Thread.sleep(1000)
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
+    sql("drop table if exists carbonTable")
+  }
+
+  test("test cache expiration using expiringMap with bloom") {
+    sql("drop table if exists carbonTable")
+    sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
+        "tblproperties('index_cache_expiration_seconds'='1')")
+    sql("insert into carbonTable select 1, 'ab', 'vf'")
+    sql("drop datamap if exists cache_2_bloom")
+    sql("CREATE DATAMAP IF NOT EXISTS cache_2_bloom ON TABLE carbonTable USING 'bloomfilter' " +
+        "DMPROPERTIES('INDEX_COLUMNS'='col3')")
+    checkAnswer(sql("select count(*) from carbonTable where col3='vf'"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
+    assertResult("bloomfilter")(showCache(1).getString(2))
+    Thread.sleep(1000)
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache.length == 1)
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
+    sql("drop table if exists carbonTable")
+  }
+
+  test("test cache expiration using expiringMap with invalid cache expiration time") {
+    sql("drop table if exists carbonTable")
+    assert(intercept[MalformedCarbonCommandException] {
+      sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
+        "tblproperties('index_cache_expiration_seconds'='ab')")
+    }.getMessage.contains("Invalid index_cache_expiration_seconds value found: ab"))
+  }
+
+  test("test cache expiration using expiringMap with alter set and unset tblproperties") {
+    sql("drop table if exists carbonTable")
+    sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
+        "tblproperties('index_cache_expiration_seconds'='5')")
+    sql("insert into carbonTable select 1, 'ab', 'vf'")
+    // check cache expiration with 5 seconds (sleeps 10 seconds, well past the expiry)
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
+    Thread.sleep(10000)
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
+    // check cache expiration with 3 seconds with alter set
+    sql("alter table carbontable set tblproperties('index_cache_expiration_seconds'='3')")
+    sql("insert into carbonTable select 1, 'ab', 'vf'")
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
+    Thread.sleep(3000)
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    // revert cache expiration property
+    sql("ALTER TABLE carbonTable UNSET TBLPROPERTIES('index_cache_expiration_seconds')")
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
+    Thread.sleep(3000)
+    showCache = sql("show metacache on table carbonTable").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
+    sql("drop table if exists carbonTable")
+  }
+
+  test("test cache expiration using expiringMap more than one table") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata " +
+        "tblproperties('index_cache_expiration_seconds'='60')")
+    sql("drop table if exists carbonTable2")
+    sql("create table carbonTable2(col1 int, col2 string,col3 string) stored as carbondata " +
+        "tblproperties('index_cache_expiration_seconds'='5')")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable2 select 1, 'ab', 'vf'")
+    checkAnswer(sql("select count(*) from carbonTable1"), Seq(Row(1)))
+    checkAnswer(sql("select count(*) from carbonTable2"), Seq(Row(1)))
+    Thread.sleep(5000)
+    var showCache = sql("show metacache on table carbonTable2").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
+    showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
+    sql("drop table if exists carbonTable1")
+    sql("drop table if exists carbonTable2")
+  }
+
 }


Mime
View raw message