carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [21/50] [abbrv] incubator-carbondata git commit: CARBONDATA-5 data mismatch between the carbon Table and Hive Table for columns having \N for non numeric data type (#802)
Date Wed, 20 Jul 2016 10:13:49 GMT
CARBONDATA-5 data mismatch between the carbon Table and Hive Table for columns having \N for
non numeric data type (#802)

LGTM

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a45dc4f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a45dc4f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a45dc4f7

Branch: refs/heads/master
Commit: a45dc4f7bc9d67b29ee18c6ee7d369b338565e21
Parents: a044680
Author: Mohammad Shahid Khan <mohdshahidkhan1987@gmail.com>
Authored: Sun Jul 17 00:44:23 2016 +0530
Committer: sujith71955 <sujithchacko.2010@gmail.com>
Committed: Sun Jul 17 00:44:23 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoadModel.java  |  23 ++++
 .../carbondata/spark/load/CarbonLoaderUtil.java |   1 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   4 +-
 .../execution/command/carbonTableSchema.scala   |   3 +
 .../test/resources/nullvalueserialization.csv   |   2 +
 .../TestNullValueSerialization.scala            | 112 +++++++++++++++++++
 .../processing/api/dataloader/SchemaInfo.java   |  20 ++++
 .../graphgenerator/GraphGenerator.java          |  14 +++
 .../configuration/GraphConfigurationInfo.java   |  23 ++++
 .../processing/schema/metadata/TableOption.java |  82 ++++++++++++++
 .../schema/metadata/TableOptionWrapper.java     | 106 ++++++++++++++++++
 .../csvbased/CarbonCSVBasedSeqGenMeta.java      |  39 ++++++-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  14 ++-
 13 files changed, 435 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
index 2ae2a06..098c863 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
@@ -95,6 +95,11 @@ public class CarbonLoadModel implements Serializable {
   private String escapeChar;
 
   /**
+   * defines the string that should be treated as null while loadind data
+   */
+  private String serializationNullFormat;
+
+  /**
    * get escape char
    * @return
    */
@@ -301,6 +306,7 @@ public class CarbonLoadModel implements Serializable {
     copy.taskNo = taskNo;
     copy.factTimeStamp = factTimeStamp;
     copy.segmentId = segmentId;
+    copy.serializationNullFormat = serializationNullFormat;
     copy.escapeChar = escapeChar;
     return copy;
   }
@@ -338,6 +344,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.taskNo = taskNo;
     copyObj.factTimeStamp = factTimeStamp;
     copyObj.segmentId = segmentId;
+    copyObj.serializationNullFormat = serializationNullFormat;
     copyObj.escapeChar = escapeChar;
     return copyObj;
   }
@@ -491,4 +498,20 @@ public class CarbonLoadModel implements Serializable {
   public void setSegmentId(String segmentId) {
     this.segmentId = segmentId;
   }
+
+  /**
+   * Returns the serialization null format configured for this load, i.e. the setting that
+   * decides which input value is loaded as null.
+   *
+   * @return the configured null-format setting, or null when none has been set
+   */
+  public String getSerializationNullFormat() {
+    return this.serializationNullFormat;
+  }
+
+  /**
+   * Stores the serialization null format for this load, i.e. the setting that decides which
+   * input value is loaded as null.
+   *
+   * @param nullFormat the null-format setting to carry through the load
+   */
+  public void setSerializationNullFormat(String nullFormat) {
+    serializationNullFormat = nullFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index a9378d2..8bb8598 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -169,6 +169,7 @@ public final class CarbonLoaderUtil {
     info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
     info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
     info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
+    info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
 
     generateGraph(schmaModel, info, currentRestructNumber, loadModel, outPutLoc);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index f2e5283..2533321 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -81,6 +81,7 @@ class CarbonSqlParser()
   protected val FACT = Keyword("FACT")
   protected val FIELDS = Keyword("FIELDS")
   protected val FILEHEADER = Keyword("FILEHEADER")
+  protected val SERIALIZATION_NULL_FORMAT = Keyword("SERIALIZATION_NULL_FORMAT")
   protected val FILES = Keyword("FILES")
   protected val FROM = Keyword("FROM")
   protected val HIERARCHIES = Keyword("HIERARCHIES")
@@ -951,7 +952,8 @@ class CarbonSqlParser()
     // validate with all supported options
     val options = partionDataOptions.get.groupBy(x => x._1)
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
-      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT"
+      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
+      "SERIALIZATION_NULL_FORMAT"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 040e3b6..3da0ff1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1531,6 +1531,7 @@ private[sql] case class LoadCube(
       val fileHeader = partionValues.getOrElse("fileheader", "")
       val escapeChar = partionValues.getOrElse("escapechar", "\\")
       val columnDict = partionValues.getOrElse("columndict", null)
+      val serializationNullFormat = partionValues.getOrElse("serialization_null_format",
"\\N")
       val complex_delimiter_level_1 = partionValues.getOrElse("complex_delimiter_level_1",
"\\$")
       val complex_delimiter_level_2 = partionValues.getOrElse("complex_delimiter_level_2",
"\\:")
       val multiLine = partionValues.getOrElse("multiline", "false").trim.toLowerCase match
{
@@ -1543,6 +1544,8 @@ private[sql] case class LoadCube(
       }
 
       carbonLoadModel.setEscapeChar(escapeChar)
+      carbonLoadModel.setSerializationNullFormat("serialization_null_format" + "," +
+        serializationNullFormat)
       if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
           complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
           delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/test/resources/nullvalueserialization.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/nullvalueserialization.csv b/integration/spark/src/test/resources/nullvalueserialization.csv
new file mode 100644
index 0000000..9d1dba8
--- /dev/null
+++ b/integration/spark/src/test/resources/nullvalueserialization.csv
@@ -0,0 +1,2 @@
+1,2015-17-23 00:00:00,china,aaa1,phone197,ASD69643,15000.43525
+2,\N,\N,\N,\N,\N,\N

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
new file mode 100644
index 0000000..d68903d
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.spark.testsuite.nullvalueserialization
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.scalatest.BeforeAndAfterAll
+
+/**
+  * Compares Carbon and Hive query results for a CSV whose second row holds \N (the default
+  * serialization null format) in every column except ID, including the non numeric columns.
+  */
+class TestNullValueSerialization extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+      )
+    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+      .getCanonicalPath
+    val csvFilePath = currentDirectory + "/src/test/resources/nullvalueserialization.csv"
+    // each SQL string is kept on complete lines so no literal is split across a line break
+    sql(
+      "CREATE TABLE IF NOT EXISTS carbonTable (ID String, date Timestamp, country String, " +
+        "name String, phonetype String, serialname String, salary Decimal(17,2)) " +
+        "STORED BY 'org.apache.carbondata.format'"
+    )
+    sql(
+      "create table if not exists hiveTable(ID String, date Timestamp, country String, " +
+        "name String, phonetype String, serialname String, salary Decimal(17,2)) " +
+        "row format delimited fields terminated by ','"
+    )
+    sql(
+      "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table carbonTable OPTIONS " +
+        "('FILEHEADER'='ID,date,country,name,phonetype,serialname,salary')"
+    )
+    sql(
+      "LOAD DATA local inpath '" + csvFilePath + "' INTO table hiveTable"
+    )
+  }
+
+  test("test detail query on column having null values") {
+    checkAnswer(
+      sql("select * from carbonTable"),
+      sql("select * from hiveTable")
+    )
+  }
+
+  test("test filter query on column is null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary is null"),
+      sql("select * from hiveTable where salary is null")
+    )
+  }
+
+  test("test filter query on column is not null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary is not null"),
+      sql("select * from hiveTable where salary is not null")
+    )
+  }
+
+  test("test filter query on columnValue=null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary=null"),
+      sql("select * from hiveTable where salary=null")
+    )
+  }
+
+  test("test filter query where date is null") {
+    checkAnswer(
+      sql("select * from carbonTable where date is null"),
+      sql("select * from hiveTable where date is null")
+    )
+  }
+
+  override def afterAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
b/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
index 3f6b8a0..c52e3a0 100644
--- a/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
@@ -59,6 +59,10 @@ public class SchemaInfo {
   private String complexDelimiterLevel1;
 
   private String complexDelimiterLevel2;
+  /**
+   * the value to be treated as null while data load
+   */
+  private String serializationNullFormat;
 
   public String getComplexDelimiterLevel1() {
     return complexDelimiterLevel1;
@@ -168,4 +172,20 @@ public class SchemaInfo {
     this.schemaName = schemaName;
   }
 
+  /**
+   * Returns the serialization null format handed to the data load, i.e. the setting that
+   * decides which input value is loaded as null.
+   *
+   * @return the null-format setting, or null when none was supplied
+   */
+  public String getSerializationNullFormat() {
+    return this.serializationNullFormat;
+  }
+
+  /**
+   * Sets the serialization null format handed to the data load.
+   *
+   * @param nullFormat the null-format setting
+   */
+  public void setSerializationNullFormat(String nullFormat) {
+    serializationNullFormat = nullFormat;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index 92c00b5..ef846fe 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -46,6 +46,7 @@ import org.carbondata.processing.csvreaderstep.CsvInputMeta;
 import org.carbondata.processing.graphgenerator.configuration.GraphConfigurationInfo;
 import org.carbondata.processing.mdkeygen.MDKeyGenStepMeta;
 import org.carbondata.processing.merger.step.CarbonSliceMergerStepMeta;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
 import org.carbondata.processing.sortandgroupby.sortdatastep.SortKeyStepMeta;
 import org.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedSeqGenMeta;
 import org.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -567,6 +568,7 @@ public class GraphGenerator {
     seqMeta.setNormHiers(graphConfiguration.getNormHiers());
     seqMeta.setHeirKeySize(graphConfiguration.getHeirAndKeySizeString());
     seqMeta.setColumnSchemaDetails(graphConfiguration.getColumnSchemaDetails().toString());
+    seqMeta.setTableOption(graphConfiguration.getTableOptionWrapper().toString());
     String[] aggType = graphConfiguration.getAggType();
     StringBuilder builder = new StringBuilder();
     for (int i = 0; i < aggType.length; i++) {
@@ -797,6 +799,7 @@ public class GraphGenerator {
     prepareNoDictionaryMapping(dimensions, graphConfiguration);
     graphConfiguration
         .setColumnSchemaDetails(CarbonSchemaParser.getColumnSchemaDetails(dimensions));
+    graphConfiguration.setTableOptionWrapper(getTableOptionWrapper());
     String factTableName = carbonDataLoadSchema.getCarbonTable().getFactTableName();
     graphConfiguration.setTableName(factTableName);
     StringBuilder dimString = new StringBuilder();
@@ -901,6 +904,17 @@ public class GraphGenerator {
     return graphConfiguration;
   }
 
+  /**
+   * Registers the serialization null format taken from {@code schemaInfo} as a table option
+   * and returns the shared table option wrapper.
+   *
+   * @return the single TableOptionWrapper instance
+   */
+  private TableOptionWrapper getTableOptionWrapper() {
+    // setTableOption is a static method that writes into the wrapper's shared option map,
+    // so call it on the class rather than through an instance
+    TableOptionWrapper.setTableOption(schemaInfo.getSerializationNullFormat());
+    return TableOptionWrapper.getTableOptionWrapperInstance();
+  }
+
   private String getQuoteType(SchemaInfo schemaInfo) throws GraphGeneratorException {
     String driverClass = schemaInfo.getSrcDriverName();
     String type = DRIVERS.get(driverClass);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
index 1b2afa5..c4406ab 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
@@ -22,6 +22,7 @@ package org.carbondata.processing.graphgenerator.configuration;
 import java.util.Map;
 
 import org.carbondata.processing.schema.metadata.ColumnSchemaDetailsWrapper;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
 
 public class GraphConfigurationInfo {
   private String connectionName;
@@ -200,6 +201,11 @@ public class GraphConfigurationInfo {
   private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
 
   /**
+   * wrapper object holding the table options details needed while dataload
+   */
+  private TableOptionWrapper tableOptionWrapper;
+
+  /**
    * It is column groups in below format
    * 0,1~2~3,4,5,6~7~8,9
    * groups are
@@ -998,6 +1004,23 @@ public class GraphConfigurationInfo {
     return columnSchemaDetailsWrapper;
   }
 
+  /**
+   * Sets the wrapper holding the table options needed during data load.
+   *
+   * @param optionWrapper table option wrapper to attach to this graph configuration
+   */
+  public void setTableOptionWrapper(TableOptionWrapper optionWrapper) {
+    tableOptionWrapper = optionWrapper;
+  }
+
+  /**
+   * Returns the wrapper holding the table options needed during data load.
+   *
+   * @return the table option wrapper, or null if it has not been set
+   */
+  public TableOptionWrapper getTableOptionWrapper() {
+    return this.tableOptionWrapper;
+  }
+
   public void setColumnGroupsString(String columnGroups) {
     this.columnGroupsString = columnGroups;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
new file mode 100644
index 0000000..b487c72
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.processing.schema.metadata;
+
+/**
+ * Holds one key/value option needed while loading data, together with its serialized
+ * "key,value" form.
+ */
+public class TableOption {
+  /**
+   * separator between the option key and the option value in the serialized form
+   */
+  private static final String KEY_VALUE_SEPARATOR = ",";
+  /**
+   * option key name
+   */
+  private final String optionKey;
+  /**
+   * option value; null when the serialized form carried no separator
+   */
+  private final String optionValue;
+
+  /**
+   * the constructor to initialize the key value pair TableOption instance
+   *
+   * @param optionKey   option key name
+   * @param optionValue option value, may be null
+   */
+  public TableOption(String optionKey, String optionValue) {
+    this.optionKey = optionKey;
+    this.optionValue = optionValue;
+  }
+
+  /**
+   * constructor to init from the string "key,value", split at the first comma(,).
+   * Everything after the first comma is the value, so "key," yields an empty value and
+   * commas inside the value are kept. Input without any comma (including "") yields a null
+   * value instead of failing with an ArrayIndexOutOfBoundsException.
+   *
+   * @param str serialized option, in the form produced by {@link #toString()}
+   */
+  public TableOption(String str) {
+    // limit 2 keeps an empty trailing value ("key,") and any commas inside the value
+    String[] split = str.split(KEY_VALUE_SEPARATOR, 2);
+    this.optionKey = split[0];
+    this.optionValue = split.length > 1 ? split[1] : null;
+  }
+
+  /**
+   * returns options key
+   *
+   * @return the option key
+   */
+  public String getOptionKey() {
+    return optionKey;
+  }
+
+  /**
+   * returns options value
+   *
+   * @return the option value, or null if the option has no value
+   */
+  public String getOptionValue() {
+    return optionValue;
+  }
+
+  /**
+   * Serializes this option as "key,value", or just "key" when the value is null, so that
+   * {@link #TableOption(String)} reads back the same key and value (a null value is not
+   * turned into the literal string "null").
+   *
+   * @return the serialized option
+   */
+  @Override
+  public String toString() {
+    return null == optionValue ? optionKey : optionKey + KEY_VALUE_SEPARATOR + optionValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
new file mode 100644
index 0000000..adb077c
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.processing.schema.metadata;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Holds the table options used while loading data. All state is static: the class exposes a
+ * single shared instance and every method reads or writes the same option map.
+ */
+public class TableOptionWrapper {
+  /**
+   * map holds the table options, keyed by option key.
+   * The map is static, so every caller in the same class loader shares it; a concurrent map
+   * is used so that simultaneous populate/set/get calls cannot corrupt it.
+   * NOTE(review): because the state is shared, options registered by one data load can be
+   * overwritten by another load running in the same JVM — consider per-load instances.
+   */
+  private static final Map<String, TableOption> mapOFOptions =
+      new ConcurrentHashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  /**
+   * the single shared instance; it carries no state of its own
+   */
+  private static final TableOptionWrapper tableOptionWrapper = new TableOptionWrapper();
+
+  /**
+   * to initialize the wrapper object
+   */
+  private TableOptionWrapper() {
+  }
+
+  /**
+   * Parses options serialized by {@link #toString()} ("key,value" entries separated by
+   * HASH_SPC_CHARACTER) and registers each of them. Empty entries are skipped, since an empty
+   * serialized form ("", e.g. when no option was set) splits into one empty token that
+   * carries no option.
+   *
+   * @param input serialized options; null is ignored
+   */
+  public static void populateTableOptions(String input) {
+    if (null == input) {
+      return;
+    }
+    for (String str : input.split(CarbonCommonConstants.HASH_SPC_CHARACTER)) {
+      setTableOption(str);
+    }
+  }
+
+  /**
+   * Registers a single option given as "key,value"; an option already stored under the same
+   * key is replaced.
+   *
+   * @param input serialized option; null or empty input is ignored
+   */
+  public static void setTableOption(String input) {
+    if (null != input && !input.isEmpty()) {
+      TableOption tableOption = new TableOption(input);
+      mapOFOptions.put(tableOption.getOptionKey(), tableOption);
+    }
+  }
+
+  /**
+   * returns TableOptionWrapper instance
+   *
+   * @return the single shared instance
+   */
+  public static TableOptionWrapper getTableOptionWrapperInstance() {
+    return tableOptionWrapper;
+  }
+
+  /**
+   * returns the options key value
+   * return null if the key is null or not found in the map
+   *
+   * @param key option key to look up
+   * @return the option value, or null
+   */
+  public String get(String key) {
+    if (null == key) {
+      // ConcurrentHashMap rejects null keys; keep the "not found returns null" contract
+      return null;
+    }
+    TableOption tableOption = mapOFOptions.get(key);
+    return null != tableOption ? tableOption.getOptionValue() : null;
+  }
+
+  /**
+   * Serializes all options as "key,value" entries, each followed by HASH_SPC_CHARACTER; the
+   * result is readable by {@link #populateTableOptions(String)}.
+   *
+   * @return the serialized options, "" when no option is registered
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    for (TableOption tableOption : mapOFOptions.values()) {
+      builder.append(tableOption).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
+    }
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
index 77f7297..dad2bbc 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
@@ -38,6 +38,7 @@ import org.carbondata.processing.datatypes.PrimitiveDataType;
 import org.carbondata.processing.datatypes.StructDataType;
 import org.carbondata.processing.schema.metadata.ColumnSchemaDetailsWrapper;
 import org.carbondata.processing.schema.metadata.HierarchiesInfo;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
 import org.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.carbondata.processing.util.RemoveDictionaryUtil;
 
@@ -363,14 +364,24 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
   private String segmentId;
 
   /***
-   * String of columns ordinal and column datatype separated by COLON_SPC_CHARACTER
+   * String of columns ordinal and column datatype separated by HASH_SPC_CHARACTER
    */
   private String columnSchemaDetails;
 
   /**
+   * String of key value pair separated by , and HASH_SPC_CHARACTER
+   */
+  private String tableOption;
+
+  /**
    * wrapper object having the columnSchemaDetails
    */
   private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
+
+  /**
+   * Wrapper object holding the table options
+   */
+  private TableOptionWrapper tableOptionWrapper;
   /**
    * task id, each spark task has a unique id
    */
@@ -644,6 +655,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
     taskNo = "";
     columnSchemaDetails = "";
     columnsDataTypeString="";
+    tableOption = "";
   }
 
   // helper method to allocate the arrays
@@ -709,6 +721,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
     retval.append("    ").append(XMLHandler.addTagValue("taskNo", taskNo));
     retval.append("    ")
         .append(XMLHandler.addTagValue("columnSchemaDetails", columnSchemaDetails));
+    retval.append("    ")
+        .append(XMLHandler.addTagValue("tableOption", tableOption));
     return retval.toString();
   }
 
@@ -758,6 +772,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
       segmentId = XMLHandler.getTagValue(stepnode, "segmentId");
       taskNo = XMLHandler.getTagValue(stepnode, "taskNo");
       columnSchemaDetails = XMLHandler.getTagValue(stepnode, "columnSchemaDetails");
+      tableOption = XMLHandler.getTagValue(stepnode, "tableOption");
       String batchConfig = XMLHandler.getTagValue(stepnode, "batchSize");
 
       if (batchConfig != null) {
@@ -784,7 +799,9 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
   }
 
   public void initialize() throws KettleException {
-    columnSchemaDetailsWrapper = new ColumnSchemaDetailsWrapper(columnSchemaDetails);
+    this.columnSchemaDetailsWrapper = new ColumnSchemaDetailsWrapper(columnSchemaDetails);
+    this.tableOptionWrapper = TableOptionWrapper.getTableOptionWrapperInstance();
+    tableOptionWrapper.populateTableOptions(tableOption);
 
     updateDimensions(carbondim, carbonmsr, noDictionaryDims);
     dimColDataTypes=RemoveDictionaryUtil.extractDimColsDataTypeValues(columnsDataTypeString);
@@ -1324,6 +1341,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
       segmentId = rep.getStepAttributeString(idStep, "segmentId");
       taskNo = rep.getStepAttributeString(idStep, "taskNo");
       columnSchemaDetails = rep.getStepAttributeString(idStep, "columnSchemaDetails");
+      tableOption = rep.getStepAttributeString(idStep, "tableOption");
       int nrKeys = rep.countNrStepAttributes(idStep, "lookup_keyfield");
       allocate(nrKeys);
     } catch (Exception e) {
@@ -1378,6 +1396,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
       rep.saveStepAttribute(idTransformation, idStep, "segmentId", segmentId);
       rep.saveStepAttribute(idTransformation, idStep, "taskNo", taskNo);
       rep.saveStepAttribute(idTransformation, idStep, "columnSchemaDetails", columnSchemaDetails);
+      rep.saveStepAttribute(idTransformation, idStep, "tableOption", tableOption);
     } catch (Exception e) {
       throw new KettleException(
           BaseMessages.getString(pkg, "CarbonStep.Exception.UnableToSaveStepInfoToRepository")
@@ -1678,5 +1697,21 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
   public ColumnSchemaDetailsWrapper getColumnSchemaDetailsWrapper() {
     return columnSchemaDetailsWrapper;
   }
+
+  /**
+   * Stores the serialized table options: key,value pairs separated by HASH_SPC_CHARACTER.
+   *
+   * @param options serialized table option string
+   */
+  public void setTableOption(String options) {
+    tableOption = options;
+  }
+
+  /**
+   * Returns the wrapper object holding the table options; it is assigned in initialize().
+   *
+   * @return the table option wrapper, or null before initialize() has run
+   */
+  public TableOptionWrapper getTableOptionWrapper() {
+    return this.tableOptionWrapper;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index c7b2024..237d793 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -163,10 +163,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
    */
   private boolean[] dimPresentCsvOrder;
   /**
-   * ValueToCheckAgainst
-   */
-  private String valueToCheckAgainst;
-  /**
    * propMap
    */
   private Map<String, int[]> propMap;
@@ -436,6 +432,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
         if (null != getInputRowMeta()) {
           generateNoDictionaryAndComplexIndexMapping();
         }
+        serializationNullFormat = meta.getTableOptionWrapper().get("serialization_null_format");
       }
       // no more input to be expected...
       if (r == null) {
@@ -546,6 +543,11 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     return false;
   }
 
+  /**
+   * holds the value to be considered as null while dataload
+   */
+  private String serializationNullFormat;
+
   private List<String> getDenormalizedHierarchies() {
     List<String> hierList = Arrays.asList(meta.hierNames);
     List<String> denormHiers = new ArrayList<String>(10);
@@ -877,7 +879,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     int i = 0;
     for (Object obj : rowValue) {
       if (obj != null) {
-        if (obj.equals(valueToCheckAgainst)) {
+        //removed valueToCheckAgainst does not make sense to
+        // compare non null object with a null string
+        if (obj.toString().equalsIgnoreCase(serializationNullFormat)) {
           rowValue[i] = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
         }
       } else {


Mime
View raw message