carbondata-commits mailing list archives

From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3662] Changes to show metacache command
Date Wed, 22 Jan 2020 12:49:54 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c78cf66  [CARBONDATA-3662] Changes to show metacache command
c78cf66 is described below

commit c78cf668e5109d66c6119831083770de6c615dcd
Author: Vikram Ahuja <vikramahuja8803@gmail.com>
AuthorDate: Wed Jan 8 11:28:02 2020 +0530

    [CARBONDATA-3662] Changes to show metacache command
    
    Why is this PR needed?
    1. There is no command to show the cache occupied by each of the
    executors (only applicable for the Index Server).
    The new command reports the cache per node, not per table.
    2. The SHOW METACACHE command does not show the cache used by the Index Server
    side and the Driver side separately; it only shows the combined total of both.
    
    What changes were proposed in this PR?
    1. Change the SHOW METACACHE command to also support SHOW EXECUTOR METACACHE,
    where the EXECUTOR keyword is optional (see the usage sketch below).
    2. Added a separate TOTAL cache value for the Index Server and the Driver.
    
    This closes #3565
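
    A minimal usage sketch (assuming `carbon.enable.index.server` is set to true;
    tableName is a placeholder):

    ```sql
    -- New form: reports the overall cache consumed by each Index Server executor,
    -- per node rather than per table.
    SHOW EXECUTOR METACACHE

    -- Existing forms are unchanged; the EXECUTOR keyword is optional.
    SHOW METACACHE
    SHOW METACACHE ON TABLE tableName
    ```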
---
 docs/ddl-of-carbondata.md                          |   8 +
 .../sql/commands/TestCarbonShowCacheCommand.scala  |   5 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../indexserver/DistributedShowCacheRDD.scala      |  22 ++-
 .../carbondata/indexserver/IndexServer.scala       |  27 ++--
 .../command/cache/CarbonShowCacheCommand.scala     | 163 +++++++++++++--------
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   6 +-
 7 files changed, 149 insertions(+), 83 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 7171ff3..6f74266 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1027,6 +1027,14 @@ Users can specify which columns to include and exclude for local dictionary generation
   datamaps. This also shows the cache usage by all the tables and children tables in the current 
   database.
   
+   ```sql
+    SHOW EXECUTOR METACACHE
+   ``` 
+    
+   This shows the overall memory consumed by the cache in each executor of the Index
+   Server. This command is only allowed when the carbon property `carbon.enable.index.server`
+   is set to true.
+  
   ```sql
   SHOW METACACHE ON TABLE tableName
   ```
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index 9818847..94b1566 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -179,13 +179,12 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     // Empty database
     sql("use cache_empty_db").collect()
     val result1 = sql("show metacache").collect()
-    assertResult(2)(result1.length)
-    assertResult(Row("cache_empty_db", "ALL", "0 B", "0 B", "0 B", "DRIVER"))(result1(1))
+    assertResult(0)(result1.length)
 
     // Database with 3 tables but only 2 are in cache
     sql("use cache_db").collect()
     val result2 = sql("show metacache").collect()
-    assertResult(4)(result2.length)
+    assertResult(3)(result2.length)
 
     // Make sure PreAgg tables are not in SHOW METADATA
     sql("use default").collect()
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 6f3e29f..cfc5425 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -119,6 +119,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val WITH = carbonKeyWord("WITH")
   protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
   protected val ABS = carbonKeyWord("abs")
+  protected val EXECUTOR = carbonKeyWord("EXECUTOR")
 
   protected val FOR = carbonKeyWord("FOR")
   protected val SCRIPTS = carbonKeyWord("SCRIPTS")
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index 2c9721d..542f140 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.indexserver
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
@@ -27,7 +27,9 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.spark.rdd.CarbonRDD
 
-class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableUniqueId: String)
+class DistributedShowCacheRDD(@transient private val ss: SparkSession,
+    tableUniqueId: String,
+    executorCache: Boolean)
   extends CarbonRDD[String](ss, Nil) {
 
   val executorsList: Array[String] = DistributionUtil
@@ -71,10 +73,20 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableUniqueId: String)
                 .getTableUniqueName
             } else {
               dataMap.getDataMapSchema.getRelationIdentifier.getDatabaseName + "_" + dataMap
-              .getDataMapSchema.getDataMapName
+                .getDataMapSchema.getDataMapName
+            }
+            if (executorCache) {
+              val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+                SparkEnv.get.blockManager.blockManagerId.executorId
+              }"
+              s"${ executorIP }:${ dataMap.getDataMapFactory.getCacheSize }:${
+                dataMap.getDataMapSchema.getProviderName
+              }"
+            } else {
+              s"${dataMapName}:${dataMap.getDataMapFactory.getCacheSize}:${
+                dataMap.getDataMapSchema.getProviderName
+              }"
             }
-            s"${ dataMapName }:${ dataMap.getDataMapFactory.getCacheSize }:${
-              dataMap.getDataMapSchema.getProviderName}"
           }
         sizeAndIndexLengths
     }.flatten.toIterator
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 5fad12f..fc2df80 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -57,7 +57,7 @@ trait ServerInterface {
   /**
    * Get the cache size for the specified tables.
    */
-  def showCache(tableIds: String) : Array[String]
+  def showCache(tableIds: String, executorCache: Boolean) : Array[String]
 
   /**
    * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
@@ -202,15 +202,22 @@ object IndexServer extends ServerInterface {
     }
   }
 
-  override def showCache(tableId: String = ""): Array[String] = doAs {
-    val jobgroup: String = "Show Cache " + (tableId match {
-      case "" => "for all tables"
-      case table => s"for $table"
-    })
-    val sparkSession = SparkSQLUtil.getSparkSession
-    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", UUID.randomUUID().toString)
-    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
-    new DistributedShowCacheRDD(sparkSession, tableId).collect()
+  override def showCache(tableId: String = "", executorCache: Boolean): Array[String] = {
+    doAs {
+      val jobgroup: String = "Show Cache " + (tableId match {
+        case "" =>
+          if (executorCache) {
+            "for all the Executors."
+          } else {
+            "for all tables."
+          }
+        case table => s"for $table"
+      })
+      val sparkSession = SparkSQLUtil.getSparkSession
+      sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", UUID.randomUUID().toString)
+      sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+      new DistributedShowCacheRDD(sparkSession, tableId, executorCache).collect()
+    }
   }
 
   def main(args: Array[String]): Unit = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index a922c98..7f6889d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -20,6 +20,7 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -41,25 +42,37 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.spark.util.CommonUtil.bytesToDisplaySize
 
 
-case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
+case class CarbonShowCacheCommand(showExecutorCache: Boolean,
+    tableIdentifier: Option[TableIdentifier],
     internalCall: Boolean = false)
   extends MetadataCommand {
 
   private lazy val cacheResult: Seq[(String, Int, Long, String)] = {
-    executeJobToGetCache(List())
+    executeJobToGetCache(List(), showExecutorCache)
   }
 
   private val LOGGER = LogServiceFactory.getLogService(classOf[CarbonShowCacheCommand].getName)
 
   override def output: Seq[AttributeReference] = {
     if (tableIdentifier.isEmpty) {
-      Seq(
-        AttributeReference("Database", StringType, nullable = false)(),
-        AttributeReference("Table", StringType, nullable = false)(),
-        AttributeReference("Index size", StringType, nullable = false)(),
-        AttributeReference("Datamap size", StringType, nullable = false)(),
-        AttributeReference("Dictionary size", StringType, nullable = false)(),
-        AttributeReference("Cache Location", StringType, nullable = false)())
+      val isDistributedPruningEnabled = CarbonProperties.getInstance()
+        .isDistributedPruningEnabled("", "")
+      if (showExecutorCache) {
+        if (isDistributedPruningEnabled) {
+          Seq(
+            AttributeReference("Executor ID", StringType, nullable = false)(),
+            AttributeReference("Index Size", StringType, nullable = false)())
+        } else {
+          Seq()
+        }
+      } else {
+        Seq(
+          AttributeReference("Identifier", StringType, nullable = false)(),
+          AttributeReference("Index size", StringType, nullable = false)(),
+          AttributeReference("Datamap size", StringType, nullable = false)(),
+          AttributeReference("Dictionary size", StringType, nullable = false)(),
+          AttributeReference("Cache Location", StringType, nullable = false)())
+      }
     } else {
       Seq(
         AttributeReference("Field", StringType, nullable = false)(),
@@ -74,7 +87,15 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       /**
        * Assemble result for database
        */
-      getAllTablesCache(sparkSession)
+      if (!showExecutorCache) {
+        getAllTablesCache(sparkSession)
+      }
+      /**
+       * Assemble result for all Index Server executors
+       */
+      else {
+        getAllExecutorCache(sparkSession)
+      }
     } else {
       /**
        * Assemble result for table
@@ -87,7 +108,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
       (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
         tableIdentifier.get.table)) {
-        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+        getTableCacheFromIndexServer(carbonTable,
+          numberOfIndexFiles)(showExecutorCache)(sparkSession)
       } else { Seq() }
       val result = driverRawResults.slice(0, 2) ++
                    driverRawResults.drop(2).map { row =>
@@ -110,13 +132,40 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }
 
+  def getAllExecutorCache(sparkSession: SparkSession): Seq[Row] = {
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (!isDistributedPruningEnabled) {
+      // Block here. this feature is only with index server enabled
+      throw new UnsupportedOperationException(
+        "Show Executor Metacache is only avalable with Index Server Enabled")
+    } else {
+      // get all the executor details from the index server
+      try {
+        val executorCacheValue = executeJobToGetCache(List(), showExecutorCache)
+        val result = executorCacheValue.flatMap {
+          iterator =>
+            Seq(Row(iterator._1, bytesToDisplaySize(iterator._3)))
+        }
+        result
+      }
+      catch {
+        case ex: Exception =>
+          LOGGER.error("Error while getting cache from the Index Server", ex)
+          Seq()
+      }
+    }
+  }
+
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
     val isDistributedPruningEnabled = CarbonProperties.getInstance()
       .isDistributedPruningEnabled("", "")
-    if (cache == null && !isDistributedPruningEnabled) {
-      return makeEmptyCacheRows(currentDatabase)
+    if (!isDistributedPruningEnabled) {
+      if (cache == null || cache.getCurrentSize == 0) {
+        return Seq()
+      }
     }
     var carbonTables = mutable.ArrayBuffer[CarbonTable]()
     sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
@@ -127,6 +176,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
             carbonTables += carbonTable
           }
         } catch {
+          case ex: AnalysisException =>
+            LOGGER.debug("Unable to access Carbon table object for table" + tableIdent.table)
           case _: NoSuchTableException =>
             LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
         }
@@ -135,12 +186,15 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       carbonTables.flatMap {
         mainTable =>
           try {
-            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
+            makeRows(getTableCacheFromIndexServer(mainTable)(showExecutorCache)(sparkSession),
+              mainTable)
           } catch {
             case ex: UnsupportedOperationException => Seq()
           }
       }
-    } else { Seq() }
+    } else {
+      Seq()
+    }
 
     val driverRows = if (cache != null) {
       carbonTables.flatMap {
@@ -151,16 +205,16 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
             case ex: UnsupportedOperationException => Seq()
           }
       }
-    } else { Seq() }
+    } else {
+      Seq()
+    }
 
-    val (driverdbIndexSize, driverdbDatamapSize, driverdbDictSize) = calculateDBIndexAndDatamapSize(
-      driverRows)
     val (indexdbIndexSize, indexdbDatamapSize, indexAllDictSize) = calculateDBIndexAndDatamapSize(
       indexServerRows)
     val (indexAllIndexSize, indexAllDatamapSize) = if (isDistributedPruningEnabled) {
       getIndexServerCacheSizeForCurrentDB
     } else {
-      (0, 0)
+      (0L, 0L)
     }
     val driverDisplayRows = if (cache != null) {
       val tablePaths = carbonTables.map {
@@ -169,41 +223,29 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       }
       val (driverIndexSize, driverDatamapSize, allDictSize) = getAllDriverCacheSize(tablePaths
         .toList)
-      if (driverRows.nonEmpty) {
+      if (driverIndexSize + driverDatamapSize + allDictSize != 0 && driverRows.size!=0) {
         (Seq(
-          Row("ALL", "ALL", driverIndexSize, driverDatamapSize, allDictSize, "DRIVER"),
-          Row(currentDatabase,
-            "ALL",
-            driverdbIndexSize,
-            driverdbDatamapSize,
-            driverdbDictSize,
-            "DRIVER")
+          Row("TOTAL", driverIndexSize, driverDatamapSize, allDictSize, "DRIVER")
         ) ++ driverRows).collect {
-          case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
-            Row(row(0), row(1), bytesToDisplaySize(row.getLong(2)),
-              bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)), "DRIVER")
+          case row if row.getLong(1) != 0L || row.getLong(2) != 0L || row.getLong(3) != 0L =>
+            Row(row(0), bytesToDisplaySize(row.getLong(1)),
+              bytesToDisplaySize(row.getLong(2)), bytesToDisplaySize(row.getLong(3)), "DRIVER")
         }
       } else {
-        makeEmptyCacheRows(currentDatabase)
+        Seq()
       }
     } else {
-      makeEmptyCacheRows(currentDatabase)
+      Seq()
     }
 
-    //      val (serverIndexSize, serverDataMapSize) = getAllIndexServerCacheSize
-    val indexDisplayRows = if (indexServerRows.nonEmpty) {
+    val indexDisplayRows = if (indexAllIndexSize + indexAllDatamapSize != 0 &&
+                               indexServerRows.size != 0) {
       (Seq(
-        Row("ALL", "ALL", indexAllIndexSize, indexAllDatamapSize, indexAllDictSize, "INDEX SERVER"),
-        Row(currentDatabase,
-          "ALL",
-          indexdbIndexSize,
-          indexdbDatamapSize,
-          driverdbDictSize,
-          "INDEX SERVER")
+        Row("TOTAL", indexAllIndexSize, indexAllDatamapSize, indexAllDictSize, "INDEX SERVER")
       ) ++ indexServerRows).collect {
-        case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
-          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
-            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)), "INDEX SERVER")
+        case row if row.getLong(1) != 0L || row.getLong(2) != 0L || row.getLong(3) != 0L =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)),
+            bytesToDisplaySize(row.getLong(2)), bytesToDisplaySize(row.getLong(3)), "INDEX SERVER")
       }
     } else {
       Seq()
@@ -264,17 +306,10 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
 
   override protected def opName: String = "SHOW CACHE"
 
-  private def makeEmptyCacheRows(currentDatabase: String) = {
-    Seq(
-      Row("ALL", "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0), bytesToDisplaySize(0), ""),
-      Row(currentDatabase, "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0),
-        bytesToDisplaySize(0), "DRIVER"))
-  }
-
   private def calculateDBIndexAndDatamapSize(rows: Seq[Row]): (Long, Long, Long) = {
     rows.map {
       row =>
-        (row(2).asInstanceOf[Long], row(3).asInstanceOf[Long], row.get(4).asInstanceOf[Long])
+        (row(1).asInstanceOf[Long], row(2).asInstanceOf[Long], row.get(3).asInstanceOf[Long])
     }.fold((0L, 0L, 0L)) {
       case (a, b) =>
         (a._1 + b._1, a._2 + b._2, a._3 + b._3)
@@ -289,17 +324,21 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
         datamapSize += row.getLong(2)
     }
     val dictSize = tableResult(1).getLong(1)
-    Seq(Row(carbonTable.getDatabaseName, carbonTable.getTableName,
-      indexSize,
-      datamapSize,
-      dictSize))
+    if (indexSize == 0 && datamapSize == 0 && dictSize == 0) {
+      Seq()
+    } else {
+      Seq(Row(carbonTable.getDatabaseName + "." + carbonTable.getTableName,
+        indexSize,
+        datamapSize,
+        dictSize))
+    }
   }
 
-  private def getTableCacheFromIndexServer(mainTable: CarbonTable, numberOfIndexFiles: Int = 0)
-    (sparkSession: SparkSession): Seq[Row] = {
+  private def getTableCacheFromIndexServer(mainTable: CarbonTable,
+      numberOfIndexFiles: Int = 0)(executorCache: Boolean)(sparkSession: SparkSession): Seq[Row] = {
     val childTables = getChildTableList(mainTable)(sparkSession)
     val cache = if (tableIdentifier.nonEmpty) {
-      executeJobToGetCache(childTables.map(_._3) ++ List(mainTable.getTableId))
+      executeJobToGetCache(childTables.map(_._3) ++ List(mainTable.getTableId), executorCache)
     } else {
       cacheResult
     }
@@ -335,11 +374,11 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
 
   }
 
-  private def executeJobToGetCache(tableIds: List[String]): Seq[(String, Int, Long,
-    String)] = {
+  private def executeJobToGetCache(tableIds: List[String], executorCache: Boolean):
+      Seq[(String, Int, Long, String)] = {
     try {
       val (result, time) = CarbonScalaUtil.logTime {
-        IndexServer.getClient.showCache(tableIds.mkString(",")).map(_.split(":"))
+        IndexServer.getClient.showCache(tableIds.mkString(","), executorCache).map(_.split(":"))
           .groupBy(_.head).map { t =>
           var sum = 0L
           var length = 0
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 2480f7e..102a3df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -549,9 +549,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   protected lazy val showCache: Parser[LogicalPlan] =
-    SHOW ~> METACACHE ~> opt(ontable) <~ opt(";") ^^ {
-      case table =>
-        CarbonShowCacheCommand(table)
+    (SHOW ~> opt(EXECUTOR) <~ METACACHE) ~ opt(ontable) <~ opt(";") ^^ {
+      case (executor ~ table) =>
+        CarbonShowCacheCommand(executor.isDefined, table)
     }
 
   protected lazy val dropCache: Parser[LogicalPlan] =

