carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [carbondata] 03/22: [CARBONDATA-3331] Fix for external table in Show Metacache
Date Thu, 16 May 2019 19:05:36 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit f0e270667b0be7c7b66d922ddc176ebe4334a695
Author: namanrastogi <naman.rastogi.52@gmail.com>
AuthorDate: Tue Mar 26 19:39:26 2019 +0530

    [CARBONDATA-3331] Fix for external table in Show Metacache
    
    Problem: When an external table is queried in the SHOW METACACHE command, the
    database size is greater than the ALL size in the index column. This happens
    when the external table is created over the store of a table that is also
    present in the current database.
    
    Bug: The database index size was computed by blindly summing the index sizes
    of all tables, so cache entries shared between tables (via a common table
    path) were counted multiple times.
    
    Solution: Compute the database index size while iterating over the cache,
    counting an entry when its cache key starts with the path of one of the
    current database's tables (see the standalone sketch after the diff).
    
    This closes #3164
---
 .../command/cache/CarbonShowCacheCommand.scala     | 91 ++++++++++++----------
 1 file changed, 52 insertions(+), 39 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 8461bf3..3b85313 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -28,16 +27,13 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
 import org.apache.spark.sql.types.StringType
 
-import org.apache.carbondata.core.cache.{CacheProvider, CacheType}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, ShowTableCacheEvent}
-import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.util.CommonUtil.bytesToDisplaySize
 
 
@@ -45,6 +41,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     internalCall: Boolean = false)
   extends MetadataCommand {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   override def output: Seq[AttributeReference] = {
     if (tableIdentifier.isEmpty) {
       Seq(
@@ -71,38 +69,48 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
         Row("ALL", "ALL", 0L, 0L, 0L),
         Row(currentDatabase, "ALL", 0L, 0L, 0L))
     } else {
-      val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        .listAllTables(sparkSession).filter {
-        carbonTable =>
-          carbonTable.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
-          isValidTable(carbonTable, sparkSession) &&
-          !carbonTable.isChildDataMap
+      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+        tableIdent =>
+          try {
+            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+            if (!carbonTable.isChildDataMap) {
+              carbonTables += carbonTable
+            }
+          } catch {
+            case ex: NoSuchTableException =>
+              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+          }
       }
 
       // All tables of current database
-      var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
-      val tableList: Seq[Row] = carbonTables.map {
+      var (dbDatamapSize, dbDictSize) = (0L, 0L)
+      val tableList = carbonTables.flatMap {
         carbonTable =>
-          val tableResult = getTableCache(sparkSession, carbonTable)
-          var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-          tableResult.drop(2).foreach {
-            row =>
-              indexSize += row.getLong(1)
-              datamapSize += row.getLong(2)
-          }
-          val dictSize = tableResult(1).getLong(1)
-
-          dbIndexSize += indexSize
-          dbDictSize += dictSize
-          dbDatamapSize += datamapSize
-
-          val tableName = if (!carbonTable.isTransactionalTable) {
-            carbonTable.getTableName + " (external table)"
+          try {
+            val tableResult = getTableCache(sparkSession, carbonTable)
+            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+            tableResult.drop(2).foreach {
+              row =>
+                indexSize += row.getLong(1)
+                datamapSize += row.getLong(2)
+            }
+            val dictSize = tableResult(1).getLong(1)
+
+            dbDictSize += dictSize
+            dbDatamapSize += datamapSize
+
+            val tableName = if (!carbonTable.isTransactionalTable) {
+              carbonTable.getTableName + " (external table)"
+            }
+            else {
+              carbonTable.getTableName
+            }
+            Seq((currentDatabase, tableName, indexSize, datamapSize, dictSize))
+          } catch {
+            case ex: UnsupportedOperationException =>
+              Seq.empty
           }
-          else {
-            carbonTable.getTableName
-          }
-          (currentDatabase, tableName, indexSize, datamapSize, dictSize)
       }.collect {
         case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
                                                                 (datamapSize == 0) &&
@@ -110,13 +118,23 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
           Row(db, table, indexSize, datamapSize, dictSize)
       }
 
+      val tablePaths = carbonTables.map {
+        carbonTable =>
+          carbonTable.getTablePath
+      }
+
       // Scan whole cache and fill the entries for All-Database-All-Tables
+      // and Current-Database-All-Tables
       var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+      var dbIndexSize = 0L
       cache.getCacheMap.asScala.foreach {
-        case (_, cacheable) =>
+        case (key, cacheable) =>
           cacheable match {
             case _: BlockletDataMapIndexWrapper =>
               allIndexSize += cacheable.getMemorySize
+              if (tablePaths.exists { path => key.startsWith(path) }) {
+                dbIndexSize += cacheable.getMemorySize
+              }
             case _: BloomCacheKeyValue.CacheValue =>
               allDatamapSize += cacheable.getMemorySize
             case _: AbstractColumnDictionaryInfo =>
@@ -217,9 +235,4 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       }
     }
   }
-
-  def isValidTable(carbonTable: CarbonTable, sparkSession: SparkSession): Boolean = {
-    CarbonEnv.getInstance(sparkSession).carbonMetaStore.tableExists(carbonTable.getTableName,
-      Some(carbonTable.getDatabaseName))(sparkSession)
-  }
 }
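
To make the fix concrete, here is a minimal standalone sketch of the same
aggregation outside the CarbonData codebase. CacheEntry, IndexEntry, and the
sample paths below are hypothetical stand-ins for BlockletDataMapIndexWrapper
and the real cache map keys; only the startsWith path-matching logic mirrors
the patch.

// Hypothetical stand-ins for the cacheable types in the real cache map.
sealed trait CacheEntry { def memorySize: Long }
case class IndexEntry(memorySize: Long) extends CacheEntry    // ~ BlockletDataMapIndexWrapper
case class DatamapEntry(memorySize: Long) extends CacheEntry  // ~ BloomCacheKeyValue.CacheValue

object ShowCacheSketch {
  def main(args: Array[String]): Unit = {
    // Paths of tables in the current database. An external table created
    // over t1's store shares t1's path, so summing per-table results (the
    // old approach) would count the same cache entries twice.
    val tablePaths = Seq("/store/db1/t1", "/store/db1/t2")

    // Cache keyed by "<tablePath>/<segment>/...", as in the real cache map.
    val cacheMap: Map[String, CacheEntry] = Map(
      "/store/db1/t1/seg0/index" -> IndexEntry(100L),
      "/store/db1/t2/seg0/index" -> IndexEntry(50L),
      "/store/db2/t3/seg0/index" -> IndexEntry(25L),
      "/store/db1/t1/bloom"      -> DatamapEntry(10L)
    )

    // Single pass over the cache: each entry contributes to the database
    // total at most once, however many tables point at its path.
    var (allIndexSize, dbIndexSize) = (0L, 0L)
    cacheMap.foreach {
      case (key, entry: IndexEntry) =>
        allIndexSize += entry.memorySize
        if (tablePaths.exists(path => key.startsWith(path))) {
          dbIndexSize += entry.memorySize
        }
      case _ => // datamap and dictionary sizes are aggregated elsewhere
    }

    println(s"ALL index size: $allIndexSize, db1 index size: $dbIndexSize")
    // Prints: ALL index size: 175, db1 index size: 150
  }
}

Because each cache entry is visited exactly once, an entry whose path is shared
by a managed table and an external table contributes to the database total only
once, restoring the invariant that the database index size never exceeds ALL.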

