carbondata-commits mailing list archives

From jack...@apache.org
Subject [carbondata] branch master updated: [HOTFIX] Fix InsertFromStage complex data type issue for partition table
Date Fri, 03 Jan 2020 08:59:34 GMT
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new eb0848f  [HOTFIX] Fix InsertFromStage complex data type issue for partition table
eb0848f is described below

commit eb0848f882ca9b80c53a7ba00179d4e70ebe68af
Author: liuzhi <371684521@qq.com>
AuthorDate: Thu Jan 2 22:43:55 2020 +0800

    [HOTFIX] Fix InsertFromStage complex data type issue for partition table
    
    Why is this PR needed?
    
    CarbonInsertFromStageCommand does not work correctly with complex data types.
    
    What changes were proposed in this PR?
    
    For a partition table, the complex data types of the target table should be converted to
    the binary data type (see the illustrative sketch after this commit message).
    
    Does this PR introduce any user interface change?
    
    No
    
    Is any new testcase added?
    
    Yes
    
    This closes #3556
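
    Illustrative sketch (not part of this commit): the conversion described above can be
    pictured as replacing every complex column of the target Spark schema with the binary
    type before the partition-table load; the helper name toBinarySchema is hypothetical
    and only sketches the idea.

        import org.apache.spark.sql.types._

        // Replace complex columns (struct/array/map) with BinaryType; the
        // partition-table loading path carries complex values as encoded bytes.
        def toBinarySchema(schema: StructType): StructType = {
          StructType(schema.fields.map { field =>
            field.dataType match {
              case _: StructType | _: ArrayType | _: MapType =>
                field.copy(dataType = BinaryType)
              case _ => field
            }
          })
        }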
---
 .../apache/carbondata/core/util/DataTypeUtil.java  |  33 ++---
 .../org/apache/carbon/flink/CarbonLocalWriter.java |   3 +
 .../org/apache/carbon/flink/CarbonS3Writer.java    |   3 +
 .../carbon/flink/TestCarbonPartitionWriter.scala   | 103 +++++++++++++--
 .../org/apache/carbon/flink/TestCarbonWriter.scala |  10 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  20 +--
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   |  10 +-
 .../management/CarbonInsertFromStageCommand.scala  | 140 +++++++++++----------
 .../command/management/CarbonLoadDataCommand.scala |  29 +----
 9 files changed, 214 insertions(+), 137 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c07f08b..5471420 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -75,7 +75,7 @@ public final class DataTypeUtil {
   /**
    * DataType converter for different computing engines
    */
-  private static final ThreadLocal<DataTypeConverter> converter = new ThreadLocal<>();
+  private static DataTypeConverter converter;
 
   /**
    * This method will convert a given value to its specific type
@@ -84,8 +84,8 @@ public final class DataTypeUtil {
    * @param dataType
    * @return
    */
-  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      int scale, int precision) {
+  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, int scale,
+      int precision) {
     return getMeasureValueBasedOnDataType(msrValue, dataType, scale, precision, false);
   }
 
@@ -105,7 +105,7 @@ public final class DataTypeUtil {
           new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+        return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -140,11 +140,10 @@ public final class DataTypeUtil {
     if (dataType == DataTypes.BOOLEAN) {
       return BooleanConvert.parseBoolean(dimValue);
     } else if (DataTypes.isDecimal(dataType)) {
-      BigDecimal bigDecimal =
-          new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
+      BigDecimal bigDecimal = new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+        return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -457,7 +456,7 @@ public final class DataTypeUtil {
       }
     } else {
       // Default action for String/Varchar
-      return getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
+      return converter.convertFromStringToUTF8String(dimensionValue);
     }
   }
 
@@ -518,7 +517,7 @@ public final class DataTypeUtil {
     } else if (actualDataType == DataTypes.LONG) {
       return ByteUtil.toXorBytes((Long) dimensionValue);
     } else if (actualDataType == DataTypes.TIMESTAMP) {
-      return ByteUtil.toXorBytes((Long)dimensionValue);
+      return ByteUtil.toXorBytes((Long) dimensionValue);
     } else {
       // Default action for String/Varchar
       return ByteUtil.toBytes(dimensionValue.toString());
@@ -970,11 +969,12 @@ public final class DataTypeUtil {
 
   /**
    * set the data type converter as per computing engine
+   *
    * @param converterLocal
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
     if (converterLocal != null) {
-      converter.set(converterLocal);
+      converter = converterLocal;
       timeStampformatter.remove();
       dateformatter.remove();
     }
@@ -989,17 +989,10 @@ public final class DataTypeUtil {
   }
 
   public static DataTypeConverter getDataTypeConverter() {
-    DataTypeConverter dataTypeConverter = converter.get();
-    if (dataTypeConverter == null) {
-      synchronized (converter) {
-        dataTypeConverter = converter.get();
-        if (dataTypeConverter == null) {
-          dataTypeConverter = new DataTypeConverterImpl();
-          converter.set(dataTypeConverter);
-        }
-      }
+    if (converter == null) {
+      converter = new DataTypeConverterImpl();
     }
-    return dataTypeConverter;
+    return converter;
   }
 
   public static DataType valueOf(String name) {
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index c24c3bf..ac39bd0 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -113,6 +113,9 @@ final class CarbonLocalWriter extends CarbonWriter {
     synchronized (this) {
       if (!this.flushed) {
         this.closeWriters();
+        this.commit();
+        this.writerFactory.reset();
+        this.writeCount.set(0);
         this.flushed = true;
       }
     }
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 0c8ccbd..1d3ec6b 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -120,6 +120,9 @@ final class CarbonS3Writer extends CarbonWriter {
     synchronized (this) {
       if (!this.flushed) {
         this.closeWriters();
+        this.commit();
+        this.writerFactory.reset();
+        this.writeCount.set(0);
         this.flushed = true;
       }
     }
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index cc3c4b4..447e83e 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -71,11 +71,11 @@ class TestCarbonPartitionWriter extends QueryTest {
       environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
-      val dataCount = 10000
+      val dataCount = 1000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
         override def get(index: Int): Array[AnyRef] = {
-          val data = new Array[AnyRef](5)
+          val data = new Array[AnyRef](7)
           data(0) = "test" + index
           data(1) = index.asInstanceOf[AnyRef]
           data(2) = 12345.asInstanceOf[AnyRef]
@@ -86,7 +86,7 @@ class TestCarbonPartitionWriter extends QueryTest {
 
         @throws[InterruptedException]
         override def onFinish(): Unit = {
-          Thread.sleep(30000L)
+          Thread.sleep(5000L)
         }
       }
       val stream = environment.addSource(source)
@@ -118,18 +118,99 @@ class TestCarbonPartitionWriter extends QueryTest {
       assertResult(false)(FileFactory
         .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
-      // ensure the carbon data file count in data directory
-      // is same of the data file count which stage files recorded.
-      assertResult(true)(FileFactory.getCarbonFile(dataLocation).listFiles().length ==
-        collectStageInputs(CarbonTablePath.getStageDir(tablePath)).map(
-          stageInput =>
-            stageInput.getLocations.asScala.map(location => location.getFiles.size()).sum
-        ).sum
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+
+    } finally {
+      sql(s"drop table if exists $tableName").collect()
+      delDir(new File(dataPath))
+    }
+  }
+
+  @Test
+  def testComplexType(): Unit = {
+    sql(s"drop table if exists $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short,
+         | structField struct<value1:string,value2:int,value3:int>, binaryField struct<value1:binary>)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+    val dataPath = rootPath + "/data/"
+    delDir(new File(dataPath))
+    new File(dataPath).mkdir()
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(6)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](7)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data(3) = "test\0011\0012"
+          data(4) = "test"
+          data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+          data(6) = "20191218"
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
       )
+      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+        override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
+      }).addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(exception)
+      }
+
+      val dataLocation = dataPath + "default" + CarbonCommonConstants.FILE_SEPARATOR +
+                         tableName + CarbonCommonConstants.FILE_SEPARATOR
+
+      assertResult(true)(FileFactory.isFileExist(dataLocation))
+      assertResult(false)(FileFactory
+        .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
 
     } finally {
       sql(s"drop table if exists $tableName").collect()
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 67c7bab..9195863 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -64,7 +64,7 @@ class TestCarbonWriter extends QueryTest {
       environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
-      val dataCount = 10000
+      val dataCount = 1000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
         override def get(index: Int): Array[AnyRef] = {
@@ -103,7 +103,7 @@ class TestCarbonWriter extends QueryTest {
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
 
       // ensure the stage snapshot file and all stage files are deleted
       assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
@@ -116,9 +116,9 @@ class TestCarbonWriter extends QueryTest {
   }
 
   private def newWriterProperties(
-                                   dataTempPath: String,
-                                   dataPath: String,
-                                   storeLocation: String) = {
+    dataTempPath: String,
+    dataPath: String,
+    storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index f7b8668..bb5e946 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -26,15 +26,17 @@ import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -99,7 +101,7 @@ object DataLoadProcessBuilderOnSpark {
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
       DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
         convertStepRowCounter)
-    }.filter(_ != null)// Filter the bad record
+    }.filter(_ != null) // Filter the bad record
 
     // 3. Sort
     val configuration = DataLoadProcessBuilder.createConfiguration(model)
@@ -269,7 +271,7 @@ object DataLoadProcessBuilderOnSpark {
     val configuration = DataLoadProcessBuilder.createConfiguration(model)
     val header = configuration.getHeader
     val rangeColumn = model.getRangePartitionColumn
-    val rangeColumnIndex = (0 until header.length).find{
+    val rangeColumnIndex = (0 until header.length).find {
       index =>
         header(index).equalsIgnoreCase(rangeColumn.getColName)
     }.get
@@ -427,7 +429,7 @@ object DataLoadProcessBuilderOnSpark {
       .map(_.getColName)
       .toArray
     val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+    val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
       sparkSession,
       columnProjection = new CarbonProjection(columns),
       null,
@@ -436,13 +438,13 @@ object DataLoadProcessBuilderOnSpark {
       carbonTable.getTableInfo,
       new CarbonInputMetrics,
       null,
-      null,
+      classOf[SparkDataTypeConverterImpl],
       classOf[CarbonRowReadSupport],
       splits.asJava)
       .map { row =>
-        new GenericRow(row.getData.asInstanceOf[Array[Any]])
+        new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
       }
-    sparkSession.createDataFrame(rdd, schema)
+    SparkSQLUtil.execute(rdd, schema, sparkSession)
   }
 }
 
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 8f39f9b..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -22,13 +22,17 @@ import java.lang.reflect.Method
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EmptyRule
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
 object SparkSQLUtil {
@@ -38,6 +42,10 @@ object SparkSQLUtil {
     Dataset.ofRows(sparkSession, logicalPlan)
   }
 
+  def execute(rdd: RDD[InternalRow], schema: StructType, sparkSession: SparkSession): DataFrame = {
+    execute(LogicalRDD(schema.toAttributes, rdd)(sparkSession), sparkSession)
+  }
+
   def getSparkSession: SparkSession = {
     SparkSession.getDefaultSession.get
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index eb63d03..24e7765 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -32,12 +32,12 @@ import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -85,7 +85,6 @@ case class CarbonInsertFromStageCommand(
     val tablePath = table.getTablePath
     val stagePath = CarbonTablePath.getStageDir(tablePath)
     val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
-    var loadModel: CarbonLoadModel = null
     val lock = acquireIngestLock(table)
 
     try {
@@ -133,44 +132,21 @@ case class CarbonInsertFromStageCommand(
       val executorService = Executors.newFixedThreadPool(numThreads)
       val stageInputs = collectStageInputs(executorService, stageFiles)
 
-      // 3) add new segment with INSERT_IN_PROGRESS into table status
-      loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
-      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
-
-      // 4) write all existing stage file names and segmentId into a new snapshot file
-      // The content of snapshot file is: first line is segmentId, followed by each line is
-      // one stage file name
-      val content =
-        (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
-      FileFactory.writeFile(content, snapshotFilePath)
-
-      // 5) perform data loading
+      // 3) perform data loading
       if (table.isHivePartitionTable) {
-        startLoadingWithPartition(spark, table, loadModel, stageInputs)
+        startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
       } else {
-        startLoading(spark, table, loadModel, stageInputs)
+        startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
       }
 
-      // 6) write segment file and update the segment entry to SUCCESS
-      val segmentFileName = SegmentFileStore.writeSegmentFile(
-        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
-      SegmentFileStore.updateTableStatusFile(
-        table, loadModel.getSegmentId, segmentFileName,
-        table.getCarbonTableIdentifier.getTableId,
-        new SegmentFileStore(table.getTablePath, segmentFileName),
-        SegmentStatus.SUCCESS)
-
-      // 7) delete stage files
+      // 4) delete stage files
       deleteStageFiles(executorService, stageFiles)
 
-      // 8) delete the snapshot file
+      // 5) delete the snapshot file
       FileFactory.getCarbonFile(snapshotFilePath).delete()
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
-        if (loadModel != null) {
-          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
-        }
         throw ex
     } finally {
       lock.unlock()
@@ -266,24 +242,55 @@ case class CarbonInsertFromStageCommand(
   private def startLoading(
       spark: SparkSession,
       table: CarbonTable,
-      loadModel: CarbonLoadModel,
-      stageInput: Seq[StageInput]
+      stageInput: Seq[StageInput],
+      stageFiles: Array[(CarbonFile, CarbonFile)],
+      snapshotFilePath: String
   ): Unit = {
-    val splits = stageInput.flatMap(_.createSplits().asScala)
-    LOGGER.info(s"start to load ${splits.size} files into " +
-                s"${table.getDatabaseName}.${table.getTableName}")
-    val start = System.currentTimeMillis()
-    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
-    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-      spark,
-      Option(dataFrame),
-      loadModel,
-      SparkSQLUtil.sessionState(spark).newHadoopConf()
-    ).map { row =>
-        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-    }
+    var loadModel: CarbonLoadModel = null
+    try {
+      // 1) add new segment with INSERT_IN_PROGRESS into table status
+      loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
+      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
 
-    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+      // 2) write all existing stage file names and segmentId into a new snapshot file
+      // The content of snapshot file is: first line is segmentId, followed by each line is
+      // one stage file name
+      val content =
+      (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
+      FileFactory.writeFile(content, snapshotFilePath)
+
+      // 3) do loading.
+      val splits = stageInput.flatMap(_.createSplits().asScala)
+      LOGGER.info(s"start to load ${splits.size} files into " +
+                  s"${table.getDatabaseName}.${table.getTableName}")
+      val start = System.currentTimeMillis()
+      val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        spark,
+        Option(dataFrame),
+        loadModel,
+        SparkSQLUtil.sessionState(spark).newHadoopConf()
+      ).map { row =>
+          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      }
+      LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+
+      // 4) write segment file and update the segment entry to SUCCESS
+      val segmentFileName = SegmentFileStore.writeSegmentFile(
+        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+      SegmentFileStore.updateTableStatusFile(
+        table, loadModel.getSegmentId, segmentFileName,
+        table.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(table.getTablePath, segmentFileName),
+        SegmentStatus.SUCCESS)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
+        if (loadModel != null) {
+          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
+        }
+        throw ex
+    }
   }
 
   /**
@@ -292,15 +299,18 @@ case class CarbonInsertFromStageCommand(
   private def startLoadingWithPartition(
       spark: SparkSession,
       table: CarbonTable,
-      loadModel: CarbonLoadModel,
-      stageInput: Seq[StageInput]
+      stageInput: Seq[StageInput],
+      stageFiles: Array[(CarbonFile, CarbonFile)],
+      snapshotFilePath: String
     ): Unit = {
     val partitionDataList = listPartitionFiles(stageInput)
+
+    val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
+    FileFactory.writeFile(content, snapshotFilePath)
+
     val start = System.currentTimeMillis()
-    var index = 0
     partitionDataList.map {
       case (partition, splits) =>
-        index = index + 1
         LOGGER.info(s"start to load ${splits.size} files into " +
           s"${table.getDatabaseName}.${table.getTableName}. " +
           s"Partition information: ${partition.mkString(",")}")
@@ -484,22 +494,20 @@ case class CarbonInsertFromStageCommand(
       .map(_.getColName)
       .toArray
     val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[Row] = new CarbonScanRDD[InternalRow](
-        sparkSession,
-        columnProjection = new CarbonProjection(columns),
-        null,
-        carbonTable.getAbsoluteTableIdentifier,
-        carbonTable.getTableInfo.serialize,
-        carbonTable.getTableInfo,
-        new CarbonInputMetrics,
-        null,
-        null,
-        classOf[SparkRowReadSupportImpl],
-        splits.asJava
-      ).map { row =>
-        new GenericRow(row.toSeq(schema).toArray)
-      }
-    sparkSession.createDataFrame(rdd, schema)
+    val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
+      sparkSession,
+      columnProjection = new CarbonProjection(columns),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize,
+      carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      null,
+      classOf[SparkDataTypeConverterImpl],
+      classOf[SparkRowReadSupportImpl],
+      splits.asJava
+    )
+    SparkSQLUtil.execute(rdd, schema, sparkSession)
   }
 
   override protected def opName: String = "INSERT STAGE"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0309e91..1334178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -52,7 +52,6 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -662,34 +661,14 @@ case class CarbonLoadDataCommand(
       curAttributes: Seq[AttributeReference],
       sortScope: SortScopeOptions.SortScope,
       isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+    val catalogAttributes = catalogTable.schema.toAttributes
     // Converts the data as per the loading steps before give it to writer or sorter
-    val convertedRdd = convertData(
+    val updatedRdd = convertData(
       rdd,
       sparkSession,
       loadModel,
       isDataFrame,
       partitionValues)
-    val updatedRdd = if (isDataFrame) {
-      val columnCount = loadModel.getCsvHeaderColumns.length
-      convertedRdd.map { row =>
-        val array = new Array[AnyRef](columnCount)
-        val data = row.getData
-        var i = 0
-        while (i < columnCount) {
-          data(i) match {
-            case string: String =>
-              array(i) = UTF8String.fromString(string)
-            case _ =>
-              array(i) = data(i)
-          }
-          i = i + 1
-        }
-        array
-      }.map(row => InternalRow.fromSeq(row))
-    } else {
-      convertedRdd.map(row => InternalRow.fromSeq(row.getData))
-    }
-    val catalogAttributes = catalogTable.schema.toAttributes
     var attributes = curAttributes.map(a => {
       catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
     })
@@ -783,7 +762,7 @@ case class CarbonLoadDataCommand(
       sparkSession: SparkSession,
       model: CarbonLoadModel,
       isDataFrame: Boolean,
-      partitionValues: Array[String]): RDD[CarbonRow] = {
+      partitionValues: Array[String]): RDD[InternalRow] = {
     val sc = sparkSession.sparkContext
     val info =
       model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
@@ -827,7 +806,7 @@ case class CarbonLoadDataCommand(
           partialSuccessAccum,
           inputStepRowCounter,
           keepActualData = true)
-      }.filter(_ != null)
+      }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
 
     finalRDD
   }
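
For reference, a condensed usage sketch of the scenario the new testComplexType test above
exercises (assumes a SparkSession with CarbonData configured; the table name
complex_stage_table is illustrative, and the Flink staging step is only indicated by a
comment):

    import org.apache.spark.sql.SparkSession

    // Assumes a SparkSession with the CarbonData extensions already configured.
    val spark: SparkSession = SparkSession.builder().getOrCreate()

    // Partitioned Carbon table with a complex (struct) column, as in the new test.
    spark.sql(
      """
        | CREATE TABLE IF NOT EXISTS complex_stage_table (
        |   stringField string, intField int, shortField short,
        |   structField struct<value1:string,value2:int,value3:int>)
        | STORED AS carbondata
        | PARTITIONED BY (hour_ string, date_ string)
      """.stripMargin)

    // ... a Flink job built with CarbonWriterFactory writes stage files for this table ...

    // Load the staged files; with this fix the struct column is handled correctly
    // for the partitioned table.
    spark.sql("INSERT INTO complex_stage_table STAGE")
    spark.sql("SELECT count(1) FROM complex_stage_table").show()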

