carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: complex data type supported with spark 2.1
Date Tue, 21 Feb 2017 06:24:01 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 1d925a656 -> b400b82ce


complex data type supported with spark 2.1

complex datatype supported with spark 1.6

imports optimized

imports optimized

style error resolved

CarbonDictionaryDecoder changed to support complex data type

scalastyle error resolved

complex datatype check added


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3adb62e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3adb62e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3adb62e1

Branch: refs/heads/master
Commit: 3adb62e1fb1234f115ebdc1091df691f62740577
Parents: 1d925a6
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Thu Feb 9 12:40:21 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Feb 21 11:52:45 2017 +0530

----------------------------------------------------------------------
 examples/spark2/src/main/resources/data.csv     | 20 +++++-----
 .../examples/CarbonSessionExample.scala         |  5 ++-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 12 ++++--
 .../spark/sql/CarbonDictionaryDecoder.scala     | 40 +++++++++++---------
 4 files changed, 43 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3adb62e1/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index a9cdf9e..da474a0 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,10 +1,10 @@
-1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa,2.5
-5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb,2.5
-1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc,2.5
-1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd,2.5
-1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee,3.5
-3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff,2.5
-2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg,2.5
-1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh,2.5
-4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii,2.5
-1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj,2.5
+1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa,2.5,'foo'#'bar'#'world'
+5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb,2.5,'foo'#'bar'#'world'
+1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc,2.5,'foo'#'bar'#'world'
+1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd,2.5,'foo'#'bar'#'world'
+1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee,3.5,'foo'#'bar'#'world'
+3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff,2.5,'foo'#'bar'#'world'
+2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg,2.5,'foo'#'bar'#'world'
+1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh,2.5,'foo'#'bar'#'world'
+4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii,2.5,'foo'#'bar'#'world'
+1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj,2.5,'foo'#'bar'#'world'

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3adb62e1/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 1d485cd..1c3b7f0 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -63,7 +63,8 @@ object CarbonSessionExample {
          |    decimalField decimal(18,2),
          |    dateField date,
          |    charField char(5),
-         |    floatField float
+         |    floatField float,
+         |    complexData array<string>
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
@@ -76,7 +77,7 @@ object CarbonSessionExample {
       s"""
          | LOAD DATA LOCAL INPATH '$path'
          | INTO TABLE carbon_table
-         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField')
+         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData','COMPLEX_DELIMITER_LEVEL_1'='#')
        """.stripMargin)
     // scalastyle:on
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3adb62e1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 4204eca..470ca1f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -54,13 +54,14 @@ case class CarbonDictionaryDecoder(
     child.output.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
           .getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
           val newAttr = AttributeReference(a.name,
             convertCarbonToSparkDataType(carbonDimension,
               relation.get.carbonRelation.carbonRelation),
@@ -123,13 +124,14 @@ case class CarbonDictionaryDecoder(
     val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
             carbonDimension.getDataType)
         } else {
@@ -182,6 +184,7 @@ case class CarbonDictionaryDecoder(
             val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
             var flag = true
             var total = 0L
+
             override final def hasNext: Boolean = {
               flag = iter.hasNext
               if (!flag && total > 0) {
@@ -193,6 +196,7 @@ case class CarbonDictionaryDecoder(
               }
               flag
             }
+
             override final def next(): InternalRow = {
               val startTime = System.currentTimeMillis()
               val row: InternalRow = iter.next()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3adb62e1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 904edb8..0c80a36 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -52,13 +52,14 @@ case class CarbonDictionaryDecoder(
     child.output.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
           .getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
           val newAttr = AttributeReference(a.name,
             convertCarbonToSparkDataType(carbonDimension,
               relation.get.carbonRelation.carbonRelation),
@@ -124,13 +125,14 @@ case class CarbonDictionaryDecoder(
     val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
             carbonDimension.getDataType)
         } else {
@@ -175,7 +177,9 @@ case class CarbonDictionaryDecoder(
             val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
             var flag = true
             var total = 0L
+
             override final def hasNext: Boolean = iter.hasNext
+
             override final def next(): InternalRow = {
               val startTime = System.currentTimeMillis()
               val row: InternalRow = iter.next()
@@ -227,15 +231,13 @@ case class CarbonDictionaryDecoder(
 }
 
 
-
-
 class CarbonDecoderRDD(
     relations: Seq[CarbonDecoderRelation],
     profile: CarbonProfile,
     aliasMap: CarbonAliasDecoderRelation,
     prev: RDD[InternalRow],
     output: Seq[Attribute])
-    extends RDD[InternalRow](prev) {
+  extends RDD[InternalRow](prev) {
 
   private val storepath = CarbonEnv.get.carbonMetastore.storePath
 
@@ -243,16 +245,16 @@ class CarbonDecoderRDD(
     profile match {
       case ip: IncludeProfile if ip.attributes.nonEmpty =>
         ip.attributes
-            .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
       case ep: ExcludeProfile =>
         !ep.attributes
-            .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
       case _ => true
     }
   }
 
   def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
-                                   relation: CarbonRelation): types.DataType = {
+      relation: CarbonRelation): types.DataType = {
     carbonDimension.getDataType match {
       case DataType.STRING => StringType
       case DataType.SHORT => ShortType
@@ -272,10 +274,10 @@ class CarbonDecoderRDD(
       case DataType.DATE => DateType
       case DataType.STRUCT =>
         CarbonMetastoreTypes
-            .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
       case DataType.ARRAY =>
         CarbonMetastoreTypes
-            .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
   }
 
@@ -283,15 +285,16 @@ class CarbonDecoderRDD(
     val dictIds: Array[(String, ColumnIdentifier, DataType)] = output.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-              carbonDimension.getDataType)
+            carbonDimension.getDataType)
         } else {
           (null, null, null)
         }
@@ -330,15 +333,16 @@ class CarbonDecoderRDD(
       var flag = true
       var total = 0L
       val dataTypes = output.map { attr => attr.dataType }
+
       override final def hasNext: Boolean = iter.hasNext
 
       override final def next(): InternalRow = {
         val row: InternalRow = iter.next()
         val data = row.toSeq(dataTypes).toArray
         dictIndex.foreach { index =>
-          if ( data(index) != null) {
+          if (data(index) != null) {
             data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+              .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
               getDictionaryColumnIds(index)._3)
           }
         }
@@ -348,7 +352,7 @@ class CarbonDecoderRDD(
   }
 
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
-                            cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
       if (f._2 != null) {
         try {


Mime
View raw message