carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: fixLoadTableForSpark2
Date Fri, 16 Dec 2016 01:49:28 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ecf29472e -> 526243b09


fixLoadTableForSpark2


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1b5e7fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1b5e7fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1b5e7fb4

Branch: refs/heads/master
Commit: 1b5e7fb442dd99b859b819fad5dea8cbc754e4c2
Parents: ecf2947
Author: QiangCai <qiangcai@qq.com>
Authored: Fri Dec 16 00:26:09 2016 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Fri Dec 16 00:46:54 2016 +0800

----------------------------------------------------------------------
 .../execution/command/carbonTableSchema.scala   | 46 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b5e7fb4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7f74d92..10fffd9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 
@@ -34,6 +34,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -43,7 +45,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
 
 /**
@@ -259,6 +261,45 @@ case class DeleteLoadsByLoadDate(
 
 }
 
+object LoadTable {
+
+  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+    // read TableInfo
+    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+
+    // write TableInfo
+    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+    // update Metadata
+    val catalog = CarbonEnv.get.carbonMetastore
+    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+
+}
+
 case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -436,6 +477,7 @@ case class LoadTable(
         carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
+        GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         GlobalDictionaryUtil
           .generateGlobalDictionary(
           sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, dataFrame)


Mime
View raw message