carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3495] Fix Insert into Complex data type of Binary failure with Carbon & SparkFileFormat
Date Wed, 11 Sep 2019 05:01:08 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1345154  [CARBONDATA-3495] Fix Insert into Complex data type of Binary failure with
Carbon & SparkFileFormat
1345154 is described below

commit 13451547a7e5c6169d8ce375b152013a4f6b45bb
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Thu Aug 22 16:01:49 2019 +0530

    [CARBONDATA-3495] Fix Insert into Complex data type of Binary
    failure with Carbon & SparkFileFormat
    
    Problem:
    Insert into Complex data type(Array/Struct/Map) of binary data
    type fails with Invalid data type name, because Binary with
    complex data types is not handled
    
    Solution:
    Handle Binary data type to work with complex data types
    
    This closes #3361
---
 .../core/datastore/page/ComplexColumnPage.java     |   1 +
 .../apache/carbondata/core/util/DataTypeUtil.java  |   3 +
 .../src/test/resources/complexbinary.csv           |   3 +
 .../complexType/TestComplexDataType.scala          | 114 +++++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   2 +
 .../SparkCarbonDataSourceBinaryTest.scala          |  88 ++++++++++++++++
 .../processing/datatypes/PrimitiveDataType.java    |   3 +
 .../org/apache/carbondata/sdk/file/ImageTest.java  |  41 ++++++++
 8 files changed, 255 insertions(+)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 921ae50..c4f8849 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -124,6 +124,7 @@ public class ComplexColumnPage {
             DataTypes.isMapType(dataType) ||
             (dataType == DataTypes.STRING) ||
             (dataType == DataTypes.VARCHAR) ||
+            (dataType == DataTypes.BINARY) ||
             (dataType == DataTypes.DATE) ||
             DataTypes.isDecimal(dataType))))) {
       // For all these above condition the ColumnPage should be Taken as BYTE_ARRAY
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 9aea579..adb63cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -530,6 +530,7 @@ public final class DataTypeUtil {
   public static boolean isFixedSizeDataType(DataType dataType) {
     if (dataType == DataTypes.STRING ||
         dataType == DataTypes.VARCHAR ||
+        dataType == DataTypes.BINARY ||
         DataTypes.isDecimal(dataType)) {
       return false;
     } else {
@@ -1019,6 +1020,8 @@ public final class DataTypeUtil {
       return DataTypes.BYTE_ARRAY;
     } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(name)) {
       return DataTypes.BYTE_ARRAY;
+    } else if (DataTypes.BINARY.getName().equalsIgnoreCase(name)) {
+      return DataTypes.BINARY;
     } else if (name.equalsIgnoreCase("decimal")) {
       return DataTypes.createDefaultDecimalType();
     } else if (name.equalsIgnoreCase("array")) {
diff --git a/integration/spark-common-test/src/test/resources/complexbinary.csv b/integration/spark-common-test/src/test/resources/complexbinary.csv
new file mode 100644
index 0000000..3870f5f
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/complexbinary.csv
@@ -0,0 +1,3 @@
+1,true,abc,binary1$binary2,binary1,1&binary1
+2,false,abcd,binary11$binary12,binary11,1&binary2
+3,true,abcde,binary13$binary13,binary13,1&binary3
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index b5f77c2..9d6b4d1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -1013,4 +1013,118 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll
{
     checkAnswer(sql("select id,name,structField.intval,name,structField.stringval from table1"),Seq(Row(null,"aaa",23,"aaa","bb")))
   }
 
+  test("test array of binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    sql("create table if not exists hive_table(id int, label boolean, name string," +
+        "binaryField array<binary>, autoLabel boolean) row format delimited fields
terminated by ','")
+    sql("insert into hive_table values(1,true,'abc',array('binary'),false)")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'")
+    sql("insert into carbon_table values(1,true,'abc',array('binary'),false)")
+    checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"),
+      sql("SELECT binaryField[0] FROM hive_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  test("test struct of binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    sql("create table if not exists parquet_table(id int, label boolean, name string," +
+        "binaryField struct<b:binary>, autoLabel boolean) using parquet")
+    sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField struct<b:binary>, autoLabel boolean) stored by 'carbondata'")
+    sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)")
+    sql("SELECT binaryField.b FROM carbon_table").show(false)
+    checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
+      sql("SELECT binaryField.b FROM parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  test("test map of binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    sql("create table if not exists hive_table(id int, label boolean, name string," +
+        "binaryField map<int, binary>, autoLabel boolean) row format delimited fields
terminated by ','")
+    sql("insert into hive_table values(1,true,'abc',map(1,'binary'),false)")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField map<int, binary>, autoLabel boolean) stored by 'carbondata'")
+    sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)")
+    checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"),
+      sql("SELECT binaryField[1] FROM hive_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  test("test map of array and struct binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    sql("create table if not exists parquet_table(id int, label boolean, name string," +
+        "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>>
) " +
+        "using parquet")
+    sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1," +
+        "named_struct('b','binary')))")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>>
) " +
+        "stored by 'carbondata'")
+    sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1," +
+        "named_struct('b','binary')))")
+    checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"),
+      sql("SELECT binaryField1[1][1] FROM parquet_table"))
+    checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
+      sql("SELECT binaryField2[1].b FROM parquet_table"))
+    sql("drop table if exists hive_table")
+    sql("drop table if exists carbon_table")
+  }
+
+  test("test of array of struct and struct of array of binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    sql("create table if not exists hive_table(id int, label boolean, name string," +
+        "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>>
) " +
+        "row format delimited fields terminated by ','")
+    sql("insert into hive_table values(1,true,'abc',array(named_struct('b1','binary')),"
+
+        "named_struct('b2',array('binary')))")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>>
) " +
+        "stored by 'carbondata'")
+    sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary')),"
+
+        "named_struct('b2',array('binary')))")
+    checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"),
+      sql("SELECT  binaryField1[1].b1 FROM hive_table"))
+    checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"),
+      sql("SELECT binaryField2.b2[0] FROM hive_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  test("test dataload to complex of binary type column using load ddl ") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    sql("create table if not exists hive_table(id int, label boolean, name string," +
+        "binaryField1 array<binary>, binaryField2 struct<b2:binary>, binaryField3
map<int," +
+        "binary>) row format delimited fields terminated by ','")
+    sql(
+      "insert into hive_table values(1,true,'abc',array('binary1','binary2'), named_struct('b2',"
+
+      "'binary1'), map(1,'binary1'))")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField1 array<binary>, binaryField2 struct<b2:binary>, binaryField3
map<int,binary>) " +
+        "stored by 'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath + "/complexbinary.csv' into table carbon_table
options" +
+      "('delimiter'=',',  'quotechar'='\\','fileheader'='id,label,name,binaryField1,binaryField2,"
+
+      "binaryField3','complex_delimiter_level_1'='$', 'complex_delimiter_level_2'='&')")
+    checkAnswer(sql("SELECT binaryField1[0] FROM carbon_table where id=1"),
+      sql("SELECT  binaryField1[0] FROM hive_table where id=1"))
+    checkAnswer(sql("SELECT binaryField2.b2 FROM carbon_table where id=1"),
+      sql("SELECT  binaryField2.b2 FROM hive_table where id=1"))
+    checkAnswer(sql("SELECT binaryField3[1] FROM carbon_table where id=1"),
+      sql("SELECT binaryField3[1] FROM hive_table where id=1"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3d3b89d..c331532 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1511,6 +1511,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
     field.dataType.getOrElse("NIL") match {
       case "String" => Field(parentName + "." + field.column, Some("String"),
         Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "binary" => Field(parentName + "." + field.column, Some("Binary"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
       case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
         Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
       case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
index 9ebf54f..9a3a9f0 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.sdk.util.BinaryUtil
@@ -24,6 +25,7 @@ import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.util.SparkUtil
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
@@ -635,4 +637,90 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll
{
         assert(exception.getMessage.contains("Operation not allowed: DELETE FROM"))
     }
 
+    test("test array of binary data type with sparkfileformat ") {
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+        sql("create table if not exists carbon_table(id int, label boolean, name string,"
+
+            "binaryField array<binary>, autoLabel boolean) using carbon")
+        sql("insert into carbon_table values(1,true,'abc',array('binary'),false)")
+        sql("create table if not exists parquet_table(id int, label boolean, name string,"
+
+            "binaryField array<binary>, autoLabel boolean) using parquet")
+        sql("insert into parquet_table values(1,true,'abc',array('binary'),false)")
+        checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"),
+            sql("SELECT binaryField[0] FROM parquet_table"))
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+    }
+
+    test("test struct of binary data type with sparkfileformat ") {
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+        sql("create table if not exists carbon_table(id int, label boolean, name string,"
+
+            "binaryField struct<b:binary>, autoLabel boolean) using carbon")
+        sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)")
+        sql("create table if not exists parquet_table(id int, label boolean, name string,"
+
+            "binaryField struct<b:binary>, autoLabel boolean) using parquet")
+        sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)")
+        checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
+            sql("SELECT binaryField.b FROM parquet_table"))
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+    }
+
+    test("test map of binary data type with sparkfileformat") {
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+        sql("create table if not exists parquet_table(id int, label boolean, name string,"
+
+            "binaryField map<int, binary>, autoLabel boolean) using parquet")
+        sql("insert into parquet_table values(1,true,'abc',map(1,'binary'),false)")
+        sql("create table if not exists carbon_table(id int, label boolean, name string,"
+
+            "binaryField map<int, binary>, autoLabel boolean) using carbon")
+        sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)")
+        checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"),
+            sql("SELECT binaryField[1] FROM parquet_table"))
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+    }
+
+    test("test map of array and struct binary data type with sparkfileformat") {
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+        sql("create table if not exists parquet_table(id int, label boolean, name string,"
+
+            "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>>
) " +
+            "using parquet")
+        sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1,"
+
+            "named_struct('b','binary')))")
+        sql("create table if not exists carbon_table(id int, label boolean, name string,"
+
+            "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>>
) " +
+            "using carbon")
+        sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1,"
+
+            "named_struct('b','binary')))")
+        checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"),
+            sql("SELECT binaryField1[1][1] FROM parquet_table"))
+        checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
+            sql("SELECT binaryField2[1].b FROM parquet_table"))
+        sql("drop table if exists carbon_table")
+    }
+
+    test("test of array of struct and struct of array of binary data type with sparkfileformat")
{
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+        sql("create table if not exists parquet_table(id int, label boolean, name string,"
+
+            "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>>
) " +
+            "using parquet")
+        sql("insert into parquet_table values(1,true,'abc',array(named_struct('b1','binary')),"
+
+            "named_struct('b2',array('binary')))")
+        sql("create table if not exists carbon_table(id int, label boolean, name string,"
+
+            "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>>
) " +
+            "using carbon")
+        sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary')),"
+
+            "named_struct('b2',array('binary')))")
+        checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"),
+            sql("SELECT  binaryField1[1].b1 FROM parquet_table"))
+        checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"),
+            sql("SELECT binaryField2.b2[0] FROM parquet_table"))
+        sql("drop table if exists carbon_table")
+        sql("drop table if exists parquet_table")
+    }
+
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 200a9f6..9504974 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -378,6 +378,9 @@ public class PrimitiveDataType implements GenericDataType<Object>
{
                 } else {
                   value = ByteUtil.toXorBytes(Long.parseLong(parsedValue));
                 }
