carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3666] Avoided listing of table dir in refresh command
Date Wed, 22 Jan 2020 05:00:22 GMT
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8865671  [CARBONDATA-3666] Avoided listing of table dir in refresh command
8865671 is described below

commit 8865671c32b1cf450ecc1fdc8c278904fe4a8c3f
Author: kunal642 <kunalkapoor642@gmail.com>
AuthorDate: Thu Jan 16 14:28:57 2020 +0530

    [CARBONDATA-3666] Avoided listing of table dir in refresh command
    
    Why is this PR needed?
    Currently, if a refresh command is fired on a parquet table using a carbon session, carbon
will list all the tables and check whether the table exists or not; then we check whether the schema
file exists by listing the Metadata folder. This can be a problem in cloud scenarios,
as listing on S3 is slow.
    
    What changes were proposed in this PR?
    Get the metadata for the specified table, then go for table listing only if the provider
is carbon or the table is not registered in hive.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3581
---
 .../apache/carbondata/spark/util/CommonUtil.scala  | 10 +++++++
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  | 26 +++++++++---------
 .../management/RefreshCarbonTableCommand.scala     | 31 +++++++++++++---------
 .../spark/sql/execution/strategy/DDLStrategy.scala | 16 +++++------
 .../spark/sql/hive/CarbonFileMetastore.scala       | 18 +++++--------
 5 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0fe08b..e70fc24 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.util.FileUtils
 
@@ -832,4 +833,13 @@ object CommonUtil {
     }
     displaySize
   }
+
+  def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
+    catalogTable.provider match {
+      case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") ||
+                      x.equalsIgnoreCase("carbondata")
+      case None => false
+    }
+  }
+
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..968738a 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -35,6 +35,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.spark.util.CommonUtil
+
 /**
  * This class refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table.
@@ -59,20 +61,16 @@ object CarbonSessionUtil {
      * Set the stats to none in case of carbontable
      */
     def setStatsNone(catalogTable: CatalogTable): Unit = {
-      catalogTable.provider match {
-        case Some(provider)
-          if provider.equals("org.apache.spark.sql.CarbonSource") ||
-             provider.equalsIgnoreCase("carbondata") =>
-          // Update stats to none in case of carbon table as we are not expecting any stats
from
-          // Hive. Hive gives wrong stats for carbon table.
-          catalogTable.stats match {
-            case Some(stats) =>
-              CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
-            case _ =>
-          }
-          isRelationRefreshed =
-            CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
-        case _ =>
+      if (CommonUtil.isCarbonDataSource(catalogTable)) {
+        // Update stats to none in case of carbon table as we are not expecting any stats
from
+        // Hive. Hive gives wrong stats for carbon table.
+        catalogTable.stats match {
+          case Some(stats) =>
+            CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+          case _ =>
+        }
+        isRelationRefreshed =
+          CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
       }
     }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 9251cf0..17e628f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
@@ -41,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent,
RefreshTablePreExecutionEvent}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Command to register carbon table from existing carbon table data
@@ -52,24 +54,31 @@ case class RefreshCarbonTableCommand(
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     setAuditTable(databaseName, tableName)
     // Steps
-    // 1. get table path
-    // 2. perform the below steps
-    // 2.1 check if the table already register with hive then ignore and continue with the
next
-    // schema
+    // 1. Get Table Metadata from spark.
+    // 2 Perform below steps:
+    // 2.1 If table exists then check if provider if carbon. If yes then go for carbon
+    // refresh otherwise no need to do anything.
+    // 2.1.1 If table does not exists then consider the table as carbon and check for schema
file
+    // existence.
     // 2.2 register the table with the hive check if the table being registered has aggregate
table
     // then do the below steps
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
-    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
-    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
     // 2.1 check if the table already register with hive then ignore and continue with the
next
     // schema
-    if (!sparkSession.sessionState.catalog.listTables(databaseName)
-      .exists(_.table.equalsIgnoreCase(tableName))) {
+    val isCarbonDataSource = try {
+      CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
+        .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
+    } catch {
+      case _: NoSuchTableException =>
+        true
+    }
+    if (isCarbonDataSource) {
+      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
       // check the existence of the schema file to know its a carbon table
       val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
       // if schema file does not exist then the table will either non carbon table or stale
@@ -106,9 +115,7 @@ case class RefreshCarbonTableCommand(
         }
       }
     }
-    RefreshTable(
-      TableIdentifier(identifier.getTableName, Option(identifier.getDatabaseName))
-    ).run(sparkSession)
+    RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession)
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 80d3044..68f7442 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand,
C
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Carbon strategies for ddl commands
@@ -147,20 +149,20 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy
{
         if isCarbonTable(truncateTable.tableName) =>
         ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
       case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
-        if isCarbonDataSourceTable(createTable.tableDesc) =>
+        if CommonUtil.isCarbonDataSource(createTable.tableDesc) =>
         ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) ::
Nil
       case MatchCreateDataSourceTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
         ) :: Nil
       case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
         ) :: Nil
       case createTable@CreateDataSourceTableCommand(table, _)
-        if isCarbonDataSourceTable(table) =>
+        if CommonUtil.isCarbonDataSource(table) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTable(createTable, sparkSession)
         ) :: Nil
@@ -195,12 +197,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
     CarbonPlanHelper.isCarbonTable(tableIdent, sparkSession)
   }
 
-  private def isCarbonDataSourceTable(table: CatalogTable): Boolean = {
-    table.provider.get != DDLUtils.HIVE_PROVIDER &&
-    (table.provider.get.equals("org.apache.spark.sql.CarbonSource") ||
-     table.provider.get.equalsIgnoreCase("carbondata"))
-  }
-
   private def isCarbonHiveTable(table: CatalogTable): Boolean = {
     table.provider.isDefined &&
     DDLUtils.HIVE_PROVIDER == table.provider.get &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6c7b1f2..15b2a51 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // use to lock the carbonTables
@@ -216,12 +216,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
-            throw new NoSuchTableException(database, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+          throw new NoSuchTableException(database, tableIdentifier.table)
         }
         val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
            catalogTable.location.toString, database, tableIdentifier.table)
@@ -540,11 +537,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
         }
         val tableLocation = catalogTable.storage.locationUri match {
           case tableLoc@Some(uri) =>


Mime
View raw message