carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1856][PARTITION] Support insert/load data for partition table
Date Mon, 18 Dec 2017 16:30:48 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6e224dce6 -> 4430178c0
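
For context, CARBONDATA-1856 adds standard (Hive-style) partition handling to the insert and load paths, with the new CarbonFileFormat below doing the actual write. A rough usage sketch of the statements the feature targets (table, column and path names are made up for illustration, not taken from the patch):

    // Hypothetical Spark SQL usage of the feature added by this commit:
    // create a Hive-style partitioned CarbonData table, then insert/load into it.
    spark.sql(
      """CREATE TABLE sales (id INT, amount DOUBLE)
        |PARTITIONED BY (country STRING)
        |STORED BY 'carbondata'""".stripMargin)
    // Dynamic-partition insert: the partition column is selected last.
    spark.sql("INSERT INTO sales SELECT id, amount, country FROM staging")
    // Bulk load from a CSV file into the same partitioned table.
    spark.sql("LOAD DATA INPATH '/tmp/sales.csv' INTO TABLE sales")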


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
new file mode 100644
index 0000000..99ce7fa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
+
+class CarbonFileFormat
+  extends FileFormat
+    with DataSourceRegister
+    with Logging
+    with Serializable {
+
+  override def shortName(): String = "carbondata"
+
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    None
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    conf.setClass(
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
+      classOf[CarbonOutputCommitter],
+      classOf[CarbonOutputCommitter])
+    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+    sparkSession.sessionState.conf.setConfString(
+      "spark.sql.sources.commitProtocolClass",
+      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+    var table = CarbonEnv.getCarbonTable(
+      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
+    val model = new CarbonLoadModel
+    val carbonProperty = CarbonProperties.getInstance()
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    val partitionStr =
+      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
+        _.getColumnName.toLowerCase).mkString(",")
+    optionsFinal.put(
+      "fileheader",
+      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
+    DataLoadingUtil.buildCarbonLoadModel(
+      table,
+      carbonProperty,
+      options,
+      optionsFinal,
+      model,
+      conf
+    )
+    model.setPartitionId("0")
+    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
+    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
+    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
+    CarbonTableOutputFormat.setLoadModel(conf, model)
+    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+
+    new OutputWriterFactory {
+
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
+        var storeLocation: Array[String] = Array[String]()
+        val isCarbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+        val tmpLocationSuffix = File.separator + System.nanoTime()
+        if (isCarbonUseLocalDir) {
+          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+            // use single dir
+            storeLocation = storeLocation :+
+              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
+            if (storeLocation == null || storeLocation.isEmpty) {
+              storeLocation = storeLocation :+
+                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+            }
+          } else {
+            // use all the yarn dirs
+            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+          }
+        } else {
+          storeLocation =
+            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+        }
+        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".carbondata"
+      }
+
+    }
+  }
+}
+
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
+      absoluteDir: String,
+      ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
+    if (carbonFlow != null) {
+      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
+    } else {
+      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    }
+  }
+}
+
+/**
+ * It is just a class to make the code compile with both Spark 2.1 and 2.2
+ */
+private trait AbstractCarbonOutputWriter {
+  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+  def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def write(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def writeCarbon(row: InternalRow): Unit
+}
+
+private class CarbonOutputWriter(path: String,
+    context: TaskAttemptContext,
+    fieldTypes: Seq[DataType])
+  extends OutputWriter with AbstractCarbonOutputWriter {
+  val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
+  val partitionData = if (partitions.nonEmpty) {
+    partitions.map(_.split("=")(1))
+  } else {
+    Array.empty
+  }
+  val writable = new StringArrayWritable()
+
+  private val recordWriter: CarbonRecordWriter = {
+
+    new CarbonTableOutputFormat() {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
+  }
+
+  // TODO: implement the WriteSupport interface to support writing Row directly to the RecordWriter
+  def writeCarbon(row: InternalRow): Unit = {
+    val data = new Array[String](fieldTypes.length + partitionData.length)
+    var i = 0
+    while (i < fieldTypes.length) {
+      if (!row.isNullAt(i)) {
+        data(i) = row.getString(i)
+      }
+      i += 1
+    }
+    if (partitionData.length > 0) {
+      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
+    }
+    writable.set(data)
+    recordWriter.write(NullWritable.get(), writable)
+  }
+
+
+  override def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+
+  override def close(): Unit = {
+    recordWriter.close(context)
+    val loadModel = recordWriter.getLoadModel
+    val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId)
+    // write partition info to new file.
+    val partitionList = new util.ArrayList[String]()
+    partitions.foreach(partitionList.add)
+    new PartitionMapFileStore().writePartitionMapFile(
+      segmentPath,
+      loadModel.getTaskNo,
+      partitionList)
+  }
+
+  def getPartitionsFromPath(path: String, attemptContext: TaskAttemptContext): Array[String] = {
+    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
+    if (path.indexOf(attemptId) <= 0) {
+      val model = CarbonTableOutputFormat.getLoadModel(attemptContext.getConfiguration)
+      attemptId = model.getTableName + "/"
+    }
+    val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+    if (str.length > 0) {
+      str.split("/")
+    } else {
+      Array.empty
+    }
+  }
+}
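
A note on the new writer above: CarbonOutputWriter recovers the partition directory names from the task output path rather than from the row values. A minimal, self-contained sketch of that parsing, simplified from getPartitionsFromPath (the object name and example path are hypothetical):

    object PartitionPathSketch {
      // Everything between the marker (the task attempt id, or the table name
      // as a fallback) and the file name is treated as partition directories.
      def partitionsFromPath(path: String, marker: String): Array[String] = {
        val anchor = marker + "/"
        val start = path.indexOf(anchor)
        if (start < 0) return Array.empty[String]
        val dirs = path.substring(start + anchor.length, path.lastIndexOf("/"))
        if (dirs.nonEmpty) dirs.split("/") else Array.empty[String]
      }

      def main(args: Array[String]): Unit = {
        val path = "/tmp/store/default/sales/country=US/year=2017/part-0-0.carbondata"
        println(partitionsFromPath(path, "sales").mkString(", "))  // country=US, year=2017
      }
    }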

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 8b247f7..7f9bdf7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.format.DataType
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 8745900..21b81ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -74,8 +74,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           CarbonDropTableCommand(ifNotExists, identifier.database,
             identifier.table.toLowerCase)) :: Nil
       case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
-      _, child: LogicalPlan, overwrite, _) =>
-        ExecutedCommandExec(CarbonInsertIntoCommand(relation, child, overwrite)) :: Nil
+      partition, child: LogicalPlan, overwrite, _) =>
+        ExecutedCommandExec(CarbonInsertIntoCommand(relation, child, overwrite, partition)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
         FileUtils.createDatabaseDirectory(dbName, CarbonProperties.getStorePath)
         ExecutedCommandExec(createDb) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index de5fa7e..ff7e06a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -17,18 +17,25 @@
 
 package org.apache.spark.sql.hive
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
   private lazy val parser = sparkSession.sessionState.sqlParser
@@ -200,3 +207,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Insert into carbon table from other source
+ */
+case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        castChildOutput(p, relation, child)
+    }
+  }
+
+  def castChildOutput(p: InsertIntoTable,
+      relation: LogicalRelation,
+      child: LogicalPlan): LogicalPlan = {
+    val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
+      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      CarbonException.analysisException(
+        s"Maximum number of columns supported: " +
+          s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
+    }
+    if (child.output.size >= carbonDSRelation.carbonRelation.output.size ||
+        carbonDSRelation.carbonTable.isHivePartitionTable) {
+      val newChildOutput = child.output.zipWithIndex.map { columnWithIndex =>
+        columnWithIndex._1 match {
+          case attr: Alias =>
+            Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
+          case attr: Attribute =>
+            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
+          case attr => attr
+        }
+      }
+      val version = SPARK_VERSION
+      val newChild: LogicalPlan = if (newChildOutput == child.output) {
+        if (version.startsWith("2.1")) {
+          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+        } else if (version.startsWith("2.2")) {
+          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+        } else {
+          throw new UnsupportedOperationException(s"Spark version $version is not supported")
+        }
+      } else {
+        Project(newChildOutput, child)
+      }
+
+      val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
+
+      InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true)
+    } else {
+      CarbonException.analysisException(
+        "Cannot insert into target table because number of columns mismatch")
+    }
+  }
+}
+
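
The version branch in castChildOutput above goes through CarbonReflectionUtils.getField because the child plan of InsertIntoTable is exposed under different field names in Spark 2.1 ("child") and Spark 2.2 ("query"). A minimal illustration of the underlying idea, not the actual utility (the helper name is made up):

    // Hypothetical sketch: read a constructor field by name via Java reflection,
    // so the same source can target either Spark field name at runtime.
    def getFieldByName[T](name: String, obj: Any): T = {
      val field = obj.getClass.getDeclaredField(name)
      field.setAccessible(true)
      field.get(obj).asInstanceOf[T]
    }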

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 623d309..c28cc44 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -1322,59 +1322,3 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
       }
   }
 }
-
-/**
- * Insert into carbon table from other source
- */
-case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
-    }
-  }
-
-  def castChildOutput(p: InsertIntoTable,
-      relation: CarbonDatasourceHadoopRelation,
-      child: LogicalPlan): LogicalPlan = {
-    if (relation.carbonRelation.output.size > CarbonCommonConstants
-      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-      CarbonException.analysisException("Maximum number of columns supported:" +
-        s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
-    }
-    if (child.output.size >= relation.carbonRelation.output.size) {
-      val newChildOutput = child.output.zipWithIndex.map { columnWithIndex =>
-        columnWithIndex._1 match {
-          case attr: Alias =>
-            Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
-          case attr: Attribute =>
-            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
-          case attr => attr
-        }
-      }
-      val version = SPARK_VERSION
-      val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (version.startsWith("2.1")) {
-          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (version.startsWith("2.2")) {
-          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
-      } else {
-          throw new UnsupportedOperationException(s"Spark version $version is not supported")
-        }
-      } else {
-        Project(newChildOutput, child)
-      }
-
-      val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
-
-      InsertIntoCarbonTable(relation, p.partition, newChild, overwrite, true)
-    } else {
-      CarbonException.analysisException(
-        "Cannot insert into target table because number of columns mismatch")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 55d784f..79095ca 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -199,7 +199,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     // validate partition clause
     if (partitionFields.nonEmpty) {
       if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
-        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+         throw new MalformedCarbonCommandException("Error: Invalid partition definition")
       }
       // partition columns should not be part of the schema
       val badPartCols = partitionFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
@@ -251,7 +251,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     }
     if (partitionerFields.nonEmpty) {
       if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+         throw new MalformedCarbonCommandException("Error: Invalid partition definition")
       }
       // partition columns should not be part of the schema
       val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index abe90f1..0cd1331 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -55,7 +55,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
 
   @Override
   public String[] next() {
-    if (readBatch == null || !readBatch.hasNext()) {
+    if (readBatch == null || !readBatch.hasNext()  && !close) {
       try {
         readBatch = queue.take();
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index c32aa51..d3caa99 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -46,16 +46,16 @@ public abstract class AbstractResultProcessor {
       CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-      int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
           CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
               carbonTable.getCarbonTableIdentifier()));
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
-      int index = taskNo + 1;
+      long index = taskNo + 1;
       carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
     } else {
       carbonDataFileAttributes =
-          new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+          new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()),
               loadModel.getFactTimeStamp());
     }
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 48c5471..68a212e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -51,7 +51,7 @@ public class RowResultProcessor {
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation);
     CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+        new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()),
             loadModel.getFactTimeStamp());
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonFactDataHandlerModel.setBucketId(bucketId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
index b69815e..8bedd80 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
@@ -25,7 +25,7 @@ public class CarbonDataFileAttributes {
   /**
    * task Id which is unique for each spark task
    */
-  private int taskId;
+  private long taskId;
 
   /**
    * load start time
@@ -36,7 +36,7 @@ public class CarbonDataFileAttributes {
    * @param taskId
    * @param factTimeStamp
    */
-  public CarbonDataFileAttributes(int taskId, long factTimeStamp) {
+  public CarbonDataFileAttributes(long taskId, long factTimeStamp) {
     this.taskId = taskId;
     this.factTimeStamp = factTimeStamp;
   }
@@ -44,7 +44,7 @@ public class CarbonDataFileAttributes {
   /**
    * @return
    */
-  public int getTaskId() {
+  public long getTaskId() {
     return taskId;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index a8ae513..96bd2e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -223,7 +223,7 @@ public class CarbonFactDataHandlerModel {
     }
 
     CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(configuration.getTaskNo()),
+        new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
             (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
     String carbonDataDirectoryPath = getCarbonDataFolderLocation(configuration);
 

