Repository: incubator-carbondata
Updated Branches:
refs/heads/master ecf29472e -> 526243b09
fixLoadTableForSpark2
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1b5e7fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1b5e7fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1b5e7fb4
Branch: refs/heads/master
Commit: 1b5e7fb442dd99b859b819fad5dea8cbc754e4c2
Parents: ecf2947
Author: QiangCai <qiangcai@qq.com>
Authored: Fri Dec 16 00:26:09 2016 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Fri Dec 16 00:46:54 2016 +0800
----------------------------------------------------------------------
.../execution/command/carbonTableSchema.scala | 46 +++++++++++++++++++-
1 file changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b5e7fb4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7f74d92..10fffd9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
@@ -34,6 +34,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -43,7 +45,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
/**
@@ -259,6 +261,45 @@ case class DeleteLoadsByLoadDate(
}
+/** Helpers for the LoadTable command: rewriting table metadata while a load is in progress. */
+object LoadTable {
+
+ /**
+  * Removes the DICTIONARY encoding from the given dimensions in the table's persisted
+  * (thrift) schema. It then refreshes the Carbon metastore and the load model so that the
+  * rest of the load sees the updated schema.
+  *
+  * Steps, in order:
+  *  1. Read the schema file.
+  *  2. Strip DICTIONARY from the matching fact-table columns.
+  *  3. Write the schema file back.
+  *  4. Update the Carbon metastore from the modified thrift table.
+  *  5. Replace `carbonLoadModel`'s CarbonDataLoadSchema with one built from the
+  *     re-looked-up CarbonTable.
+  *
+  * NOTE(review): the LoadTable command assigns this function to
+  * `GlobalDictionaryUtil.updateTableMetadataFunc` right before calling
+  * `generateGlobalDictionary`. Presumably it is called back during dictionary generation
+  * for dimensions that end up without a global dictionary. Confirm in GlobalDictionaryUtil.
+  *
+  * NOTE(review): the schema file is written before the metastore is updated. If
+  * updateMetadataByThriftTable or lookupRelation fails, the file has already been modified.
+  *
+  * @param carbonLoadModel load model; its store path is passed to the metastore update and
+  *                        its CarbonDataLoadSchema is replaced at the end
+  * @param sqlContext      used only to obtain the SparkSession for the relation lookup
+  * @param model           dictionary load model supplying the store location (`hdfsLocation`)
+  *                        and the table identifier (`table`: database and table name)
+  * @param noDictDimension dimensions whose DICTIONARY encoding is removed; they are matched
+  *                        to schema columns by column id
+  */
+ def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ noDictDimension: Array[CarbonDimension]): Unit = {
+
+ // locate this table's schema file under the store location
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+ model.table)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+ // read the persisted thrift TableInfo from the schema file
+ val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+ // modify TableInfo: strip DICTIONARY from every fact-table column whose column id matches
+ // one of noDictDimension. `encoders` is the thrift-generated field and is mutated in place.
+ // Presumably it is non-null for every column; verify.
+ val columns = tableInfo.getFact_table.getTable_columns
+ for (i <- 0 until columns.size) {
+ if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId)))
{
+ columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+ }
+ }
+
+ // write the modified TableInfo back to the same schema file
+ CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+ // update the Carbon metastore entry for this table from the modified thrift table
+ val catalog = CarbonEnv.get.carbonMetastore
+ catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+ model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+ // update CarbonDataLoadSchema: re-look-up the relation so the load model uses the
+ // refreshed CarbonTable. The cast throws if the lookup does not return a CarbonRelation.
+ val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName),
+ model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
+ carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+ }
+
+}
+
case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
{
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -436,6 +477,7 @@ case class LoadTable(
carbonLoadModel.setCsvHeader(fileHeader)
carbonLoadModel.setColDictFilePath(columnDict)
carbonLoadModel.setDirectLoad(true)
+ GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
GlobalDictionaryUtil
.generateGlobalDictionary(
sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, dataFrame)
|