Repository: incubator-carbondata
Updated Branches:
refs/heads/master a011aafb0 -> 7788f468c
insertInto without kettle for spark2
fix comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/498cf982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/498cf982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/498cf982
Branch: refs/heads/master
Commit: 498cf982995feba7012ce3993b3fd5172d2e5a15
Parents: a011aaf
Author: QiangCai <qiangcai@qq.com>
Authored: Mon Dec 19 11:01:40 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed Dec 28 14:45:18 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 132 +++++++--------
.../readsupport/SparkRowReadSupportImpl.java | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 22 +--
.../emptyrow/TestCSVHavingOnlySpaceChar.scala | 1 -
.../testsuite/emptyrow/TestEmptyRows.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 36 +++--
.../apache/spark/sql/hive/CarbonMetastore.scala | 1 -
.../InsertIntoCarbonTableTestCase.scala | 162 +++++++++++++++++++
.../carbondata/CarbonDataSourceSuite.scala | 1 +
.../sql/common/util/CarbonSessionTest.scala | 0
.../store/CarbonFactDataHandlerColumnar.java | 29 +++-
.../store/SingleThreadFinalSortFilesMerger.java | 2 +-
.../store/writer/AbstractFactDataWriter.java | 19 ++-
13 files changed, 287 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 96bb5ed..64b8b61 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -18,20 +18,23 @@
package org.apache.carbondata.spark.rdd
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, UUID}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop.csv.CSVInputFormat
import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.DataLoadExecutor
import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil}
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
@@ -323,7 +327,7 @@ class NewDataFrameLoaderRDD[K, V](
loadCount: Integer,
tableCreationTime: Long,
schemaLastUpdatedTime: Long,
- prev: RDD[Row]) extends RDD[(K, V)](prev) {
+ prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -342,29 +346,25 @@ class NewDataFrameLoaderRDD[K, V](
carbonLoadModel.setSegmentId(String.valueOf(loadCount))
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- val iterator = new NewRddIterator(
- firstParent[Row].iterator(theSplit, context),
- carbonLoadModel)
-
- class CarbonIteratorImpl(iterator: util.Iterator[Array[AnyRef]])
- extends CarbonIterator[Array[AnyRef]] {
- override def initialize(): Unit = {}
-
- override def close(): Unit = {}
-
- override def next(): Array[AnyRef] = {
- iterator.next
- }
-
- override def hasNext: Boolean = {
- iterator.hasNext
+ val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
+ val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
+ val serializer = SparkEnv.get.closureSerializer.newInstance()
+ var serializeBuffer: ByteBuffer = null
+ while(partitionIterator.hasNext) {
+ val value = partitionIterator.next()
+ val newInstance = {
+ if (serializeBuffer == null) {
+ serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
+ }
+ serializeBuffer.rewind()
+ serializer.deserialize[RDD[Row]](serializeBuffer)
}
+ recordReaders += new CarbonIteratorImpl(
+ new NewRddIterator(newInstance.iterator(value.partition, context),
+ carbonLoadModel,
+ context))
}
-
- val recordReaders: Array[CarbonIterator[Array[AnyRef]]] =
- Array(new CarbonIteratorImpl(iterator))
-
val loader = new SparkPartitionLoader(model,
theSplit.index,
null,
@@ -375,7 +375,7 @@ class NewDataFrameLoaderRDD[K, V](
loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+ new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
} catch {
case e: BadRecordFoundException =>
@@ -402,76 +402,52 @@ class NewDataFrameLoaderRDD[K, V](
/**
* This class wraps Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data since carbondata will recognize the right type
- * according to schema from spark DataFrame.
- * @see org.apache.carbondata.spark.rdd.RddIterator
+ * It also converts all columns to string data to use the CSV data loading flow.
+ *
* @param rddIter
* @param carbonLoadModel
+ * @param context
*/
class NewRddIterator(rddIter: Iterator[Row],
- carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[AnyRef]] {
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends JavaRddIterator[Array[AnyRef]] {
+
val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
def hasNext: Boolean = rddIter.hasNext
- private def getString(value: Any, level: Int = 1): String = {
- if (value == null) {
- ""
- } else {
- value match {
- case s: String => s
- case i: java.lang.Integer => i.toString
- case d: java.lang.Double => d.toString
- case t: java.sql.Timestamp => format format t
- case d: java.sql.Date => format format d
- case d: java.math.BigDecimal => d.toPlainString
- case b: java.lang.Boolean => b.toString
- case s: java.lang.Short => s.toString
- case f: java.lang.Float => f.toString
- case bs: Array[Byte] => new String(bs)
- case s: scala.collection.Seq[Any] =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- s.foreach { x =>
- builder.append(getString(x, level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case m: scala.collection.Map[Any, Any] =>
- throw new Exception("Unsupported data type: Map")
- case r: org.apache.spark.sql.Row =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- for (i <- 0 until r.length) {
- builder.append(getString(r(i), level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case other => other.toString
- }
- }
- }
-
def next: Array[AnyRef] = {
val row = rddIter.next()
- val columns = new Array[Object](row.length)
- for (i <- 0 until row.length) {
- columns(i) = getString(row(i))
+ val columns = new Array[AnyRef](row.length)
+ for (i <- 0 until columns.length) {
+ columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, format)
}
columns
}
- def remove(): Unit = {
+ def initialize: Unit = {
+ SparkUtil.setTaskContext(context)
}
}
+
+class CarbonIteratorImpl(iterator: NewRddIterator)
+ extends CarbonIterator[Array[AnyRef]] {
+ override def initialize(): Unit = iterator.initialize
+
+ override def close(): Unit = {}
+
+ override def next(): Array[AnyRef] = {
+ iterator.next
+ }
+
+ override def hasNext: Boolean = {
+ iterator.hasNext
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 68f923d..46e5244 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -71,7 +71,7 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
//convert the long to timestamp in case of direct dictionary column
if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
- data[i] = new Timestamp((long) data[i]);
+ data[i] = new Timestamp((long) data[i] / 1000L);
} else if (DataType.DATE == carbonColumns[i].getDataType()) {
data[i] = new Date((long) data[i]);
}
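
The one-line change above scales the decoded long before wrapping it: the direct-dictionary value evidently carries microsecond precision, while java.sql.Timestamp's constructor takes milliseconds since the epoch, hence the division by 1000. A hedged sketch of the conversion (the microsecond interpretation is inferred from the fix itself):

    import java.sql.Timestamp

    // Microseconds -> java.sql.Timestamp, which expects milliseconds.
    def microsToTimestamp(micros: Long): Timestamp =
      new Timestamp(micros / 1000L)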
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ff7bf23..d975502 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -635,15 +635,14 @@ object CarbonDataRDDFactory {
try {
val rdd = dataFrame.get.rdd
- if (useKettle) {
-
- val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
- DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
- }.distinct.size
- val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
- sqlContext.sparkContext)
- val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+ val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+ DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+ }.distinct.size
+ val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+ sqlContext.sparkContext)
+ val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+ if (useKettle) {
status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
@@ -655,18 +654,13 @@ object CarbonDataRDDFactory {
schemaLastUpdatedTime,
newRdd).collect()
} else {
-
- var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
- numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
- val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
-
status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
- coalesceRdd).collect()
+ newRdd).collect()
}
} catch {
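
The refactor above hoists the node-aware coalescing out of the kettle branch so both the kettle and no-kettle flows load from the same DataLoadCoalescedRDD, and drops the old coalesce-by-executor-count fallback. The node count is simply the number of distinct hosts holding the input partitions. A minimal sketch of that count, using the public RDD.preferredLocations in place of Carbon's DataLoadPartitionCoalescer.getPreferredLocs:

    import org.apache.spark.rdd.RDD

    // Count distinct hosts holding the RDD's partitions, so one data-local
    // loading task can be requested per host.
    def distinctDataHosts(rdd: RDD[_]): Int =
      rdd.partitions.flatMap(p => rdd.preferredLocations(p)).distinct.length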
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
index 82d6fdf..06cfadf 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
@@ -61,7 +61,6 @@ class TestCSVHavingOnlySpaceChar extends QueryTest with BeforeAndAfterAll {
override def afterAll {
sql("drop table emptyRowCarbonTable")
- sql("drop table emptyRowHiveTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
index 44165ac..de2c541 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
@@ -56,7 +56,7 @@ class TestEmptyRows extends QueryTest with BeforeAndAfterAll {
sql(
"LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table " +
"emptyRowHiveTable"
- );
+ )
}
test("select eid from table") {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f451a54..0f32ad9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -336,7 +336,7 @@ object CarbonDataRDDFactory {
carbonLoadModel: CarbonLoadModel,
storePath: String,
kettleHomePath: String,
- columinar: Boolean,
+ columnar: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
useKettle: Boolean,
dataFrame: Option[DataFrame] = None): Unit = {
@@ -612,7 +612,7 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storePath,
kettleHomePath,
- columinar,
+ columnar,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
@@ -632,6 +632,7 @@ object CarbonDataRDDFactory {
def loadDataFrame(): Unit = {
try {
val rdd = dataFrame.get.rdd
+
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.size
@@ -639,16 +640,27 @@ object CarbonDataRDDFactory {
sqlContext.sparkContext)
val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
- status = new DataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storePath,
- kettleHomePath,
- columinar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- newRdd).collect()
+ if (useKettle) {
+ status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storePath,
+ kettleHomePath,
+ columnar,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ newRdd).collect()
+ } else {
+ status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ newRdd).collect()
+ }
+
} catch {
case ex: Exception =>
LOGGER.error(ex, "load data frame failed")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f174126..f9ad661 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -157,7 +157,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
CarbonRelation(database, tableIdentifier.table,
CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)
case None =>
- LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
throw new NoSuchTableException(database, tableIdentifier.table)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
new file mode 100644
index 0000000..adb7a1c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+ var timeStampPropOrig: String = _
+ override def beforeAll {
+ dropTableIfExists
+ timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions st
ring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+ sql("LOAD DATA local INPATH '../spark/src/test/resources/100_olap.csv' INTO TABLE THive")
+ }
+ test("insert from hive") {
+ sql("create table TCarbon1 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions
string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon1')")
+ sql("insert into TCarbon1 select * from THive")
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from THive order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Lates
t_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon1 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+ )
+ }
+ test("insert from hive-sum expression") {
+ sql("create table TCarbon2 (MAC string,deviceInformationIdSum int) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon2')")
+ sql("insert into TCarbon2 select MAC,sum(deviceInformationId+ 10) as a from THive group by MAC")
+ checkAnswer(
+ sql("select MAC,deviceInformationIdSum from TCarbon2 order by MAC"),
+ sql("select MAC,sum(deviceInformationId+ 10) as a from THive group by MAC order by MAC")
+ )
+ }
+ test("insert from carbon-select columns") {
+ sql("create table TCarbon3 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions
string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon3')")
+ sql("insert into TCarbon3 select * from TCarbon1")
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon1 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon3 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+ )
+ }
+ test("insert from carbon-select columns-source table has more column then target column") {
+ sql("create table TCarbon10 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon10')")
+ try{
+ sql("insert into TCarbon10 select * from TCarbon1")
+ assert(false)
+ } catch {
+ case ex: AnalysisException =>
+ if (ex.getMessage().contains("the number of columns are different")) {
+ assert(true)
+ } else {
+ assert(false)
+ }
+ case _ => assert(false)
+ }
+ }
+ test("insert from carbon-select * columns") {
+ sql("create table TCarbon4 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions
string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon4')")
+ sql("insert into TCarbon4 select * from TCarbon1")
+ checkAnswer(
+ sql("select * from TCarbon1"),
+ sql("select * from TCarbon4")
+ )
+ }
+ test("insert->hive column more than carbon column->success") {
+ sql("create table TCarbon5 (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon5')")
+ try {
+ sql("insert into TCarbon5 select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked from THive")
+ assert(false)
+ } catch {
+ case ex: AnalysisException =>
+ if (ex.getMessage().contains("the number of columns are different")) {
+ assert(true)
+ } else {
+ assert(false)
+ }
+ case _ => assert(false)
+ }
+
+ }
+ test("insert->carbon column is more then hive-fails") {
+ sql("create table TCarbon6 (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon6')")
+ try {
+ sql("insert into TCarbon6 select imei,deviceInformationId,MAC,deviceColor,gamePointId from THive")
+ assert(false)
+ } catch {
+ case ex: Exception => assert(true)
+ }
+ }
+ test("insert->insert wrong data types-pass") {
+ sql("create table TCarbon7 (imei string,deviceInformationId int,MAC string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon7')")
+ sql("insert into TCarbon7 select imei,MAC,deviceInformationId from THive")
+ sql("create table THive7 (imei string,deviceInformationId int,MAC string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+ sql("insert into THive7 select imei,MAC,deviceInformationId from THive")
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC from TCarbon7"),
+ sql("select imei,deviceInformationId,MAC from THive7")
+ )
+ }
+ test("insert->insert empty data -pass") {
+ sql("create table TCarbon8 (imei string,deviceInformationId int,MAC string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon8')")
+ sql("insert into TCarbon8 select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
+ sql("select imei,deviceInformationId,MAC from TCarbon8 where MAC='wrongdata'")
+ )
+ }
+ test("insert into existing load-pass") {
+ sql("create table TCarbon9 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions
string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon9')")
+ sql("create table THive9 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions s
tring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+ sql("insert into TCarbon9 select * from THive")
+ sql("insert into TCarbon9 select * from THive")
+ sql("insert into THive9 select * from THive")
+ sql("insert into THive9 select * from THive")
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from THive9 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Late
st_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon9 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+ )
+ }
+ override def afterAll {
+ dropTableIfExists
+ }
+
+ def dropTableIfExists: Unit = {
+ sql("DROP TABLE IF EXISTS THive")
+ sql("drop table if exists TCarbonSource3")
+ sql("drop table if exists TCarbonSource4")
+ sql("drop table if exists load")
+ sql("drop table if exists inser")
+ sql("drop table if exists TCarbon1")
+ sql("drop table if exists TCarbon2")
+ sql("drop table if exists TCarbon3")
+ sql("drop table if exists TCarbon4")
+ sql("drop table if exists TCarbon5")
+ sql("drop table if exists TCarbon6")
+ sql("drop table if exists TCarbon7")
+ sql("drop table if exists TCarbon8")
+ sql("drop table if exists TCarbon9")
+ if (timeStampPropOrig != null) {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
+ }
+ }
+}
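
Each test in the new suite follows the same shape: load a reference Hive table, insert from it into a Carbon table, then assert that identically ordered projections agree. A condensed sketch of that pattern, assuming it runs inside the suite (sql and checkAnswer come from the QueryTest base imported above; assertInsertMatches is a hypothetical helper):

    // Same projection, same ordering, two tables: results must match.
    def assertInsertMatches(target: String, reference: String, cols: String): Unit = {
      sql(s"insert into $target select $cols from $reference")
      checkAnswer(
        sql(s"select $cols from $reference order by $cols"),
        sql(s"select $cols from $target order by $cols"))
    }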
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 057d894..3ba9f6a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,6 +26,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
// Drop table
spark.sql("DROP TABLE IF EXISTS carbon_testtable")
spark.sql("DROP TABLE IF EXISTS csv_table")
+
// Create table
spark.sql(
s"""
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index cde19bd..2d468ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -493,7 +493,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
} else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
max[i] = -Double.MAX_VALUE;
} else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- //max[i] = new BigDecimal(0.0);
Long[] bigdMinVal = new Long[2];
bigdMinVal[0] = Long.MIN_VALUE;
bigdMinVal[1] = Long.MIN_VALUE;
@@ -513,9 +512,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
Long[] bigdMaxVal = new Long[2];
bigdMaxVal[0] = Long.MAX_VALUE;
bigdMaxVal[1] = Long.MAX_VALUE;
- //min[i] = new BigDecimal(Double.MAX_VALUE);
min[i] = bigdMaxVal;
- uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
Long[] bigdUniqueVal = new Long[2];
bigdUniqueVal[0] = Long.MIN_VALUE;
bigdUniqueVal[1] = Long.MIN_VALUE;
@@ -654,7 +651,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
} else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
max[i] = -Double.MAX_VALUE;
} else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- max[i] = new BigDecimal(0.0);
+ Long[] bigdMinVal = new Long[2];
+ bigdMinVal[0] = Long.MIN_VALUE;
+ bigdMinVal[1] = Long.MIN_VALUE;
+ max[i] = bigdMinVal;
} else {
max[i] = 0.0;
}
@@ -667,8 +667,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
min[i] = Double.MAX_VALUE;
uniqueValue[i] = Double.MIN_VALUE;
} else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- min[i] = new BigDecimal(Double.MAX_VALUE);
- uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+ Long[] bigdMaxVal = new Long[2];
+ bigdMaxVal[0] = Long.MAX_VALUE;
+ bigdMaxVal[1] = Long.MAX_VALUE;
+ min[i] = bigdMaxVal;
+ Long[] bigdUniqueVal = new Long[2];
+ bigdUniqueVal[0] = Long.MIN_VALUE;
+ bigdUniqueVal[1] = Long.MIN_VALUE;
+ uniqueValue[i] = bigdUniqueVal;
} else {
min[i] = 0.0;
uniqueValue[i] = 0.0;
@@ -747,11 +753,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
b = (byte[]) row[customMeasureIndex[i]];
}
}
+ BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
+ String[] bigdVals = value.toPlainString().split("\\.");
+ long[] bigDvalue = new long[2];
+ if (bigdVals.length == 2) {
+ bigDvalue[0] = Long.parseLong(bigdVals[0]);
+ BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT+bigdVals[1]);
+ bigDvalue[1] = (long)(bd.doubleValue()*Math.pow(10, value.scale()));
+ } else {
+ bigDvalue[0] = Long.parseLong(bigdVals[0]);
+ }
byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
byteBuffer.putInt(b.length);
byteBuffer.put(b);
byteBuffer.flip();
b = byteBuffer.array();
+ dataHolder[customMeasureIndex[i]].setWritableBigDecimalValueByIndex(count, bigDvalue);
dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
}
calculateMaxMin(max, min, decimal, customMeasureIndex, row);
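
A note on the block above: for min/max tracking, a big-decimal measure is now encoded as a pair of longs, the integral digits and the fractional digits rescaled by the value's scale, instead of a BigDecimal object. A hedged Scala sketch of the same split, assuming the integral part fits in a long (toLongPair is a hypothetical helper mirroring the split-on-'.' logic):

    import java.math.BigDecimal

    // Split a BigDecimal into (integral part, fraction scaled to value.scale()).
    def toLongPair(value: BigDecimal): Array[Long] = {
      val parts = value.toPlainString.split("\\.")
      val integral = parts(0).toLong
      val fraction =
        if (parts.length == 2) {
          val bd = new BigDecimal("0." + parts(1))
          (bd.doubleValue() * math.pow(10, value.scale())).toLong
        } else 0L
      Array(integral, fraction)
    }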
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 1579415..2c82672 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -134,7 +134,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
}
});
- if (null == fileList || fileList.length < 0) {
+ if (null == fileList || fileList.length == 0) {
return;
}
startSorting(fileList);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a94bea5..a1e7311 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -402,12 +402,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
public void closeWriter() throws CarbonDataWriterException {
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- renameCarbonDataFile();
- copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
- try {
- writeIndexFile();
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while writing the index file", e);
+ if (this.blockletInfoList.size() > 0) {
+ renameCarbonDataFile();
+ copyCarbonDataFileToCarbonStorePath(
+ this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+ try {
+ writeIndexFile();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the index file", e);
+ }
}
closeExecutorService();
}
@@ -539,7 +542,9 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @throws CarbonDataWriterException
*/
@Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
- writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+ if (this.blockletInfoList.size() > 0) {
+ writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+ }
}
/**
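
The guards added above make file finalization conditional: the data file is renamed, copied to the store path, and indexed only when at least one blocklet was written, so empty loads no longer leave zero-content files behind. A minimal sketch of the guard shape, with hypothetical callback names standing in for the private writer methods:

    // Finalize output only when blocklets exist; shut down unconditionally,
    // matching the control flow of closeWriter above.
    def closeWriterSketch(blockletCount: Int)(finalize: () => Unit, shutdown: () => Unit): Unit = {
      if (blockletCount > 0) {
        finalize() // rename, copy to store path, write index file
      }
      shutdown()
    }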
|