+              } else if (this.carbonDimension.getDataType().equals(DataTypes.BINARY)) {
+                value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
+                    this.carbonDimension.getDataType());
               } else {
                 value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
                     this.carbonDimension.getDataType(), dateFormat);
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 30a881b..6f90155 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -22,6 +22,7 @@ import junit.framework.TestCase;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
@@ -1127,4 +1128,44 @@ public class ImageTest extends TestCase {
     }
   }
 
+  @Test public void testBinaryWithComplexType()
+      throws IOException, InvalidLoadOptionException, InterruptedException {
+    int num = 1;
+    int rows = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[4];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("arrayField", DataTypes.createArrayType(DataTypes.BINARY));
+    ArrayList<StructField> structFields = new ArrayList<>();
+    structFields.add(new StructField("b", DataTypes.BINARY));
+    fields[3] = new Field("structField", DataTypes.createStructType(structFields));
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+          .writtenBy("BinaryExample").withPageSizeInMb(1).build();
+
+      for (int i = 0; i < rows; i++) {
+        // write data
+        writer.write(new String[] { "robot" + (i % 10), String.valueOf(i), "binary1", "binary2"
});
+      }
+      writer.close();
+    }
+    CarbonReader reader = CarbonReader.builder(path, "_temp").build();
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Object[] arrayResult = (Object[]) row[1];
+      Object[] structResult = (Object[]) row[2];
+      assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase("binary1"));
+      assert (new String((byte[]) structResult[0]).equalsIgnoreCase("binary2"));
+    }
+    reader.close();
+  }
+
 }


Mime
View raw message