carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] 09/41: [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
Date Tue, 02 Apr 2019 02:41:29 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit fdb48d0d05bd7f6923b8d6b7493f13beb2f56d93
Author: QiangCai <qiangcai@qq.com>
AuthorDate: Thu Jan 17 15:39:33 2019 +0800

    [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
    
    >>> SHOW METACACHE
    +--------+--------+----------+------------+---------------+
    |Database|Table   |Index size|Datamap size|Dictionary size|
    +--------+--------+----------+------------+---------------+
    |ALL     |ALL     |842 Bytes |982 Bytes   |80.34 KB       |
    |default |ALL     |842 Bytes |982 Bytes   |80.34 KB       |
    |default |t1      |225 Bytes |982 Bytes   |0              |
    |default |t1_dpagg|259 Bytes |0           |0              |
    |default |t2      |358 Bytes |0           |80.34 KB       |
    +--------+--------+----------+------------+---------------+
    
    >>> SHOW METACACHE ON TABLE t1
    +----------+---------+----------------------+
    |Field     |Size     |Comment               |
    +----------+---------+----------------------+
    |Index     |225 Bytes|1/1 index files cached|
    |Dictionary|0        |                      |
    |dpagg     |259 Bytes|preaggregate          |
    |dblom     |982 Bytes|bloomfilter           |
    +----------+---------+----------------------+
    
    >>> SHOW METACACHE ON TABLE t2
    +----------+---------+----------------------+
    |Field     |Size     |Comment               |
    +----------+---------+----------------------+
    |Index     |358 Bytes|2/2 index files cached|
    |Dictionary|80.34 KB |                      |
    +----------+---------+----------------------+
    
    This closes #3078
---
 .../carbondata/core/cache/CacheProvider.java       |   4 +
 .../carbondata/core/cache/CarbonLRUCache.java      |   4 +
 docs/ddl-of-carbondata.md                          |  21 ++
 .../sql/commands/TestCarbonShowCacheCommand.scala  | 163 +++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../command/cache/CarbonShowCacheCommand.scala     | 312 +++++++++++++++++++++
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  12 +-
 7 files changed, 515 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 99b1693..deb48e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -195,4 +195,8 @@ public class CacheProvider {
     }
     cacheTypeToCacheMap.clear();
   }
+
+  /**
+   * Returns the {@code carbonLRUCache} field held by this provider.
+   * NOTE(review): CarbonShowCacheCommand in this patch null-checks the result, so this
+   * presumably can be null when no cache has been created yet -- confirm.
+   */
+  public CarbonLRUCache getCarbonCache() {
+    return carbonLRUCache;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 87254e3..74ff8a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -305,4 +305,8 @@ public final class CarbonLRUCache {
       lruCacheMap.clear();
     }
   }
+
+  /**
+   * Returns the {@code lruCacheMap} field itself, not a copy, so callers see the live
+   * cache entries keyed by their cache key.
+   * NOTE(review): confirm whether callers that iterate the returned map need the same
+   * locking this class applies to lruCacheMap elsewhere.
+   */
+  public Map<String, Cacheable> getCacheMap() {
+    return lruCacheMap;
+  }
 }
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 0d0e5bd..3476475 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -67,6 +67,7 @@ CarbonData DDL statements are documented here,which includes:
   * [SPLIT PARTITION](#split-a-partition)
   * [DROP PARTITION](#drop-a-partition)
 * [BUCKETING](#bucketing)
+* [CACHE](#cache)
 
 ## CREATE TABLE
 
@@ -1088,4 +1089,24 @@ Users can specify which columns to include and exclude for local dictionary
gene
   TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName')
   ```
 
+## CACHE
 
+  CarbonData internally uses LRU caching to improve performance. The user can get information
+  about the current cache usage in memory through the following command:
+
+  ```sql
+  SHOW METACACHE
+  ``` 
+  
+  This shows the overall memory consumed in the cache by category - index files, dictionary and
+  datamaps. It also shows the cache usage of all the tables and children tables in the current
+  database.
+  
+  ```sql
+  SHOW METACACHE ON TABLE tableName
+  ```
+  
+  This shows detailed information on cache usage by the table `tableName` and its carbonindex files,
+  its dictionary files, its datamaps and children tables.
+  
+  This command is not allowed on child tables.
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
new file mode 100644
index 0000000..0e1cd00
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sql.commands
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Verifies the rows returned by `SHOW METACACHE` and `SHOW METACACHE ON TABLE` for tables
+ * created in the databases `cache_db`, `cache_empty_db` and `default`.
+ */
+class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
+  override protected def beforeAll(): Unit = {
+    // use new database
+    sql("drop database if exists cache_db cascade").collect()
+    sql("drop database if exists cache_empty_db cascade").collect()
+    sql("create database cache_db").collect()
+    sql("create database cache_empty_db").collect()
+    dropTable()
+    sql("use cache_db").collect()
+    sql(
+      """
+        | CREATE TABLE cache_db.cache_1
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        |  workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        |  projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        |  salary int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
+      """.stripMargin)
+    // bloom
+    sql("CREATE DATAMAP IF NOT EXISTS cache_1_bloom ON TABLE cache_db.cache_1 USING 'bloomfilter' " +
+        "DMPROPERTIES('INDEX_COLUMNS'='deptno')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_1 ")
+
+    sql(
+      """
+        | CREATE TABLE cache_2
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        |  workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        |  projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        |  salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_db.cache_2 ")
+    sql("insert into table cache_2 select * from cache_1").collect()
+
+    // cache_3 is loaded but never queried, so nothing of it ends up in the cache
+    sql(
+      """
+        | CREATE TABLE cache_3
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        |  workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        |  projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        |  salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_3 ")
+
+    // use default database
+    sql("use default").collect()
+    sql(
+      """
+        | CREATE TABLE cache_4
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        |  workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        |  projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        |  salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("insert into table cache_4 select * from cache_db.cache_2").collect()
+
+    // standard partition table
+    sql(
+      """
+        | CREATE TABLE cache_5
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        |  workgroupcategoryname String, deptname String, projectcode int,
+        |  projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        |  salary int)
+        | PARTITIONED BY (deptno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      "insert into table cache_5 select empno,empname,designation,doj,workgroupcategory," +
+      "workgroupcategoryname,deptname,projectcode,projectjoindate,projectenddate,attendance," +
+      "utilization,salary,deptno from cache_4").collect()
+
+    // datamap
+    sql("create datamap cache_4_count on table cache_4 using 'preaggregate' as " +
+        "select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname")
+
+    // run queries so that index, bloom and dictionary entries get cached
+    sql("select max(deptname) from cache_db.cache_1").collect()
+    sql("SELECT deptno FROM cache_db.cache_1 where deptno=10").collect()
+    sql("select count(*) from cache_db.cache_2").collect()
+    sql("select count(*) from cache_4").collect()
+    sql("select count(*) from cache_5").collect()
+    sql("select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname").collect()
+  }
+
+
+  override protected def afterAll(): Unit = {
+    sql("use default").collect()
+    dropTable()
+    // the databases are created in beforeAll, so they have to be removed here as well
+    sql("drop database if exists cache_db cascade").collect()
+    sql("drop database if exists cache_empty_db cascade").collect()
+  }
+
+  /** Drops every table this suite creates; safe to call when they do not exist. */
+  private def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS cache_db.cache_1")
+    sql("DROP TABLE IF EXISTS cache_db.cache_2")
+    sql("DROP TABLE IF EXISTS cache_db.cache_3")
+    sql("DROP TABLE IF EXISTS default.cache_4")
+    sql("DROP TABLE IF EXISTS default.cache_5")
+  }
+
+  test("show cache") {
+    // a database without tables only yields the two summary rows
+    sql("use cache_empty_db").collect()
+    val result1 = sql("show metacache").collect()
+    assertResult(2)(result1.length)
+    assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1))
+
+    sql("use cache_db").collect()
+    val result2 = sql("show metacache").collect()
+    assertResult(4)(result2.length)
+
+    // the preaggregate child table of cache_4 is listed under its own table name
+    sql("use default").collect()
+    val result3 = sql("show metacache").collect()
+    val dataMapCacheInfo = result3
+      .map(row => row.getString(1))
+      .filter(table => table.equals("cache_4_cache_4_count"))
+    assertResult(1)(dataMapCacheInfo.length)
+  }
+
+  test("show metacache on table") {
+    sql("use cache_db").collect()
+    val result1 = sql("show metacache on table cache_1").collect()
+    assertResult(3)(result1.length)
+
+    val result2 = sql("show metacache on table cache_db.cache_2").collect()
+    assertResult(2)(result2.length)
+
+    checkAnswer(sql("show metacache on table cache_db.cache_3"),
+      Seq(Row("Index", "0 bytes", "0/1 index files cached"), Row("Dictionary", "0 bytes", "")))
+
+    val result4 = sql("show metacache on table default.cache_4").collect()
+    assertResult(3)(result4.length)
+
+    sql("use default").collect()
+    val result5 = sql("show metacache on table cache_5").collect()
+    assertResult(2)(result5.length)
+  }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index dc75243..e03bebd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -155,6 +155,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
   protected val HISTORY = carbonKeyWord("HISTORY")
   protected val SEGMENTS = carbonKeyWord("SEGMENTS")
   protected val SEGMENT = carbonKeyWord("SEGMENT")
+  // keyword of the SHOW METACACHE command
+  protected val METACACHE = carbonKeyWord("METACACHE")
 
   protected val STRING = carbonKeyWord("STRING")
   protected val INTEGER = carbonKeyWord("INTEGER")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
new file mode 100644
index 0000000..e937c32
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils.byteCountToDisplaySize
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+/**
+ * Command behind `SHOW METACACHE` and `SHOW METACACHE ON TABLE`.
+ *
+ * Without a table it lists the index, datamap and dictionary cache sizes for all cache
+ * entries, for the current database and for each table of the current database that has
+ * cached entries. With a table it lists the cache sizes of that table, its dictionary and
+ * its datamaps.
+ *
+ * @param tableIdentifier table to report on; `None` reports on the current database
+ */
+case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
+  extends MetadataCommand {
+
+  /**
+   * Output schema. Every column is a non-nullable string, so each produced row must contain
+   * display strings only.
+   */
+  override def output: Seq[AttributeReference] = {
+    val columnNames = if (tableIdentifier.isEmpty) {
+      Seq("Database", "Table", "Index size", "Datamap size", "Dictionary size")
+    } else {
+      Seq("Field", "Size", "Comment")
+    }
+    columnNames.map(name => AttributeReference(name, StringType, nullable = false)())
+  }
+
+  override protected def opName: String = "SHOW CACHE"
+
+  /** Replaces Windows file separators so that paths can be compared by prefix. */
+  private def normalizePath(path: String): String = {
+    path.replace(
+      CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+  }
+
+  /** Adds `size` to the value stored under `key`, starting from 0 for a new key. */
+  private def addSize(sizes: mutable.Map[String, Long], key: String, size: Long): Unit = {
+    sizes.put(key, sizes.getOrElse(key, 0L) + size)
+  }
+
+  /**
+   * Builds the rows for `SHOW METACACHE`: one row for all cache entries, one row for the
+   * current database and one row per table of the current database with cached entries.
+   *
+   * @param sparkSession session used to resolve the current database and its tables
+   * @return rows of (database, table, index size, datamap size, dictionary size)
+   */
+  def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+    val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
+    val cache = CacheProvider.getInstance().getCarbonCache()
+    if (cache == null) {
+      // no cache available: report the same string rows as for an empty cache, because the
+      // output schema declares string columns
+      val zero = byteCountToDisplaySize(0L)
+      Seq(Row("ALL", "ALL", zero, zero, zero),
+        Row(currentDatabase, "ALL", "0", "0", "0"))
+    } else {
+      val tableIdents = sparkSession.sessionState.catalog.listTables(currentDatabase).toArray
+      val dbLocation = normalizePath(CarbonEnv.getDatabaseLocation(currentDatabase, sparkSession))
+      // (table path prefix, "database.table") of every table in the current database
+      val tablePaths = tableIdents.map { tableIdent =>
+        (dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
+         tableIdent.table + CarbonCommonConstants.FILE_SEPARATOR,
+          CarbonEnv.getDatabaseName(tableIdent.database)(sparkSession) + "." + tableIdent.table)
+      }
+
+      // (dictionary column id, "database.table") of every global dictionary column
+      val dictIds = tableIdents
+        .flatMap { tableIdent =>
+          // tables that cannot be resolved as carbon tables are skipped; Try only swallows
+          // non-fatal errors
+          scala.util.Try(CarbonEnv.getCarbonTable(tableIdent)(sparkSession))
+            .toOption
+            .filter(_ != null)
+            .toSeq
+        }
+        .flatMap { table =>
+          val uniqueName = table.getDatabaseName + "." + table.getTableName
+          table
+            .getAllDimensions
+            .asScala
+            .filter(_.isGlobalDictionaryEncoding)
+            .map(dim => (dim.getColumnId, uniqueName))
+        }
+
+      // all databases
+      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+      // current database
+      var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
+      val tableMapIndexSize = mutable.HashMap[String, Long]()
+      val tableMapDatamapSize = mutable.HashMap[String, Long]()
+      val tableMapDictSize = mutable.HashMap[String, Long]()
+      cache.getCacheMap.asScala.foreach { case (key, cacheable) =>
+        cacheable match {
+          case _: BlockletDataMapIndexWrapper =>
+            // index
+            val size = cacheable.getMemorySize
+            allIndexSize += size
+            val indexPath = normalizePath(key)
+            tablePaths.find(path => indexPath.startsWith(path._1)).foreach {
+              case (_, uniqueName) =>
+                dbIndexSize += size
+                addSize(tableMapIndexSize, uniqueName, size)
+            }
+          case _: BloomCacheKeyValue.CacheValue =>
+            // bloom datamap
+            val size = cacheable.getMemorySize
+            allDatamapSize += size
+            val shardPath = normalizePath(key)
+            tablePaths.find(path => shardPath.contains(path._1)).foreach {
+              case (_, uniqueName) =>
+                dbDatamapSize += size
+                addSize(tableMapDatamapSize, uniqueName, size)
+            }
+          case _: AbstractColumnDictionaryInfo =>
+            // dictionary
+            val size = cacheable.getMemorySize
+            allDictSize += size
+            dictIds.find(id => key.startsWith(id._1)).foreach {
+              case (_, uniqueName) =>
+                dbDictSize += size
+                addSize(tableMapDictSize, uniqueName, size)
+            }
+          case _ =>
+            // other cache entry types are not reported
+        }
+      }
+
+      val allRow = Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
+        byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize))
+      // a table is listed as soon as any of the three categories has an entry for it
+      val tableNames =
+        (tableMapIndexSize.keys ++ tableMapDatamapSize.keys ++ tableMapDictSize.keys)
+          .toSeq
+          .distinct
+          .sorted
+      if (tableNames.isEmpty) {
+        Seq(allRow, Row(currentDatabase, "ALL", "0", "0", "0"))
+      } else {
+        val tableRows = tableNames.map { uniqueName =>
+          val values = uniqueName.split("\\.")
+          val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
+          val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
+          val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
+          Row(values(0), values(1), byteCountToDisplaySize(indexSize),
+            byteCountToDisplaySize(datamapSize), byteCountToDisplaySize(dictSize))
+        }
+        val dbRow = Row(currentDatabase, "ALL", byteCountToDisplaySize(dbIndexSize),
+          byteCountToDisplaySize(dbDatamapSize), byteCountToDisplaySize(dbDictSize))
+        Seq(allRow, dbRow) ++ tableRows
+      }
+    }
+  }
+
+  /**
+   * Builds the rows for `SHOW METACACHE ON TABLE`: the index and dictionary cache sizes of
+   * the table followed by one row per datamap, in the order the datamaps are registered.
+   *
+   * @param sparkSession session used to resolve the database location
+   * @param carbonTable table to report on
+   * @return rows of (field, size, comment); empty when no cache is available
+   */
+  def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+    val tableName = carbonTable.getTableName
+    val databaseName = carbonTable.getDatabaseName
+    val cache = CacheProvider.getInstance().getCarbonCache()
+    if (cache == null) {
+      Seq.empty
+    } else {
+      val dbLocation = normalizePath(CarbonEnv.getDatabaseLocation(databaseName, sparkSession))
+      val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
+                      tableName + CarbonCommonConstants.FILE_SEPARATOR
+      var numIndexFilesCached = 0
+
+      // Path -> Name, Type; insertion ordered so that the output rows have a stable order
+      val datamapName = mutable.LinkedHashMap[String, (String, String)]()
+      // Path -> Size
+      val datamapSize = mutable.LinkedHashMap[String, Long]()
+
+      def register(path: String, name: String, dmType: String): Unit = {
+        datamapName.put(path, (name, dmType))
+        datamapSize.put(path, 0L)
+      }
+
+      // a cache key can match several registered paths (an index datamap directory lies below
+      // the table directory), so the size goes to the most specific, i.e. longest, path
+      def creditSize(matches: String => Boolean, size: Long): Unit = {
+        val candidates = datamapSize.keys.filter(matches).toSeq
+        if (candidates.nonEmpty) {
+          val path = candidates.maxBy(_.length)
+          datamapSize.put(path, datamapSize(path) + size)
+        }
+      }
+
+      // parent table
+      register(tablePath, "", "")
+      // children tables
+      for (schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala) {
+        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + "_" +
+                   schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
+        register(path, schema.getDataMapName, schema.getProviderName)
+      }
+      // index schemas
+      val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+      for (schema <- indexSchemas.asScala) {
+        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName +
+                   CarbonCommonConstants.FILE_SEPARATOR + schema.getDataMapName +
+                   CarbonCommonConstants.FILE_SEPARATOR
+        register(path, schema.getDataMapName, schema.getProviderName)
+      }
+
+      var dictSize = 0L
+
+      // dictionary column ids
+      val dictIds = carbonTable
+        .getAllDimensions
+        .asScala
+        .filter(_.isGlobalDictionaryEncoding)
+        .map(_.getColumnId)
+        .toArray
+
+      cache.getCacheMap.asScala.foreach { case (key, cacheable) =>
+        cacheable match {
+          case _: BlockletDataMapIndexWrapper =>
+            // index
+            val indexPath = normalizePath(key)
+            creditSize(path => indexPath.startsWith(path), cacheable.getMemorySize)
+            if (indexPath.startsWith(tablePath)) {
+              numIndexFilesCached += 1
+            }
+          case _: BloomCacheKeyValue.CacheValue =>
+            // bloom datamap
+            val shardPath = normalizePath(key)
+            creditSize(path => shardPath.contains(path), cacheable.getMemorySize)
+          case _: AbstractColumnDictionaryInfo =>
+            // dictionary
+            if (dictIds.exists(id => key.startsWith(id))) {
+              dictSize = dictSize + cacheable.getMemorySize
+            }
+          case _ =>
+            // other cache entry types are not reported
+        }
+      }
+
+      // get all index files
+      val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath)
+      val numIndexFilesAll = CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier)
+        .asScala
+        .map(segment => segment.getCommittedIndexFile.keySet().size())
+        .sum
+
+      val tableRows = Seq(
+        Row("Index", byteCountToDisplaySize(datamapSize(tablePath)),
+          numIndexFilesCached + "/" + numIndexFilesAll + " index files cached"),
+        Row("Dictionary", byteCountToDisplaySize(dictSize), "")
+      )
+      val datamapRows = datamapSize.toSeq.collect {
+        case (path, size) if path != tablePath =>
+          val (dmName, dmType) = datamapName(path)
+          Row(dmName, byteCountToDisplaySize(size), dmType)
+      }
+      tableRows ++ datamapRows
+    }
+  }
+
+  /**
+   * Dispatches to the database-wide or the per-table report.
+   *
+   * @throws UnsupportedOperationException when the given table is a child datamap table
+   */
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    tableIdentifier match {
+      case None =>
+        showAllTablesCache(sparkSession)
+      case Some(identifier) =>
+        val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
+        if (carbonTable.isChildDataMap) {
+          throw new UnsupportedOperationException("Operation not allowed on child table.")
+        }
+        showTableCache(sparkSession, carbonTable)
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index d1023fa..a2923b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -33,11 +33,11 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand,
CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
@@ -77,7 +77,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val startCommand: Parser[LogicalPlan] =
     loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
-    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli
+    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli | cacheManagement
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -94,6 +94,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val stream: Parser[LogicalPlan] =
     createStream | dropStream | showStreams
 
+  // cache related commands; showCache is the only alternative so far
+  protected lazy val cacheManagement: Parser[LogicalPlan] =
+    showCache
+
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
       "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
@@ -494,6 +497,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           showHistory.isDefined)
     }
 
+  /**
+   * SHOW METACACHE with an optional table clause (parsed by `ontable`) and an optional
+   * trailing semicolon. The optional table identifier is handed to CarbonShowCacheCommand.
+   */
+  protected lazy val showCache: Parser[LogicalPlan] =
+    SHOW ~> METACACHE ~> opt(ontable) <~ opt(";") ^^ { tableIdentifier =>
+      CarbonShowCacheCommand(tableIdentifier)
+    }
 
   protected lazy val cli: Parser[LogicalPlan] =
     (CARBONCLI ~> FOR ~> TABLE) ~> (ident <~ ".").? ~ ident ~


Mime
View raw message