carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
Date Sun, 05 Jan 2020 06:46:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f516b  [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from
CSV flow with binary non-sort columns
00f516b is described below

commit 00f516bbfaa58efa5f2e7914cfe01bddb0bcc812
Author: ajantha-bhat <ajanthabhat@gmail.com>
AuthorDate: Sat Dec 28 16:54:06 2019 +0800

    [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with
binary non-sort columns
    
    Problem:
    Global sort throws an exception in the load-from-CSV flow with binary non-sort columns.
    Exception is attached in JIRA and test case is added in PR.
    
    Cause: For the global sort flow, we build a csvRDD from the file. But this String RDD is then converted
again into new Scala String objects; here the binary value exceeded the 32000-character length limit, hence the exception.
    
    Solution: For the CSV load flow, avoid this extra String object conversion, since the data is already
a String object.
    This also fixes [CARBONDATA-3638].
    
    This closes #3540
---
 .../testsuite/binary/TestBinaryDataType.scala      | 42 ++++++++++++++++++++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 25 ++++++++++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala  | 29 +++++++++++++++
 3 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 7550581..84675fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -112,6 +112,48 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         }
     }
 
+    test("Create table and load data with binary column with other global sort columns")
{
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED AS CARBONDATA
+               | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE' = 'global_sort')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+                val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
+                val binaryName = each(2).toString
+                val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40),
"incorrect value for binary data")
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
     private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26,
10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74),
         "2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72,
68, 82, 0, 0, 2, -11),
         "3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72,
68, 82, 0, 0, 1, 54)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bb5e946..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims,
SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.store.CarbonRowReadSupport
 
@@ -67,10 +67,12 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))]
= {
+    var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
     } else {
       // input data from files
+      isLoadFromCSV = true
       val columnCount = model.getCsvHeaderColumns.length
       CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
@@ -90,11 +92,24 @@ object DataLoadProcessBuilderOnSpark {
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
-    val inputRDD = originRDD
-      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
-      .mapPartitionsWithIndex { case (index, rows) =>
-        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+    val inputRDD = if (isLoadFromCSV) {
+      // No need of wrap with NewRDDIterator, which converts object to string,
+      // as it is already a string.
+      // So, this will avoid new object creation in case of CSV global sort load for each
row
+      originRDD.mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.inputFuncForCsvRows(
+          rows.asInstanceOf[Iterator[StringArrayRow]],
+          index,
+          modelBroadcast,
+          inputStepRowCounter)
       }
+    } else {
+      originRDD
+        .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+        .mapPartitionsWithIndex { case (index, rows) =>
+          DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+        }
+    }
 
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index ff0e1bf..041019a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -97,6 +97,35 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def inputFuncForCsvRows(
+      rows: Iterator[StringArrayRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val rawRow = rows.next().values.asInstanceOf[Array[Object]]
+        val row = if (isRawDataRequired) {
+          new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
   def internalInputFunc(
       rows: Iterator[InternalRow],
       index: Int,


Mime
View raw message