carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2437] Fixed no-dictionary complex type issues when CSV contains null values
Date Sun, 06 May 2018 19:02:39 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 5cad92f4f -> 729286919


[CARBONDATA-2437] Fixed no-dictionary complex type issues when CSV contains null values

Problem: Complex type data loading fails for null values.
Root cause: For a null value in a primitive type, no length is written, so converting
to columnar format throws a BufferUnderflowException.
Solution: Write null values in LV (length-value) format.
Also added code to support no-dictionary complex type columns; complex type columns
are now no-dictionary by default, and a no-dictionary complex column behaves the same
as a normal column.

This closes #2266
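
Note: to make the fix concrete, here is a minimal, self-contained Java sketch of the
LV (length-value) idea; it is not CarbonData code (the class name LvFormatSketch is
made up for illustration). Without a length prefix, the reader of a complex-type byte
stream cannot tell how many bytes a null value occupies, and a fixed-size read can run
past the written bytes and throw BufferUnderflowException; prefixing every value,
including nulls, with its length keeps the stream self-describing.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Minimal sketch of length-value (LV) encoding for nullable values.
public class LvFormatSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);

    // Non-null value: length followed by bytes.
    byte[] value = "abc".getBytes("UTF-8");
    out.writeInt(value.length);
    out.write(value);

    // Null value: zero length, no bytes. If the length int is skipped for
    // nulls, the reader consumes unrelated bytes as a length and eventually
    // underflows the buffer.
    out.writeInt(0);

    // Reader side: every value is recovered by reading length, then bytes.
    ByteBuffer in = ByteBuffer.wrap(bos.toByteArray());
    while (in.hasRemaining()) {
      byte[] v = new byte[in.getInt()];
      in.get(v);
      System.out.println("read " + v.length + " bytes");
    }
  }
}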


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/72928691
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/72928691
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/72928691

Branch: refs/heads/master
Commit: 729286919cc68eaf356d52c4ff6c340fd3d96290
Parents: 5cad92f
Author: kumarvishal09 <kumarvishal1802@gmail.com>
Authored: Thu May 3 20:59:18 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon May 7 00:32:17 2018 +0530

----------------------------------------------------------------------
 .../complexType/TestComplexTypeQuery.scala      | 22 ++++++++----
 .../complexType/TestCreateTableWithDouble.scala |  4 +--
 ...estLoadDataWithHiveSyntaxDefaultFormat.scala |  6 ++--
 .../TestLoadDataWithHiveSyntaxUnsafe.scala      |  6 ++--
 .../command/carbonTableSchemaCommon.scala       | 35 ++++++++++++++++----
 .../TestStreamingTableOperation.scala           | 18 +++++-----
 .../TestStreamingTableWithRowParser.scala       | 18 +++++-----
 .../processing/datatypes/ArrayDataType.java     |  7 ++--
 .../processing/datatypes/PrimitiveDataType.java | 19 +++++++----
 .../processing/datatypes/StructDataType.java    |  8 +++--
 .../streaming/CarbonStreamInputFormat.java      | 18 +++++-----
 11 files changed, 102 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
index 1f66f26..bc44df0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
@@ -21,13 +21,23 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 /**
  * Test class of creating and loading for carbon table with double
  *
  */
 class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
 
+  var timestampFormat: String = _
   override def beforeAll: Unit = {
+    timestampFormat = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     sql("drop table if exists complexcarbontable")
     sql("drop table if exists complexhivetable")
     sql("drop table if exists complex_filter")
@@ -43,9 +53,7 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
      "array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
      "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
-      "double,contractNumber double)  STORED BY 'org.apache.carbondata.format'  TBLPROPERTIES " +
-      "('DICTIONARY_INCLUDE'='deviceInformationId', 'DICTIONARY_EXCLUDE'='channelsId'," +
-      "'COLUMN_GROUP'='(ROMSize,ROMName)')")
+      "double,contractNumber double)  STORED BY 'org.apache.carbondata.format'")
     sql("LOAD DATA local inpath '" + resourcesPath +
         "/complextypesample.csv' INTO table complexcarbontable  OPTIONS('DELIMITER'=',', " +
         "'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
@@ -63,13 +71,12 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
         s"complexhivetable")
     sql(
       "create table complex_filter(test1 int, test2 array<String>,test3 array<bigint>,test4 " +
-      "array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>) STORED BY 'org" +
+      "array<int>,test5 array<string>,test6 array<timestamp>,test7 array<string>) STORED BY 'org" +
       ".apache.carbondata.format'")
     sql("LOAD DATA INPATH '" + resourcesPath +
         "/array1.csv'  INTO TABLE complex_filter options ('DELIMITER'=',', 'QUOTECHAR'='\"', " +
         "'COMPLEX_DELIMITER_LEVEL_1'='$', 'FILEHEADER'= 'test1,test2,test3,test4,test5,test6," +
         "test7')")
-      ()
 
     sql(
       "create table structusingarraycarbon (MAC struct<MAC1:array<string>," +
@@ -125,8 +132,7 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
      "ActiveCountry:string, ActiveProvince:string, Activecity:string, ActiveDistrict:string, " +
      "ActiveStreet:string>>, proddate struct<productionDate:string," +
      "activeDeactivedate:array<string>>, gamePointId double,contractNumber double)  STORED BY " +
-      "'org.apache.carbondata.format'  TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId'," +
-      " 'DICTIONARY_EXCLUDE'='channelsId','COLUMN_GROUP'='(ROMSize,ROMName)')");
+      "'org.apache.carbondata.format'");
     sql("LOAD DATA local inpath '" + resourcesPath +
         "/complextypespecialchardelimiter.csv' INTO table complexcarbonwithspecialchardelimeter " +
         "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
@@ -288,5 +294,7 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists structusingarrayhive")
     sql("drop table if exists complex_filter")
     sql("drop table if exists carbon_table")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
index 2bda616..008ec6a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
@@ -46,7 +46,7 @@ class TestCreateTableWithDouble extends QueryTest with BeforeAndAfterAll {
     try {
       sql("CREATE TABLE doubleComplex (Id int, number double, name string, " +
         "gamePoint array<double>, mac struct<num:double>) " +
-        "STORED BY 'org.apache.carbondata.format'")
+        "STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include' = 'gamePoint,mac')")
       sql(s"LOAD DATA LOCAL INPATH '$dataPath' INTO TABLE doubleComplex")
       countNum = sql(s"SELECT COUNT(*) FROM doubleComplex").collect
       doubleField = sql("SELECT number FROM doubleComplex SORT BY Id").collect
@@ -65,7 +65,7 @@ class TestCreateTableWithDouble extends QueryTest with BeforeAndAfterAll {
       sql("CREATE TABLE doubleComplex2 (Id int, number double, name string, " +
         "gamePoint array<double>, mac struct<num:double>) " +
         "STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_INCLUDE'='number')")
+        "TBLPROPERTIES('DICTIONARY_INCLUDE'='number,gamePoint,mac')")
       sql(s"LOAD DATA LOCAL INPATH '$dataPath' INTO TABLE doubleComplex2")
       countNum = sql(s"SELECT COUNT(*) FROM doubleComplex2").collect
       doubleField = sql(s"SELECT number FROM doubleComplex2 SORT BY Id").collect

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
index 1d5b33b..d0d578d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
@@ -566,7 +566,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends QueryTest with BeforeAndAf
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            array<decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -588,7 +588,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends QueryTest with BeforeAndAf
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            struct<a:decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -611,7 +611,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends QueryTest with BeforeAndAf
            (ID decimal, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            array<struct<a:decimal(4,2),str:string>>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
index 599126b..de01092 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
@@ -576,7 +576,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            array<decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -598,7 +598,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            struct<a:decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -621,7 +621,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
            (ID decimal, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, complex
            array<struct<a:decimal(4,2),str:string>>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='complex')
       """
     )
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index bb3b73a..c55d726 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -386,23 +387,34 @@ object TableNewProcessor {
 
 class TableNewProcessor(cm: TableModel) {
 
-  def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
+  def getAllChildren(fieldChildren: Option[List[Field]],
+      useDictionaryEncoding: Boolean): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
     fieldChildren.foreach(fields => {
       fields.foreach(field => {
+        if (!useDictionaryEncoding &&
+            (field.dataType.get.equalsIgnoreCase("double") ||
+             field.dataType.get.equalsIgnoreCase("date") ||
+             field.dataType.get.equalsIgnoreCase("decimal"))) {
+          throw new MalformedCarbonCommandException(s"DICTIONARY_EXCLUDE is unsupported for ${
+            field.dataType.get} data type column: ${ field.column }")
+        }
         val encoders = new java.util.ArrayList[Encoding]()
-        encoders.add(Encoding.DICTIONARY)
+        if (useDictionaryEncoding) {
+          encoders.add(Encoding.DICTIONARY)
+        }
         val columnSchema: ColumnSchema = getColumnSchema(
           DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
           field.name.getOrElse(field.column),
           encoders,
           true,
           field,
-          cm.dataMapRelation)
+          cm.dataMapRelation,
+          useDictionaryEncoding = useDictionaryEncoding)
         allColumns ++= Seq(columnSchema)
         if (field.children.get != null) {
           columnSchema.setNumberOfChild(field.children.get.size)
-          allColumns ++= getAllChildren(field.children)
+          allColumns ++= getAllChildren(field.children, useDictionaryEncoding)
         }
       })
     })
@@ -415,7 +427,8 @@ class TableNewProcessor(cm: TableModel) {
       encoders: java.util.List[Encoding],
       isDimensionCol: Boolean,
       field: Field,
-      map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]]) : ColumnSchema = {
+      map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]],
+      useDictionaryEncoding: Boolean = true) : ColumnSchema = {
     val columnSchema = new ColumnSchema()
     columnSchema.setDataType(dataType)
     columnSchema.setColumnName(colName)
@@ -428,7 +441,8 @@ class TableNewProcessor(cm: TableModel) {
     if (dataType == DataTypes.DATE) {
         encoders.add(Encoding.DIRECT_DICTIONARY)
       }
-    if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
+      if (dataType == DataTypes.TIMESTAMP &&
+          !highCardinalityDims.contains(colName) && useDictionaryEncoding) {
         encoders.add(Encoding.DIRECT_DICTIONARY)
       }
     }
@@ -506,6 +520,9 @@ class TableNewProcessor(cm: TableModel) {
       index = index + 1
     }
 
+    val dictionaryIncludeCols = cm.tableProperties
+      .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
+
     cm.dimCols.foreach { field =>
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
@@ -529,8 +546,12 @@ class TableNewProcessor(cm: TableModel) {
         allColumns :+= columnSchema
         index = index + 1
         if (field.children.isDefined && field.children.get != null) {
+          val includeDictionaryEncoding = dictionaryIncludeCols.contains(field.column)
+          if (!includeDictionaryEncoding) {
+            columnSchema.getEncodingList.remove(Encoding.DICTIONARY)
+          }
           columnSchema.setNumberOfChild(field.children.get.size)
-          allColumns ++= getAllChildren(field.children)
+          allColumns ++= getAllChildren(field.children, includeDictionaryEncoding)
         }
       }
     }
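
Note: a hedged Java sketch of the recursive propagation idea in getAllChildren above
(Field, collect and EncodingPropagation are illustrative stand-ins, not the CarbonData
classes): the parent's dictionary decision, derived from DICTIONARY_INCLUDE, applies to
the whole subtree, and a no-dictionary subtree rejects double/date/decimal children just
as the new validation does.

import java.util.ArrayList;
import java.util.List;

// Stand-in for the schema field tree.
final class Field {
  final String name;
  final String dataType;
  final List<Field> children = new ArrayList<>();
  Field(String name, String dataType) { this.name = name; this.dataType = dataType; }
}

public class EncodingPropagation {
  // Collect child column descriptions, tagging each with the parent's choice.
  static void collect(Field field, boolean useDictionary, List<String> out) {
    // Mirrors the new validation: no-dictionary cannot be combined with
    // double/date/decimal children.
    if (!useDictionary && (field.dataType.equals("double")
        || field.dataType.equals("date") || field.dataType.equals("decimal"))) {
      throw new IllegalArgumentException(
          "DICTIONARY_EXCLUDE is unsupported for " + field.dataType + " column: " + field.name);
    }
    out.add(field.name + (useDictionary ? " [DICTIONARY]" : " [NO_DICTIONARY]"));
    for (Field child : field.children) {
      collect(child, useDictionary, out); // same choice for the whole subtree
    }
  }
}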

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index ae0425d..f46505a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1153,12 +1153,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -1166,7 +1166,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -1174,7 +1174,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -1182,7 +1182,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -1190,7 +1190,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -1198,7 +1198,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -1206,7 +1206,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and register is not null"),
@@ -1214,7 +1214,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 064ff28..a6b0fec 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -590,12 +590,12 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -603,7 +603,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -611,7 +611,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -619,7 +619,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -627,7 +627,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -635,7 +635,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -643,7 +643,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

     checkAnswer(
@@ -652,7 +652,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index fb198ea..d7d59ce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -236,7 +236,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
     columnsArray.get(this.outputArrayIndex).add(b.array());
 
     if (children instanceof PrimitiveDataType) {
-      ((PrimitiveDataType) children).setKeySize(inputArray.getInt());
+      PrimitiveDataType child = ((PrimitiveDataType) children);
+      if (child.getIsColumnDictionary()) {
+        child.setKeySize(inputArray.getInt());
+      }
     }
     for (int i = 0; i < dataLength; i++) {
       children.getColumnarDataForComplexType(columnsArray, inputArray);
@@ -284,4 +287,4 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   public GenericDataType<ArrayObject> deepCopy() {
     return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index fa60bf6..dee8968 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -336,8 +336,10 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   private void updateNullValue(DataOutputStream dataOutputStream) throws IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+      dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
+      dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
   }
@@ -393,12 +395,17 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   /*
    * split column and return metadata and primitive column
    */
-  @Override
-  public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
+  @Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
       ByteBuffer inputArray) {
-    byte[] key = new byte[keySize];
-    inputArray.get(key);
-    columnsArray.get(outputArrayIndex).add(key);
+    if (!isDictionary) {
+      byte[] key = new byte[inputArray.getInt()];
+      inputArray.get(key);
+      columnsArray.get(outputArrayIndex).add(key);
+    } else {
+      byte[] key = new byte[keySize];
+      inputArray.get(key);
+      columnsArray.get(outputArrayIndex).add(key);
+    }
     dataCounter++;
   }
 
@@ -459,4 +466,4 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
     return dataType;
   }
-}
+}
\ No newline at end of file
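
Note: the getColumnarDataForComplexType change above distinguishes two layouts:
dictionary-encoded values are fixed-width surrogate keys of keySize bytes, while
no-dictionary values are length-prefixed (LV). A simplified, hedged sketch of that
branch (ComplexValueReader is an illustrative name, not part of CarbonData; keySize
is fixed at 4 for the demo):

import java.nio.ByteBuffer;

// Simplified sketch of the two read modes in getColumnarDataForComplexType.
public final class ComplexValueReader {
  // Dictionary values are fixed-width surrogate keys; keySize is known up front.
  static byte[] readDictionaryValue(ByteBuffer in, int keySize) {
    byte[] key = new byte[keySize];
    in.get(key);
    return key;
  }

  // No-dictionary values are stored in LV format: read the length, then the
  // bytes. A zero length (a null written by updateNullValue) yields an empty
  // array instead of underflowing the buffer.
  static byte[] readNoDictionaryValue(ByteBuffer in) {
    byte[] value = new byte[in.getInt()];
    in.get(value);
    return value;
  }

  public static void main(String[] args) {
    ByteBuffer in = ByteBuffer.allocate(16)
        .putInt(0)                            // LV-encoded null
        .putInt(3).put(new byte[]{1, 2, 3})   // LV-encoded 3-byte value
        .put(new byte[]{0, 0, 0, 7});         // fixed-width dictionary key
    in.flip();
    System.out.println(readNoDictionaryValue(in).length);  // 0 (the null)
    System.out.println(readNoDictionaryValue(in).length);  // 3
    System.out.println(readDictionaryValue(in, 4).length); // 4
  }
}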

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 36899a9..4fe6255 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -154,7 +154,6 @@ public class StructDataType implements GenericDataType<StructObject> {
       throws IOException, DictionaryGenerationException {
     dataOutputStream.writeInt(children.size());
     if (input == null) {
-      dataOutputStream.writeInt(children.size());
       for (int i = 0; i < children.size(); i++) {
         children.get(i).writeByteArray(null, dataOutputStream);
       }
@@ -267,7 +266,10 @@ public class StructDataType implements GenericDataType<StructObject> {
 
     for (int i = 0; i < childElement; i++) {
       if (children.get(i) instanceof PrimitiveDataType) {
-        ((PrimitiveDataType) children.get(i)).setKeySize(inputArray.getInt());
+        PrimitiveDataType child = ((PrimitiveDataType) children.get(i));
+        if (child.getIsColumnDictionary()) {
+          child.setKeySize(inputArray.getInt());
+        }
       }
       children.get(i).getColumnarDataForComplexType(columnsArray, inputArray);
     }
@@ -324,4 +326,4 @@ public class StructDataType implements GenericDataType<StructObject> {
     }
     return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index 266fabd..644ac4c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -97,16 +97,18 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
             CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
         boolean isDictionary =
             CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
-
-        String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
-            .get(CarbonCommonConstants.DICTIONARY_PATH);
-        DictionaryColumnUniqueIdentifier dictionarIdentifier =
-            new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
-                child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
-
+        Dictionary dictionary = null;
+        if (isDictionary) {
+          String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
+              .get(CarbonCommonConstants.DICTIONARY_PATH);
+          DictionaryColumnUniqueIdentifier dictionarIdentifier =
+              new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+                  child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
+          dictionary = cache.get(dictionarIdentifier);
+        }
         queryType =
             new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
-                child.getDataType(), 4, cache.get(dictionarIdentifier),
+                child.getDataType(), 4, dictionary,
                 isDirectDictionary);
       }
       parentQueryType.addChildren(queryType);
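
Note: the streaming-reader change applies the same default: the dictionary cache is
consulted only when the child column actually carries Encoding.DICTIONARY, and null
is passed to PrimitiveQueryType otherwise. A minimal sketch of that guard pattern
(DictionaryCache, lookup and GuardedDictionaryLookup are hypothetical names, not the
CarbonData API):

// Resolve the dictionary only when the column is dictionary-encoded;
// otherwise hand null through.
interface DictionaryCache {
  Object lookup(String columnId);
}

public class GuardedDictionaryLookup {
  static Object dictionaryFor(boolean isDictionary, String columnId, DictionaryCache cache) {
    Object dictionary = null;
    if (isDictionary) {
      // Before this patch the lookup ran unconditionally; for the new
      // no-dictionary complex columns there is no dictionary to find.
      dictionary = cache.lookup(columnId);
    }
    return dictionary;
  }
}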

