carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] 26/41: [CARBONDATA-3318] Added PreAgg & Bloom Event-Listener for ShowCacheCommand
Date Tue, 02 Apr 2019 02:41:46 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit c4f32dd3c127011f656e7caea82e5df7af7c1c2a
Author: namanrastogi <naman.rastogi.52@gmail.com>
AuthorDate: Sat Mar 9 15:49:00 2019 +0530

    [CARBONDATA-3318] Added PreAgg & Bloom Event-Listener for ShowCacheCommand
    
    Decoupling of Cache Commands
    1. Added PreAgg Event-Listener for ShowCacheCommand
    2. Added Bloom Event-Listener for ShowCacheCommand
    3. Added PreAgg Event-Listener for DropCacheCommand
    4. Added Bloom Event-Listener for DropCacheCommand
    5. Updated doc
    6. Support external table
    6.1 display external table in comments/with table name
    6.2 count the index files for external table
    
    This closes #3146
---
 .../datamap/bloom/BloomCacheKeyValue.java          |   2 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java      |   9 +-
 docs/ddl-of-carbondata.md                          |  13 +
 .../sql/commands/TestCarbonShowCacheCommand.scala  |  51 ++-
 .../{DropCacheEvents.scala => CacheEvents.scala}   |  11 +-
 .../org/apache/carbondata/events/Events.scala      |   9 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   7 +-
 .../sql/execution/command/cache/CacheUtil.scala    | 108 +++++++
 .../command/cache/CarbonDropCacheCommand.scala     |  59 +---
 .../command/cache/CarbonShowCacheCommand.scala     | 341 ++++++++-------------
 .../command/cache/DropCacheEventListeners.scala    | 121 ++++++++
 .../cache/DropCachePreAggEventListener.scala       |  70 -----
 .../command/cache/ShowCacheEventListeners.scala    | 126 ++++++++
 13 files changed, 581 insertions(+), 346 deletions(-)

diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
index a66ee63..70624eb 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -35,7 +35,7 @@ public class BloomCacheKeyValue {
     private String shardPath;
     private String indexColumn;
 
-    CacheKey(String shardPath, String indexColumn) {
+    public CacheKey(String shardPath, String indexColumn) {
       this.shardPath = shardPath;
       this.indexColumn = indexColumn;
     }
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 9785549..11b216e 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -227,7 +227,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
    * returns all shard directories of bloom index files for query
    * if bloom index files are merged we should get only one shard path
    */
-  private Set<String> getAllShardPaths(String tablePath, String segmentId) {
+  public static Set<String> getAllShardPaths(String tablePath, String segmentId,
+      String dataMapName) {
     String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
         tablePath, segmentId, dataMapName);
     CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
@@ -257,7 +258,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     try {
       Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
       if (shardPaths == null) {
-        shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
+        shardPaths =
+            getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
         segmentMap.put(segment.getSegmentNo(), shardPaths);
       }
       Set<String> filteredShards = segment.getFilteredIndexShardNames();
@@ -299,7 +301,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
     Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
     if (shardPaths == null) {
-      shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
+      shardPaths =
+          getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
       segmentMap.put(segment.getSegmentNo(), shardPaths);
     }
     Set<String> filteredShards = segment.getFilteredIndexShardNames();
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index e6f209e..07a2670 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1119,3 +1119,16 @@ Users can specify which columns to include and exclude for local dictionary gene
   its dictionary files, its datamaps and children tables.
     
   This command is not allowed on child tables.
+
+### Important points
+
+  1. Cache information is updated only after the select query is executed. 
+  
+  2. After an alter table operation, the already loaded cache is invalidated when a subsequent
+  select query is fired.
+
+  3. The dictionary is loaded into the cache only when dictionary columns are queried directly. If
+  no direct query is made on a dictionary column, the dictionary cache will not be loaded.
+  For a query such as `SELECT * FROM t1` the dictionary is loaded, but it is loaded on the
+  executor and not on the driver; only the final result rows are returned to the driver, so it
+  leaves no trace in the driver cache shown by `SHOW METACACHE` or `SHOW METACACHE ON TABLE t1`.
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index e7fd5fa..35ac2e3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -17,10 +17,14 @@
 
 package org.apache.carbondata.sql.commands
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
 class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
   override protected def beforeAll(): Unit = {
     // use new database
@@ -133,6 +137,28 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
   }
 
+  test("test external table show cache") {
+    sql(s"CREATE TABLE employeeTable(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata'")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE employeeTable")
+    val table = CarbonEnv.getCarbonTable(Some("default"), "employeeTable")(sqlContext.sparkSession)
+    val location = FileFactory
+      .getUpdatedFilePath(
+        table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "/Fact/Part0/Segment_0")
+    sql(s"CREATE EXTERNAL TABLE extTable stored as carbondata LOCATION '${location}'")
+    sql("select * from extTable").show()
+    val rows = sql("SHOW METACACHE ON TABLE extTable").collect()
+    var isPresent = false
+    rows.foreach(row => {
+      if (row.getString(2).equalsIgnoreCase("1/1 index files cached (external table)")){
+        isPresent = true
+      }
+    })
+    Assert.assertTrue(isPresent)
+  }
+
   override protected def afterAll(): Unit = {
     sql("use default").collect()
     dropTable
@@ -145,42 +171,63 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS default.cache_4")
     sql("DROP TABLE IF EXISTS default.cache_5")
     sql("DROP TABLE IF EXISTS empTable")
+    sql("DROP TABLE IF EXISTS employeeTable")
+    sql("DROP TABLE IF EXISTS extTable")
   }
 
   test("show cache") {
+
+    // Empty database
     sql("use cache_empty_db").collect()
     val result1 = sql("show metacache").collect()
     assertResult(2)(result1.length)
     assertResult(Row("cache_empty_db", "ALL", "0 B", "0 B", "0 B"))(result1(1))
 
+    // Database with 3 tables but only 2 are in cache
     sql("use cache_db").collect()
     val result2 = sql("show metacache").collect()
     assertResult(4)(result2.length)
 
+    // Make sure PreAgg tables are not in SHOW METACACHE
     sql("use default").collect()
     val result3 = sql("show metacache").collect()
     val dataMapCacheInfo = result3
       .map(row => row.getString(1))
       .filter(table => table.equals("cache_4_cache_4_count"))
-    assertResult(1)(dataMapCacheInfo.length)
+    assertResult(0)(dataMapCacheInfo.length)
   }
 
   test("show metacache on table") {
     sql("use cache_db").collect()
+
+    // Table with Index, Dictionary & Bloom filter
     val result1 = sql("show metacache on table cache_1").collect()
     assertResult(3)(result1.length)
+    assertResult("1/1 index files cached")(result1(0).getString(2))
+    assertResult("bloomfilter")(result1(2).getString(2))
 
+    // Table with Index and Dictionary
     val result2 = sql("show metacache on table cache_db.cache_2").collect()
     assertResult(2)(result2.length)
+    assertResult("2/2 index files cached")(result2(0).getString(2))
+    assertResult("0 B")(result2(1).getString(1))
 
+    // Table not in cache
     checkAnswer(sql("show metacache on table cache_db.cache_3"),
       Seq(Row("Index", "0 B", "0/1 index files cached"), Row("Dictionary", "0 B", "")))
 
+    // Table with Index, Dictionary & PreAgg child table
     val result4 = sql("show metacache on table default.cache_4").collect()
     assertResult(3)(result4.length)
+    assertResult("1/1 index files cached")(result4(0).getString(2))
+    assertResult("0 B")(result4(1).getString(1))
+    assertResult("preaggregate")(result4(2).getString(2))
 
     sql("use default").collect()
+
+    // Table with 5 index files
     val result5 = sql("show metacache on table cache_5").collect()
     assertResult(2)(result5.length)
+    assertResult("5/5 index files cached")(result5(0).getString(2))
   }
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
similarity index 80%
rename from integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
rename to integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
index 2e8b78e..ec5127f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
@@ -21,8 +21,15 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
-case class DropCacheEvent(
+case class DropTableCacheEvent(
     carbonTable: CarbonTable,
     sparkSession: SparkSession,
     internalCall: Boolean)
-  extends Event with DropCacheEventInfo
+  extends Event with DropTableCacheEventInfo
+
+
+case class ShowTableCacheEvent(
+    carbonTable: CarbonTable,
+    sparkSession: SparkSession,
+    internalCall: Boolean)
+  extends Event with ShowTableCacheEventInfo
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index c03d3c6..e6b9213 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -63,9 +63,16 @@ trait DropTableEventInfo {
 }
 
 /**
+ * event for show cache
+ */
+trait ShowTableCacheEventInfo {
+  val carbonTable: CarbonTable
+}
+
+/**
  * event for drop cache
  */
-trait DropCacheEventInfo {
+trait DropTableCacheEventInfo {
   val carbonTable: CarbonTable
 }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 60d896a..7ca9945 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
-import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
+import org.apache.spark.sql.execution.command.cache._
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -186,7 +186,10 @@ object CarbonEnv {
       .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
       .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
-      .addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
+      .addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
+      .addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
+      .addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
+      .addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
new file mode 100644
index 0000000..615d8e0
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import org.apache.hadoop.mapred.JobConf
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.cache.CacheType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
+import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+
+object CacheUtil {
+
+  /**
+   * Given a carbonTable, returns the list of all carbonindex files
+   *
+   * @param carbonTable
+   * @return List of all index files
+   */
+  def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
+    if (carbonTable.isTransactionalTable) {
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier).asScala.flatMap {
+        segment =>
+          segment.getCommittedIndexFile.keySet().asScala
+      }.toList
+    } else {
+      val tablePath = carbonTable.getTablePath
+      val readCommittedScope = new LatestFilesReadCommittedScope(tablePath,
+        FileFactory.getConfiguration)
+      readCommittedScope.getSegmentList.flatMap {
+        load =>
+          val seg = new Segment(load.getLoadName, null, readCommittedScope)
+          seg.getCommittedIndexFile.keySet().asScala
+      }.toList
+    }
+  }
+
+  /**
+   * Given a carbonTable, returns a list of all dictionary cache keys which can be in cache
+   *
+   * @param carbonTable
+   * @return List of all dictionary cache keys which can be in cache
+   */
+  def getAllDictCacheKeys(carbonTable: CarbonTable): List[String] = {
+    def getDictCacheKey(columnIdentifier: String,
+        cacheType: CacheType[_, _]): String = {
+      columnIdentifier + CarbonCommonConstants.UNDERSCORE + cacheType.getCacheName
+    }
+
+    carbonTable.getAllDimensions.asScala
+      .collect {
+        case dict if dict.isGlobalDictionaryEncoding =>
+          Seq(getDictCacheKey(dict.getColumnId, CacheType.FORWARD_DICTIONARY),
+            getDictCacheKey(dict.getColumnId, CacheType.REVERSE_DICTIONARY))
+      }.flatten.toList
+  }
+
+  def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
+    val segments = CarbonDataMergerUtil
+      .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
+
+    // Generate shard Path for the datamap
+    val shardPaths = segments.flatMap {
+      segment =>
+        BloomCoarseGrainDataMapFactory.getAllShardPaths(carbonTable.getTablePath,
+          segment.getSegmentNo, datamap.getDataMapName).asScala
+    }
+
+    // get index columns
+    val indexColumns = carbonTable.getIndexedColumns(datamap).asScala.map {
+      entry =>
+        entry.getColName
+    }
+
+    // generate cache key using shard path and index columns on which bloom was created.
+    val datamapKeys = shardPaths.flatMap {
+      shardPath =>
+        indexColumns.map {
+          indexCol =>
+            new BloomCacheKeyValue.CacheKey(shardPath, indexCol).toString
+      }
+    }
+    datamapKeys.toList
+  }
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index e955ed9..a0bb43e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.command.cache
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,12 +25,8 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
-import org.apache.carbondata.events.{DropCacheEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{DropTableCacheEvent, OperationContext, OperationListenerBus}
 
 case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall: Boolean = false)
   extends MetadataCommand {
@@ -45,59 +40,27 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
   }
 
   def clearCache(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
-    LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+    LOGGER.info("Drop cache request received for table " + carbonTable.getTableUniqueName)
 
-    val dropCacheEvent = DropCacheEvent(
-      carbonTable,
-      sparkSession,
-      internalCall
-    )
+    val dropCacheEvent = DropTableCacheEvent(carbonTable, sparkSession, internalCall)
     val operationContext = new OperationContext
     OperationListenerBus.getInstance.fireEvent(dropCacheEvent, operationContext)
 
     val cache = CacheProvider.getInstance().getCarbonCache
     if (cache != null) {
-      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
 
-      // Dictionary IDs
-      val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
-        .map(_.getColumnId).toArray
+      // Get all Index files for the specified table.
+      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
 
-      // Remove elements from cache
-      val keysToRemove = ListBuffer[String]()
-      val cacheIterator = cache.getCacheMap.entrySet().iterator()
-      while (cacheIterator.hasNext) {
-        val entry = cacheIterator.next()
-        val cache = entry.getValue
+      // Extract dictionary keys for the table and create cache keys from those
+      val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
 
-        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
-          // index
-          val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          if (indexPath.startsWith(tablePath)) {
-            keysToRemove += entry.getKey
-          }
-        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
-          // bloom datamap
-          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          if (shardPath.contains(tablePath)) {
-            keysToRemove += entry.getKey
-          }
-        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
-          // dictionary
-          val dictId = dictIds.find(id => entry.getKey.startsWith(id))
-          if (dictId.isDefined) {
-            keysToRemove += entry.getKey
-          }
-        }
-      }
+      // Remove elements from cache
+      val keysToRemove = allIndexFiles ++ dictKeys
       cache.removeAll(keysToRemove.asJava)
     }
-
-    LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+    LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
   }
 
-  override protected def opName: String = "DROP CACHE"
-
+  override protected def opName: String = "DROP METACACHE"
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 462be83..e19ee48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -20,27 +20,28 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.execution.command.MetadataCommand
 import org.apache.spark.sql.types.StringType
 
-import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.{CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, ShowTableCacheEvent}
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.util.CommonUtil.bytesToDisplaySize
 
-/**
- * SHOW CACHE
- */
-case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
+
+case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
+    internalCall: Boolean = false)
   extends MetadataCommand {
 
   override def output: Seq[AttributeReference] = {
@@ -61,241 +62,147 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
 
   override protected def opName: String = "SHOW CACHE"
 
-  def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+  def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
       Seq(
-        Row("ALL", "ALL", bytesToDisplaySize(0L),
-          bytesToDisplaySize(0L), bytesToDisplaySize(0L)),
-        Row(currentDatabase, "ALL", bytesToDisplaySize(0L),
-          bytesToDisplaySize(0L), bytesToDisplaySize(0L)))
+        Row("ALL", "ALL", 0L, 0L, 0L),
+        Row(currentDatabase, "ALL", 0L, 0L, 0L))
     } else {
       val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        .listAllTables(sparkSession)
-        .filter { table =>
-        table.getDatabaseName.equalsIgnoreCase(currentDatabase)
-      }
-      val tablePaths = carbonTables
-        .map { table =>
-          (table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR,
-            table.getDatabaseName + "." + table.getTableName)
+        .listAllTables(sparkSession).filter {
+        carbonTable =>
+          carbonTable.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
+          !carbonTable.isChildDataMap
       }
 
-      val dictIds = carbonTables
-        .filter(_ != null)
-        .flatMap { table =>
-          table
-            .getAllDimensions
-            .asScala
-            .filter(_.isGlobalDictionaryEncoding)
-            .toArray
-            .map(dim => (dim.getColumnId, table.getDatabaseName + "." + table.getTableName))
-        }
-
-      // all databases
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      // current database
+      // Running totals (raw bytes) over the non-child tables of the current database
       var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
-      val tableMapIndexSize = mutable.HashMap[String, Long]()
-      val tableMapDatamapSize = mutable.HashMap[String, Long]()
-      val tableMapDictSize = mutable.HashMap[String, Long]()
-      val cacheIterator = cache.getCacheMap.entrySet().iterator()
-      while (cacheIterator.hasNext) {
-        val entry = cacheIterator.next()
-        val cache = entry.getValue
-        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
-          // index
-          allIndexSize = allIndexSize + cache.getMemorySize
-          val indexPath = entry.getKey.replace(
-            CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-          val tablePath = tablePaths.find(path => indexPath.startsWith(path._1))
-          if (tablePath.isDefined) {
-            dbIndexSize = dbIndexSize + cache.getMemorySize
-            val memorySize = tableMapIndexSize.get(tablePath.get._2)
-            if (memorySize.isEmpty) {
-              tableMapIndexSize.put(tablePath.get._2, cache.getMemorySize)
-            } else {
-              tableMapIndexSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
-            }
-          }
-        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
-          // bloom datamap
-          allDatamapSize = allDatamapSize + cache.getMemorySize
-          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val tablePath = tablePaths.find(path => shardPath.contains(path._1))
-          if (tablePath.isDefined) {
-            dbDatamapSize = dbDatamapSize + cache.getMemorySize
-            val memorySize = tableMapDatamapSize.get(tablePath.get._2)
-            if (memorySize.isEmpty) {
-              tableMapDatamapSize.put(tablePath.get._2, cache.getMemorySize)
-            } else {
-              tableMapDatamapSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
-            }
-          }
-        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
-          // dictionary
-          allDictSize = allDictSize + cache.getMemorySize
-          val dictId = dictIds.find(id => entry.getKey.startsWith(id._1))
-          if (dictId.isDefined) {
-            dbDictSize = dbDictSize + cache.getMemorySize
-            val memorySize = tableMapDictSize.get(dictId.get._2)
-            if (memorySize.isEmpty) {
-              tableMapDictSize.put(dictId.get._2, cache.getMemorySize)
-            } else {
-              tableMapDictSize.put(dictId.get._2, memorySize.get + cache.getMemorySize)
-            }
-          }
-        }
-      }
-      if (tableMapIndexSize.isEmpty && tableMapDatamapSize.isEmpty && tableMapDictSize.isEmpty) {
-        Seq(
-          Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
-            bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
-          Row(currentDatabase, "ALL", bytesToDisplaySize(0),
-            bytesToDisplaySize(0), bytesToDisplaySize(0)))
-      } else {
-        val tableList = tableMapIndexSize
-          .map(_._1)
-          .toSeq
-          .union(tableMapDictSize.map(_._1).toSeq)
-          .distinct
-          .sorted
-          .map { uniqueName =>
-            val values = uniqueName.split("\\.")
-            val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
-            val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
-            val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
-            Row(values(0), values(1), bytesToDisplaySize(indexSize),
-              bytesToDisplaySize(datamapSize), bytesToDisplaySize(dictSize))
+      val tableList: Seq[Row] = carbonTables.map {
+        carbonTable =>
+          val tableResult = getTableCache(sparkSession, carbonTable)
+          var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+          tableResult.drop(2).foreach {
+            row =>
+              indexSize += row.getLong(1)
+              datamapSize += row.getLong(2)
           }
+          val dictSize = tableResult(1).getLong(1)
 
-        Seq(
-          Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
-            bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
-          Row(currentDatabase, "ALL", bytesToDisplaySize(dbIndexSize),
-            bytesToDisplaySize(dbDatamapSize), bytesToDisplaySize(dbDictSize))
-        ) ++ tableList
-      }
-    }
-  }
-
-  def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
-    val cache = CacheProvider.getInstance().getCarbonCache()
-    if (cache == null) {
-      Seq.empty
-    } else {
-      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
-      var numIndexFilesCached = 0
+          dbIndexSize += indexSize
+          dbDictSize += dictSize
+          dbDatamapSize += datamapSize
 
-      // Path -> Name, Type
-      val datamapName = mutable.Map[String, (String, String)]()
-      // Path -> Size
-      val datamapSize = mutable.Map[String, Long]()
-      // parent table
-      datamapName.put(tablePath, ("", ""))
-      datamapSize.put(tablePath, 0)
-      // children tables
-      for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
-        val childTableName = carbonTable.getTableName + "_" + schema.getDataMapName
-        val childTable = CarbonEnv
-          .getCarbonTable(Some(carbonTable.getDatabaseName), childTableName)(sparkSession)
-        val path = childTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
-        val name = schema.getDataMapName
-        val dmType = schema.getProviderName
-        datamapName.put(path, (name, dmType))
-        datamapSize.put(path, 0)
-      }
-      // index schemas
-      for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-        .asScala) {
-        val path = tablePath + schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
-        val name = schema.getDataMapName
-        val dmType = schema.getProviderName
-        datamapName.put(path, (name, dmType))
-        datamapSize.put(path, 0)
-      }
-
-      var dictSize = 0L
-
-      // dictionary column ids
-      val dictIds = carbonTable
-        .getAllDimensions
-        .asScala
-        .filter(_.isGlobalDictionaryEncoding)
-        .map(_.getColumnId)
-        .toArray
-
-      val cacheIterator = cache.getCacheMap.entrySet().iterator()
-      while (cacheIterator.hasNext) {
-        val entry = cacheIterator.next()
-        val cache = entry.getValue
-
-        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
-          // index
-          val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val pathEntry = datamapSize.filter(entry => indexPath.startsWith(entry._1))
-          if(pathEntry.nonEmpty) {
-            val (path, size) = pathEntry.iterator.next()
-            datamapSize.put(path, size + cache.getMemorySize)
-          }
-          if(indexPath.startsWith(tablePath)) {
-            numIndexFilesCached += 1
+          val tableName = if (!carbonTable.isTransactionalTable) {
+            carbonTable.getTableName + " (external table)"
           }
-        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
-          // bloom datamap
-          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val pathEntry = datamapSize.filter(entry => shardPath.contains(entry._1))
-          if(pathEntry.nonEmpty) {
-            val (path, size) = pathEntry.iterator.next()
-            datamapSize.put(path, size + cache.getMemorySize)
+          else {
+            carbonTable.getTableName
           }
-        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
-          // dictionary
-          val dictId = dictIds.find(id => entry.getKey.startsWith(id))
-          if (dictId.isDefined) {
-            dictSize = dictSize + cache.getMemorySize
+          (currentDatabase, tableName, indexSize, datamapSize, dictSize)
+      }.collect {
+        // keep only the tables that occupy at least one byte in some cache
+        case (db, table, indexSize, datamapSize, dictSize)
+          if indexSize != 0 || datamapSize != 0 || dictSize != 0 =>
+          Row(db, table, indexSize, datamapSize, dictSize)
+      }
+
+      // Scan the whole cache once to fill the ALL/ALL row (every database, every table)
+      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+      cache.getCacheMap.asScala.foreach {
+        case (_, cacheable) => cacheable match {
+            case _: BlockletDataMapIndexWrapper =>
+              allIndexSize += cacheable.getMemorySize
+            case _: BloomCacheKeyValue.CacheValue =>
+              allDatamapSize += cacheable.getMemorySize
+            case _: AbstractColumnDictionaryInfo =>
+              allDictSize += cacheable.getMemorySize
+            case _ => // any other cacheable type is not reported; skip it (no MatchError)
           }
-        }
       }
 
-      // get all index files
-      val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath)
-      val numIndexFilesAll = CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier)
-        .asScala.map {
-          segment =>
-            segment.getCommittedIndexFile
-        }.flatMap {
-        indexFilesMap => indexFilesMap.keySet().toArray
-      }.size
+      Seq(
+        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
+        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
+      ) ++ tableList
+    }
+  }
 
-      var result = Seq(
-        Row("Index", bytesToDisplaySize(datamapSize.get(tablePath).get),
-          numIndexFilesCached + "/" + numIndexFilesAll + " index files cached"),
-        Row("Dictionary", bytesToDisplaySize(dictSize), "")
-      )
-      for ((path, size) <- datamapSize) {
-        if (path != tablePath) {
-          val (dmName, dmType) = datamapName.get(path).get
-          result = result :+ Row(dmName, bytesToDisplaySize(size), dmType)
-        }
-      }
-      result
+  /**
+   * Cache usage of one table: an "Index" row, a "Dictionary" row, then one row per datamap
+   * reported by the ShowTableCacheEvent listeners as (name, indexSize, datamapSize, provider).
+   * Sizes are raw byte counts; callers in this file check that the cache is non-null first.
+   */
+  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+    val cache = CacheProvider.getInstance().getCarbonCache
+    val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
+    val operationContext = new OperationContext
+    // datamapName -> (datamapProviderName, indexSize, datamapSize); filled by the listeners
+    val currentTableSizeMap = scala.collection.mutable.Map[String, (String, Long, Long)]()
+    operationContext.setProperty(carbonTable.getTableUniqueName, currentTableSizeMap)
+    OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
+
+    // Get all Index files for the specified table. Each file is looked up exactly once, so an
+    // entry that disappears from the cache between two lookups cannot cause a null dereference.
+    val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
+    val cachedIndexSizes: List[Long] =
+      allIndexFiles.flatMap(indexFile => Option(cache.get(indexFile)).map(_.getMemorySize))
+    val sizeOfIndexFilesInCache: Long = cachedIndexSizes.sum
+
+    // Extract dictionary keys for the table and sum the sizes of those present in cache
+    val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
+    val sizeOfDictInCache =
+      dictKeys.flatMap(dictKey => Option(cache.get(dictKey)).map(_.getMemorySize)).sum
+
+    // Assemble result for all the datamaps for the table
+    val otherDatamaps = operationContext.getProperty(carbonTable.getTableUniqueName)
+      .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+    val otherDatamapsResults: Seq[Row] = otherDatamaps.map {
+      case (name, (provider, indexSize, dmSize)) =>
+        Row(name, indexSize, dmSize, provider)
+    }.toSeq
+
+    // Comment column of the "Index" row: cached index files out of all index files
+    var comments = cachedIndexSizes.size + "/" + allIndexFiles.size + " index files cached"
+    if (!carbonTable.isTransactionalTable) {
+      comments += " (external table)"
     }
+    Seq(
+      Row("Index", sizeOfIndexFilesInCache, comments),
+      Row("Dictionary", sizeOfDictInCache, "")
+    ) ++ otherDatamapsResults
   }
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     if (tableIdentifier.isEmpty) {
-      showAllTablesCache(sparkSession)
+      /**
+       * No table given: take the raw byte counts of every row and render them as display sizes
+       */
+      val result = getAllTablesCache(sparkSession)
+      result.map {
+        row =>
+          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+      }
     } else {
+      /**
+       * One table: add up index and datamap size of each datamap row, then render display sizes
+       */
       val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
-      if (carbonTable.isChildDataMap) {
-        throw new UnsupportedOperationException("Operation not allowed on child table.")
+      if (CacheProvider.getInstance().getCarbonCache == null) {
+        return Seq.empty
+      }
+      val rawResult = getTableCache(sparkSession, carbonTable)
+      val result = rawResult.slice(0, 2) ++
+                   rawResult.drop(2).map {
+                     row =>
+                       Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
       }
-      showTableCache(sparkSession, carbonTable)
     }
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
new file mode 100644
index 0000000..6c8bb54
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener.LOGGER
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext,
+  OperationEventListener}
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+object DropCachePreAggEventListener extends OperationEventListener {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Runs CarbonDropCacheCommand (internalCall = true) for each child table of the table.
+   * Throws UnsupportedOperationException for a child table unless internalCall is set.
+   * @param event expected to be a DropTableCacheEvent; the match has no other case
+   * @param operationContext not read by this listener
+   */
+  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case dropCacheEvent: DropTableCacheEvent =>
+        val carbonTable = dropCacheEvent.carbonTable
+        val sparkSession = dropCacheEvent.sparkSession
+        val internalCall = dropCacheEvent.internalCall
+        if (carbonTable.isChildDataMap && !internalCall) {
+          throw new UnsupportedOperationException("Operation not allowed on child table.")
+        }
+
+        if (carbonTable.hasDataMapSchema) {
+          val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
+            .filter(_.getRelationIdentifier != null)
+          for (childSchema <- childrenSchemas) {
+            val childTable =
+              CarbonEnv.getCarbonTable(
+                TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+                  Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+            try {
+              val dropCacheCommandForChildTable =
+                CarbonDropCacheCommand(
+                  TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
+                  internalCall = true)
+              dropCacheCommandForChildTable.processMetadata(sparkSession)
+            }
+            catch {
+              case e: Exception =>
+                LOGGER.warn(
+                  s"Clean cache for PreAgg table ${ childTable.getTableName } failed.", e)
+            }
+          }
+        }
+    }
+  }
+}
+
+
+object DropCacheBloomEventListener extends OperationEventListener {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Removes from the cache the keys of every bloom filter datamap of the table.
+   * A failure for one datamap is logged as a warning and does not stop the others.
+   * @param event expected to be a DropTableCacheEvent; the match has no other case
+   * @param operationContext not read by this listener
+   */
+  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case dropCacheEvent: DropTableCacheEvent =>
+        val carbonTable = dropCacheEvent.carbonTable
+        val cache = CacheProvider.getInstance().getCarbonCache
+        // Only the datamap schemas are needed here: the cache keys are derived per datamap by
+        // CacheUtil.getBloomCacheKeys, so no segment listing is loaded in this listener.
+        val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+          .asScala.toList
+
+        datamaps.foreach {
+          case datamap if datamap.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) =>
+            try {
+              // Get datamap keys
+              val datamapKeys = CacheUtil.getBloomCacheKeys(carbonTable, datamap)
+
+              // remove datamap keys from cache
+              cache.removeAll(datamapKeys.asJava)
+            } catch {
+              case e: Exception =>
+                LOGGER.warn(
+                  s"Clean cache for Bloom datamap ${datamap.getDataMapName} failed.", e)
+            }
+          case _ =>
+        }
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
deleted file mode 100644
index 3d03c60..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.command.cache
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.catalyst.TableIdentifier
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext,
-  OperationEventListener}
-
-object DropCachePreAggEventListener extends OperationEventListener {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Called on a specified event occurrence
-   *
-   * @param event
-   * @param operationContext
-   */
-  override protected def onEvent(event: Event,
-      operationContext: OperationContext): Unit = {
-
-    event match {
-      case dropCacheEvent: DropCacheEvent =>
-        val carbonTable = dropCacheEvent.carbonTable
-        val sparkSession = dropCacheEvent.sparkSession
-        val internalCall = dropCacheEvent.internalCall
-        if (carbonTable.isChildDataMap && !internalCall) {
-          throw new UnsupportedOperationException("Operation not allowed on child table.")
-        }
-
-        if (carbonTable.hasDataMapSchema) {
-          val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
-            .filter(_.getRelationIdentifier != null)
-          for (childSchema <- childrenSchemas) {
-            val childTable =
-              CarbonEnv.getCarbonTable(
-                TableIdentifier(childSchema.getRelationIdentifier.getTableName,
-                  Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
-            val dropCacheCommandForChildTable =
-              CarbonDropCacheCommand(
-                TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
-                internalCall = true)
-            dropCacheCommandForChildTable.processMetadata(sparkSession)
-          }
-        }
-    }
-
-  }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
new file mode 100644
index 0000000..70f63a4
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.cache
+
+import java.util
+import java.util.{HashSet, Set}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.CarbonEnv
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+object ShowCachePreAggEventListener extends OperationEventListener {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Adds one (provider, indexSize, 0) entry per child table to the shared size map.
+   * Throws UnsupportedOperationException for a child table unless internalCall is set.
+   * @param event expected to be a ShowTableCacheEvent; the match has no other case
+   * @param operationContext holds, under the table's unique name, the map this listener fills
+   */
+  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case showTableCacheEvent: ShowTableCacheEvent =>
+        val carbonTable = showTableCacheEvent.carbonTable
+        val sparkSession = showTableCacheEvent.sparkSession
+        val internalCall = showTableCacheEvent.internalCall
+        if (carbonTable.isChildDataMap && !internalCall) {
+          throw new UnsupportedOperationException("Operation not allowed on child table.")
+        }
+
+        val currentTableSizeMap = operationContext.getProperty(carbonTable.getTableUniqueName)
+          .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+
+        if (carbonTable.hasDataMapSchema) {
+          val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
+            .filter(_.getRelationIdentifier != null)
+          for (childSchema <- childrenSchemas) {
+            val datamapName = childSchema.getDataMapName
+            val datamapProvider = childSchema.getProviderName
+            val childCarbonTable = CarbonEnv.getCarbonTable(
+              TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+                Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+            // row 0 of the child's result is its "Index" row, reported here as index size
+            val resultForChild = CarbonShowCacheCommand(None, internalCall = true)
+              .getTableCache(sparkSession, childCarbonTable)
+            val datamapSize = resultForChild.head.getLong(1)
+            currentTableSizeMap.put(datamapName, (datamapProvider, datamapSize, 0L))
+          }
+        }
+    }
+  }
+}
+
+
+object ShowCacheBloomEventListener extends OperationEventListener {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Adds one (provider, 0, cachedSize) entry per bloom filter datamap to the shared size map.
+   *
+   * @param event expected to be a ShowTableCacheEvent; the match has no other case
+   * @param operationContext holds, under the table's unique name, the map this listener fills
+   */
+  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case showTableCacheEvent: ShowTableCacheEvent =>
+        val carbonTable = showTableCacheEvent.carbonTable
+        val cache = CacheProvider.getInstance().getCarbonCache
+        val currentTableSizeMap = operationContext.getProperty(carbonTable.getTableUniqueName)
+          .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+
+        // Extract all datamaps for the table
+        val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+          .asScala
+
+        datamaps.foreach {
+          case datamap if datamap.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) =>
+
+            // Get datamap keys
+            val datamapKeys = CacheUtil.getBloomCacheKeys(carbonTable, datamap)
+
+            // sum the memory size of the keys present in cache, with one lookup per key
+            val datamapSize = datamapKeys.flatMap {
+              key =>
+                Option(cache.get(key)).map(_.getMemorySize)
+            }.sum
+
+            // put the datamap size into main table's map.
+            currentTableSizeMap
+              .put(datamap.getDataMapName, (datamap.getProviderName, 0L, datamapSize))
+
+          case _ =>
+        }
+    }
+  }
+}


Mime
View raw message