carbondata-commits mailing list archives

From jack...@apache.org
Subject [41/50] [abbrv] carbondata git commit: [CARBONDATA-1936][PARTITION] Corrected bad record and avoid double conversion of data in Partitioning table
Date Sun, 07 Jan 2018 03:05:49 GMT
[CARBONDATA-1936][PARTITION] Corrected bad record and avoid double conversion of data in Partitioning table

Currently, a one-time data conversion happens during load, while creating the RDD, to make sure the partitions are added in the right format. This approach creates a problem for bad record handling, because bad records cannot be written out from the RDD.
In this PR we do not convert the data in the RDD; instead we convert the data while adding the partition information to Hive.

This closes #1729
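
As a rough illustration of the idea, here is a minimal, hypothetical Scala sketch (the names and signatures are illustrative only, not the project's API): each partition value is converted once, to the Hive-standard string form, at the point where the partition is registered, instead of converting every row while building the RDD.

    import java.text.SimpleDateFormat

    // Standalone sketch, not the patch itself: convert a raw partition value to the
    // Hive-standard form once, when the partition is added, rather than per row in the RDD.
    object PartitionValueConversionSketch {
      def toHiveFormat(
          value: String,
          dataType: String,
          loadTimestampFormat: SimpleDateFormat,
          serializationNullFormat: String): String = {
        if (value == null || value == serializationNullFormat) {
          // Hive stores a null partition value under this special folder name.
          "__HIVE_DEFAULT_PARTITION__"
        } else {
          dataType match {
            case "timestamp" =>
              // Parse with the load's timestamp format, emit the Hive-standard form.
              new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(loadTimestampFormat.parse(value))
            case "int" =>
              value.toInt.toString // throws on a bad record, leaving the action to the caller
            case _ => value
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val loadTimestampFormat = new SimpleDateFormat("yyyy/MM/dd")
        println(toHiveFormat("2017/12/27", "timestamp", loadTimestampFormat, "\\N"))
        println(toHiveFormat("\\N", "timestamp", loadTimestampFormat, "\\N"))
      }
    }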


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f10c412
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f10c412
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f10c412

Branch: refs/heads/carbonstore
Commit: 6f10c4127914c94fdd6f7af16512e0758f2bb146
Parents: e40b34b
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Dec 27 14:45:36 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Jan 5 00:58:14 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../carbondata/core/util/SessionParams.java     |  10 +
 .../src/test/resources/emp.csv                  |  14 +
 .../StandardPartitionBadRecordLoggerTest.scala  | 247 +++++++++++++++
 .../StandardPartitionTableLoadingTestCase.scala |   9 +
 .../StandardPartitionTableQueryTestCase.scala   |   8 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 171 +++++++++--
 .../management/CarbonLoadDataCommand.scala      | 302 ++++++++++---------
 .../datasources/CarbonFileFormat.scala          |  93 +++++-
 .../src/main/spark2.1/CarbonSessionState.scala  |  28 +-
 .../src/main/spark2.2/CarbonSessionState.scala  |  23 +-
 11 files changed, 724 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 30d5959..bcfeba0 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -100,6 +100,16 @@ public final class CarbonLoadOptionConstants {
       "carbon.options.global.sort.partitions";
 
   /**
+   * specify serialization null format; it is used to describe which character inside the
+   * csv file is treated as null.
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT =
+      "carbon.options.serialization.null.format";
+
+  public static final String CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT_DEFAULT = "\\N";
+
+  /**
    *  Max number of dictionary values that can be given with external dictionary
    */
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 10000000;
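
A hedged usage sketch, not part of this patch: the SessionParams change in the next hunk suggests the new key can be set as a session parameter like the other carbon.options.* properties. Assuming a sql(...) handle obtained from a CarbonSession, and a hypothetical table name and CSV path, it might be used like this before a load:

    // Assumptions: sql(...) comes from a CarbonSession; the table and csvFilePath are hypothetical.
    sql("SET carbon.options.serialization.null.format=\\N")
    sql(s"LOAD DATA LOCAL INPATH '$csvFilePath' INTO TABLE sales_partitioned " +
        "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"')")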

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 0540ed6..3f0e856 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -37,8 +37,10 @@ import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CAR
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT;
 
 /**
  * This class maintains carbon session params
@@ -168,6 +170,14 @@ public class SessionParams implements Serializable {
       case CARBON_OPTIONS_DATEFORMAT:
         isValid = true;
         break;
+      // no validation needed while set for CARBON_OPTIONS_TIMESTAMPFORMAT
+      case CARBON_OPTIONS_TIMESTAMPFORMAT:
+        isValid = true;
+        break;
+      // no validation needed while set for CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT
+      case CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT:
+        isValid = true;
+        break;
       default:
         if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
           isValid = CarbonUtil.validateRangeOfSegmentList(value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark-common-test/src/test/resources/emp.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/emp.csv b/integration/spark-common-test/src/test/resources/emp.csv
new file mode 100644
index 0000000..b011ecb
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/emp.csv
@@ -0,0 +1,14 @@
+emp_no,ename,job,mgr_id,date_of_joining,salary,bonus,dept_no
+7369,SMITH,CLERK,7902,1980-12-17,800,,20
+7499,ALLEN,SALESMAN,7698,1981-02-21,1600,300,30
+7521,WARD,SALESMAN,7698,1981-03-21,1250,500,30
+7566,JONES,MANAGER,7839,1983-04-01,2975,,20
+7654,MARTIN,SALESMAN,7698,1989-05-11,1250,1400,30
+7698,BLAKE,MANAGER,7839,1981-02-21,2850,,30
+7782,CLARK,MANAGER,7839,1985-11-201,2450,,10
+7788,SCOTT,ANALYST,7566,2001-02-21,3000,,20
+7839,KING,PRESIDENT,,1981-04-12,5000,,10
+7844,TURNER,SALESMAN,7698,1981-02-21,1500,0,30
+7876,ADAMS,CLERK,7788,1999-02-01,1100,,20
+7900,JAMES,CLERK,7698,1997-12-31,950,,30
+7902,FORD,ANALYST,7566,1984-05-25,3000,,20
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
new file mode 100644
index 0000000..f916c5e
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class for bad record handling while loading data into standard partitioned tables
+ *
+ *
+ */
+class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    drop()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        new File("./target/test/badRecords")
+          .getCanonicalPath)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+  }
+
+  test("test partition redirect") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""")
+
+    val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
+        +
+        "('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from sales"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("test partition serializable_values") {
+    // 1.0 "\N" which should be treated as NULL
+    // 1.1 Time stamp "\N" which should be treated as NULL
+    val csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
+    sql(
+      """CREATE TABLE IF NOT EXISTS serializable_values(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'
+        """)
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values OPTIONS"
+        +
+        "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+        "'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from serializable_values"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("test partition serializable_values_false") {
+    val csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
+    // load with bad_records_logger_enable false
+    sql(
+      """CREATE TABLE IF NOT EXISTS serializable_values_false(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'
+        """)
+    sql(
+      "LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values_false OPTIONS"
+      + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    checkAnswer(
+      sql("select count(*) from serializable_values_false"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("test partition with empty_timestamp") {
+    // 4.1 Time stamp empty data - Bad records/Null value based on configuration
+    // 5. non-parsable data - Bad records/Null value based on configuration
+    // 6. empty line(check current one) - Bad records/Null value based on configuration
+    // only one value should be loaded.
+    sql(
+      """CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+        +
+        "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true' ," +
+        "'bad_records_action'='ignore', " +
+        "'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from empty_timestamp"),
+      Seq(Row(1)
+      )
+    )
+  }
+
+  test("test partition with insufficientColumn") {
+    // 2. insufficient columns - Bad records/Null value based on configuration
+    sql(
+      """CREATE TABLE IF NOT EXISTS insufficientColumn(date Timestamp,country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn OPTIONS"
+        +
+        "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+        "'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from insufficientColumn"),
+      Seq(Row(3)
+      )
+    )
+  }
+
+  test("test partition with insufficientColumn_false") {
+    // load with bad_records_logger_enable false
+    sql(
+      """CREATE TABLE IF NOT EXISTS insufficientColumn_false(date Timestamp,country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn_false OPTIONS"
+        + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from insufficientColumn_false"),
+      Seq(Row(3)
+      )
+    )
+  }
+
+
+  test("test partition with emptyColumnValues") {
+    // 3. empty data for string data type - take empty value
+    // 4. empty data for non-string data type - Bad records/Null value based on configuration
+    //table should have only two records.
+    sql(
+      """CREATE TABLE IF NOT EXISTS emptyColumnValues(date Timestamp,country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+        +
+        "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true'," +
+        " 'bad_records_action'='ignore', " +
+        "'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("test partition with emptyColumnValues_false") {
+    // load with bad_records_logger_enable to false
+    sql(
+      """CREATE TABLE IF NOT EXISTS emptyColumnValues_false(date Timestamp,country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+        + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("test partition with empty_timestamp_false") {
+    // load with bad_records_logger_enable to false
+    sql(
+      """CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'
+        """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+        + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    checkAnswer(
+      sql("select count(*) from empty_timestamp_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("test load ddl command") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+      """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+    try {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+          + "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    } catch {
+      case ex: Exception =>
+        assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT"
+          .equals(ex.getMessage))
+    }
+  }
+
+  def drop(): Unit = {
+    sql("drop table IF EXISTS sales")
+    sql("drop table IF EXISTS serializable_values")
+    sql("drop table IF EXISTS serializable_values_false")
+    sql("drop table IF EXISTS insufficientColumn")
+    sql("drop table IF EXISTS insufficientColumn_false")
+    sql("drop table IF EXISTS emptyColumnValues")
+    sql("drop table IF EXISTS emptyColumnValues_false")
+    sql("drop table IF EXISTS empty_timestamp")
+    sql("drop table IF EXISTS empty_timestamp_false")
+    sql("drop table IF EXISTS dataloadOptionTests")
+  }
+
+  override def afterAll {
+    drop()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index b0afb0f..72e464e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -317,6 +317,14 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     checkExistence(df, true,  "empno=1")
   }
 
+  test("bad record test with null values") {
+    sql(s"""CREATE TABLE IF NOT EXISTS emp1 (emp_no int,ename string,job string,mgr_id int,date_of_joining string,salary int,bonus int) partitioned by (dept_no int) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\')""")
+    val rows = sql(s"select count(*) from emp1").collect()
+    sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\','BAD_RECORDS_ACTION'='FORCE')""")
+    checkAnswer(sql(s"select count(*) from emp1"), rows)
+  }
+
 
   override def afterAll = {
     dropTable
@@ -338,6 +346,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists mergeindexpartitionthree")
     sql("drop table if exists loadstaticpartitiononeissue")
     sql("drop table if exists loadpartitionwithspecialchar")
+    sql("drop table if exists emp1")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 8a09093..a36023e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -203,11 +203,14 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
 
   test("badrecords ignore on partition column") {
     sql("create table badrecordsPartitionignore(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    sql("create table badrecordsignore(intField1 int,intField2 int, stringField1 string) stored by 'carbondata'")
     sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionignore options('bad_records_action'='ignore')")
-    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), Seq(Row(3)))
-    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), Seq(Row(2)))
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsignore options('bad_records_action'='ignore')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), sql("select count(*) cnt from badrecordsignore where intfield2 is null"))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), sql("select count(*) cnt from badrecordsignore where intfield2 is not null"))
   }
 
+
   test("test partition fails on int null partition") {
     sql("create table badrecordsPartitionintnull(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
     sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnull options('bad_records_action'='force')")
@@ -263,6 +266,7 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists staticpartitionload")
     sql("drop table if exists badrecordsPartitionignore")
     sql("drop table if exists badrecordsPartitionfail")
+    sql("drop table if exists badrecordsignore")
     sql("drop table if exists badrecordsPartitionintnull")
     sql("drop table if exists badrecordsPartitionintnullalt")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 9fcb98f..67a29f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -22,16 +22,18 @@ import java.text.SimpleDateFormat
 import java.util
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.CarbonSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.exception.BadRecordFoundException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -156,55 +158,164 @@ object CarbonScalaUtil {
   }
 
   /**
-   * Converts incoming value to UTF8String after converting data as per the data type.
+   * Converts incoming value to String after converting data as per the data type.
    * @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to UTF8String
+   * @param dataType Datatype to convert and then convert to String
    * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
-   * @param dateFormat DataFormat to convert incase of DateType datatype
+   * @param dateFormat Date format to convert in case of DateType datatype
    * @param serializationNullFormat if this encounters in input data then data will
    *                                be treated as null
-   * @param fail If it is true then any conversion error will trhow error otherwise it will be
-   *                   filled with ull value
-   * @return converted UTF8String
+   * @return converted String
    */
-  def convertToUTF8String(value: String,
+  def convertToString(
+      value: String,
       dataType: DataType,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
-      serializationNullFormat: String,
-      fail: Boolean): UTF8String = {
+      serializationNullFormat: String): String = {
     if (value == null || serializationNullFormat.equals(value)) {
-      return UTF8String.fromString(value)
+      return null
+    }
+    dataType match {
+      case TimestampType =>
+        DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000)
+      case DateType =>
+        DateTimeUtils.dateToString(
+          (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt)
+      case ShortType => value.toShort.toString
+      case IntegerType => value.toInt.toString
+      case LongType => value.toLong.toString
+      case DoubleType => value.toDouble.toString
+      case FloatType => value.toFloat.toString
+      case d: DecimalType => new java.math.BigDecimal(value).toPlainString
+      case BooleanType => value.toBoolean.toString
+      case _ => value
     }
+  }
+
+  /**
+   * Converts incoming value to String after converting data as per the data type.
+   * @param value Input value to convert
+   * @param dataType Datatype to convert and then convert to String
+   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
+   * @param dateFormat Date format to convert in case of DateType datatype
+   * @return converted String
+   */
+  def convertToCarbonFormat(value: String,
+      dataType: DataType,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat): String = {
     try {
       dataType match {
         case TimestampType =>
-          UTF8String.fromString(
-            DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
+          timeStampFormat.format(DateTimeUtils.stringToTime(value))
         case DateType =>
-          UTF8String.fromString(
-            DateTimeUtils.dateToString(
-              (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
-        case ShortType => UTF8String.fromString(value.toShort.toString)
-        case IntegerType => UTF8String.fromString(value.toInt.toString)
-        case LongType => UTF8String.fromString(value.toLong.toString)
-        case DoubleType => UTF8String.fromString(value.toDouble.toString)
-        case FloatType => UTF8String.fromString(value.toFloat.toString)
-        case d: DecimalType => UTF8String
-          .fromString(new java.math.BigDecimal(value).toPlainString)
-        case BooleanType => UTF8String.fromString(value.toBoolean.toString)
-        case _ => UTF8String.fromString(value)
+          dateFormat.format(DateTimeUtils.stringToTime(value))
+        case _ => value
       }
     } catch {
       case e: Exception =>
-        if (fail) {
-          throw e
+        value
+    }
+  }
+
+  private val hiveignorepartition = "__HIVE_IGNORE_PARTITION__"
+
+  /**
+   * Update partition values as per the right date and time format
+   * @return updated partition spec
+   */
+  def updatePartitions(
+      partitionSpec: Map[String, String],
+      table: CarbonTable,
+      timeFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat,
+      serializationNullFormat: String,
+      badRecordAction: String,
+      isEmptyBadRecord: Boolean): Map[String, String] = {
+    partitionSpec.map{ case (col, pvalue) =>
+      // replace special string with empty value.
+      val value = if (pvalue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        ""
+      } else {
+        pvalue
+      }
+      val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase)
+      val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
+      val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
+      try {
+        if (isEmptyBadRecord && value.length == 0 &&
+            badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) &&
+            dataType != StringType) {
+          (col, hiveignorepartition)
+        } else if (!isEmptyBadRecord && value.length == 0 && dataType != StringType) {
+          (col, hivedefaultpartition)
+        } else if (value.equals(hivedefaultpartition)) {
+          (col, value)
+        } else {
+          val convertedString = CarbonScalaUtil.convertToString(
+            value, dataType, timeFormat, dateFormat, serializationNullFormat)
+          if (convertedString == null) {
+            (col, hivedefaultpartition)
+          } else {
+            (col, convertedString)
+          }
         }
-        UTF8String.fromString(null)
+      } catch {
+        case e: Exception =>
+          // If it is bad record ignore case then add with special string so that it will be
+          // filtered after this.
+          if (badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString)) {
+            (col, hiveignorepartition)
+          } else {
+            (col, hivedefaultpartition)
+          }
+      }
     }
   }
 
   /**
+   * Update partition values as per the right date and time format
+   */
+  def updatePartitions(
+      carbonSessionInfo: CarbonSessionInfo,
+      parts: Seq[CatalogTablePartition],
+      table: CarbonTable): Seq[CatalogTablePartition] = {
+    val dateFormatStr = carbonSessionInfo.getThreadParams.getProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val dateFormat = new SimpleDateFormat(dateFormatStr)
+    val timeFormatStr = carbonSessionInfo.getThreadParams.getProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)
+    val timeFormat = new SimpleDateFormat(timeFormatStr)
+    val serializeFormat = carbonSessionInfo.getThreadParams.getProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT_DEFAULT)
+    val isEmptyBadRecord = carbonSessionInfo.getThreadParams.getProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT).toBoolean
+    val badRecordAction = carbonSessionInfo.getThreadParams.getProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      LoggerAction.FAIL.toString)
+    parts.map{ f =>
+      val changedSpec =
+        updatePartitions(
+          f.spec,
+          table,
+          timeFormat,
+          dateFormat,
+          serializeFormat,
+          badRecordAction,
+          isEmptyBadRecord)
+      f.copy(spec = changedSpec)
+    }.filterNot{ p =>
+      // Filter the special bad record ignore case string
+      p.spec.exists(_._2.equals(hiveignorepartition))
+    }.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by doing a groupBy
+  }
+
+  /**
    * This method will validate a column for its data type and check whether the column data type
    * can be modified and update if conditions are met
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 383f272..46fe24f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.rdd.{NewHadoopRDD, RDD}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType, Timestam
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -472,10 +473,10 @@ case class CarbonLoadDataCommand(
     val catalogTable: CatalogTable = logicalPlan.collect {
       case l: LogicalRelation => l.catalogTable.get
       case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
-        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
-              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
         CarbonReflectionUtils.getFieldOfCatalogTable(
           "tableMeta",
           c).asInstanceOf[CatalogTable]
@@ -490,161 +491,148 @@ case class CarbonLoadDataCommand(
     // converted to hive standard fomat to let spark understand the data to partition.
     val serializationNullFormat =
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val failAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    val ignoreAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
-    val query: LogicalPlan = if (dataFrame.isDefined) {
-      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-      val serializationNullFormat =
-      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-      val attributes =
-        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-      val len = attributes.length
-      val rdd = dataFrame.get.rdd.map { f =>
-        val data = new Array[Any](len)
-        var i = 0
-        while (i < len) {
-          data(i) =
-            UTF8String.fromString(
-              CarbonScalaUtil.getString(f.get(i),
-                serializationNullFormat,
-                delimiterLevel1,
-                delimiterLevel2,
-                timeStampFormat,
-                dateFormat))
-          i = i + 1
+    val badRecordAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1)
+    var timeStampformatString = carbonLoadModel.getTimestampformat
+    if (timeStampformatString.isEmpty) {
+      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+    }
+    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+    var dateFormatString = carbonLoadModel.getDateFormat
+    if (dateFormatString.isEmpty) {
+      dateFormatString = carbonLoadModel.getDefaultDateFormat
+    }
+    val dateFormat = new SimpleDateFormat(dateFormatString)
+    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+      timeStampformatString)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+      serializationNullFormat)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      badRecordAction)
+    CarbonSession.threadSet(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+    try {
+      val query: LogicalPlan = if (dataFrame.isDefined) {
+        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+        val attributes =
+          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+        val len = attributes.length
+        val rdd = dataFrame.get.rdd.map { f =>
+          val data = new Array[Any](len)
+          var i = 0
+          while (i < len) {
+            data(i) =
+              UTF8String.fromString(
+                CarbonScalaUtil.getString(f.get(i),
+                  serializationNullFormat,
+                  delimiterLevel1,
+                  delimiterLevel2,
+                  timeStampFormat,
+                  dateFormat))
+            i = i + 1
+          }
+          InternalRow.fromSeq(data)
         }
-        InternalRow.fromSeq(data)
-      }
-      if (updateModel.isDefined) {
-        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-        // In case of update, we don't need the segmrntid column in case of partitioning
-        val dropAttributes = attributes.dropRight(1)
-        val finalOutput = catalogTable.schema.map { attr =>
-          dropAttributes.find { d =>
-            val index = d.name.lastIndexOf("-updatedColumn")
-            if (index > 0) {
-              d.name.substring(0, index).equalsIgnoreCase(attr.name)
-            } else {
-              d.name.equalsIgnoreCase(attr.name)
-            }
-          }.get
+        if (updateModel.isDefined) {
+          // Get the updated query plan in case of update scenario
+          getLogicalQueryForUpdate(sparkSession, catalogTable, attributes, rdd)
+        } else {
+          LogicalRDD(attributes, rdd)(sparkSession)
         }
-        Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
-      } else {
-        LogicalRDD(attributes, rdd)(sparkSession)
-      }
 
-    } else {
-      var timeStampformatString = carbonLoadModel.getTimestampformat
-      if (timeStampformatString.isEmpty) {
-        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
-      }
-      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = carbonLoadModel.getDateFormat
-      if (dateFormatString.isEmpty) {
-        dateFormatString = carbonLoadModel.getDefaultDateFormat
-      }
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      // input data from csv files. Convert to logical plan
-      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-      val jobConf = new JobConf(hadoopConf)
-      SparkHadoopUtil.get.addCredentials(jobConf)
-      val attributes =
-        StructType(carbonLoadModel.getCsvHeaderColumns.map(
-          StructField(_, StringType))).toAttributes
-      val rowDataTypes = attributes.map { attribute =>
-        catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
-          case Some(attr) => attr.dataType
-          case _ => StringType
+      } else {
+        // input data from csv files. Convert to logical plan
+        CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+        hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+        val jobConf = new JobConf(hadoopConf)
+        SparkHadoopUtil.get.addCredentials(jobConf)
+        val attributes =
+          StructType(carbonLoadModel.getCsvHeaderColumns.map(
+            StructField(_, StringType))).toAttributes
+        val rowDataTypes = attributes.map { attribute =>
+          catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
+            case Some(attr) => attr.dataType
+            case _ => StringType
+          }
         }
-      }
-      val len = rowDataTypes.length
-      // Fail row conversion if fail/ignore badrecord action is enabled
-      val fail = failAction || ignoreAction
-      var rdd =
-        new NewHadoopRDD[NullWritable, StringArrayWritable](
-          sparkSession.sparkContext,
-          classOf[CSVInputFormat],
-          classOf[NullWritable],
-          classOf[StringArrayWritable],
-          jobConf).map{ case (key, value) =>
+        // Find the partition columns from the csv header attributes
+        val partitionColumns = attributes.map { attribute =>
+          catalogTable.partitionSchema.find(_.name.equalsIgnoreCase(attribute.name)) match {
+            case Some(attr) => true
+            case _ => false
+          }
+        }
+        val len = rowDataTypes.length
+        var rdd =
+          new NewHadoopRDD[NullWritable, StringArrayWritable](
+            sparkSession.sparkContext,
+            classOf[CSVInputFormat],
+            classOf[NullWritable],
+            classOf[StringArrayWritable],
+            jobConf).map { case (key, value) =>
             val data = new Array[Any](len)
             var i = 0
             val input = value.get()
             val inputLen = Math.min(input.length, len)
-            try {
-              while (i < inputLen) {
-                // TODO find a way to avoid double conversion of date and time.
-                data(i) = CarbonScalaUtil.convertToUTF8String(
-                  input(i),
-                  rowDataTypes(i),
-                  timeStampFormat,
-                  dateFormat,
-                  serializationNullFormat,
-                  fail)
-                i = i + 1
-              }
-              InternalRow.fromSeq(data)
-            } catch {
-              case e: Exception =>
-                if (failAction) {
-                  // It is badrecord fail case.
-                  throw new BadRecordFoundException(
-                    s"Data load failed due to bad record: " +
-                    s"${input(i)} with datatype ${rowDataTypes(i)}")
-                } else {
-                  // It is bad record ignore case
-                  InternalRow.empty
+            while (i < inputLen) {
+              data(i) = UTF8String.fromString(input(i))
+              // If it is a partition column, replace an empty value with a special string;
+              // otherwise spark treats it as null and bad records cannot be handled internally.
+              if (partitionColumns(i)) {
+                if (input(i) != null && input(i).isEmpty) {
+                  data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
                 }
+              }
+              i = i + 1
             }
+            InternalRow.fromSeq(data)
 
+          }
+        // Only select the required columns
+        val output = if (partition.nonEmpty) {
+          val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)}
+          catalogTable.schema.map { attr =>
+            attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+          }.filter(attr => lowerCasePartition.get(attr.name.toLowerCase).isEmpty)
+        } else {
+          catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
         }
-      // In bad record ignore case filter the empty values
-      if (ignoreAction) {
-        rdd = rdd.filter(f => f.numFields != 0)
+        Project(output, LogicalRDD(attributes, rdd)(sparkSession))
       }
-
-      // Only select the required columns
-      val output = if (partition.nonEmpty) {
-        catalogTable.schema.map{ attr =>
-          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
-        }.filter(attr => partition.get(attr.name).isEmpty)
+      // TODO need to find a way to avoid double lookup
+      val sizeInBytes =
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+          catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+      val convertRelation = convertToLogicalRelation(
+        catalogTable,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        sparkSession)
+      val convertedPlan =
+        CarbonReflectionUtils.getInsertIntoCommand(
+          table = convertRelation,
+          partition = partition,
+          query = query,
+          overwrite = false,
+          ifPartitionNotExists = false)
+      if (isOverwriteTable && partition.nonEmpty) {
+        overwritePartition(sparkSession, table, convertedPlan)
       } else {
-        catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+        Dataset.ofRows(sparkSession, convertedPlan)
       }
-      Project(output, LogicalRDD(attributes, rdd)(sparkSession))
-    }
-    // TODO need to find a way to avoid double lookup
-    val sizeInBytes =
-      CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-        catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
-    val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
-    val convertRelation = convertToLogicalRelation(
-      catalogTable,
-      sizeInBytes,
-      isOverwriteTable,
-      carbonLoadModel,
-      sparkSession)
-    val convertedPlan =
-      CarbonReflectionUtils.getInsertIntoCommand(
-        convertRelation,
-        partition,
-        query,
-        false,
-        false)
-    if (isOverwriteTable && partition.nonEmpty) {
-      overwritePartition(sparkSession, table, convertedPlan)
-    } else {
-      Dataset.ofRows(sparkSession, convertedPlan)
+    } finally {
+      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
+      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
+      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
     }
     try {
       // Trigger auto compaction
@@ -661,6 +649,31 @@ case class CarbonLoadDataCommand(
     }
   }
 
+  /**
+   * Create the logical plan for update scenario. Here we should drop the segmentid column from the
+   * input rdd.
+   */
+  private def getLogicalQueryForUpdate(
+      sparkSession: SparkSession,
+      catalogTable: CatalogTable,
+      attributes: Seq[AttributeReference],
+      rdd: RDD[InternalRow]): LogicalPlan = {
+    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    // In case of update, we don't need the segment id column in case of partitioning
+    val dropAttributes = attributes.dropRight(1)
+    val finalOutput = catalogTable.schema.map { attr =>
+      dropAttributes.find { d =>
+        val index = d.name.lastIndexOf("-updatedColumn")
+        if (index > 0) {
+          d.name.substring(0, index).equalsIgnoreCase(attr.name)
+        } else {
+          d.name.equalsIgnoreCase(attr.name)
+        }
+      }.get
+    }
+    Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+  }
+
   private def convertToLogicalRelation(
       catalogTable: CatalogTable,
       sizeInBytes: Long,
@@ -694,6 +707,7 @@ case class CarbonLoadDataCommand(
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
+    options += (("staticpartition", partition.nonEmpty.toString))
     options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 954c67a..7f05cef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -17,9 +17,11 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.File
+import java.text.SimpleDateFormat
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -35,14 +37,14 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore}
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
 
 class CarbonFileFormat
   extends FileFormat
@@ -89,17 +91,17 @@ with Serializable {
     optionsFinal.put(
       "fileheader",
       dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
+    val optionsLocal = new mutable.HashMap[String, String]()
+    optionsLocal ++= options
+    optionsLocal += (("header", "false"))
     DataLoadingUtil.buildCarbonLoadModel(
       table,
       carbonProperty,
-      options,
+      optionsLocal.toMap,
       optionsFinal,
       model,
       conf
     )
-    // Set the standard date/time format which supported by spark/hive.
-    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
     model.setPartitionId("0")
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
@@ -112,6 +114,10 @@ with Serializable {
       conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp)
       model.setFactTimeStamp(updateTimeStamp.toLong)
     }
+    val staticPartition = options.getOrElse("staticpartition", null)
+    if (staticPartition != null) {
+      conf.set("carbon.staticpartition", staticPartition)
+    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
@@ -188,16 +194,48 @@ private class CarbonOutputWriter(path: String,
     fieldTypes: Seq[DataType])
   extends OutputWriter with AbstractCarbonOutputWriter {
   val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
-  val partitionData = if (partitions.nonEmpty) {
-    partitions.map{ p =>
+  val staticPartition = {
+    val staticPart = context.getConfiguration.get("carbon.staticpartition")
+    staticPart != null && staticPart.toBoolean
+  }
+  lazy val partitionData = if (partitions.nonEmpty) {
+    val updatedPartitions = partitions.map{ p =>
       val value = p.substring(p.indexOf("=") + 1, p.length)
+      val col = p.substring(0, p.indexOf("="))
       // NUll handling case. For null hive creates with this special name
       if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
-        null
+        (col, null)
+        // we should replace back the special string with empty value.
+      } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        (col, "")
       } else {
-        value
+        (col, value)
       }
     }
+
+    if (staticPartition) {
+      val loadModel = recordWriter.getLoadModel
+      val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
+      var timeStampformatString = loadModel.getTimestampformat
+      if (timeStampformatString.isEmpty) {
+        timeStampformatString = loadModel.getDefaultTimestampFormat
+      }
+      val timeFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = loadModel.getDateFormat
+      if (dateFormatString.isEmpty) {
+        dateFormatString = loadModel.getDefaultDateFormat
+      }
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      updatedPartitions.map {case (col, value) =>
+        CarbonScalaUtil.convertToCarbonFormat(value,
+          CarbonScalaUtil.convertCarbonToSparkDataType(
+            table.getColumnByName(table.getTableName, col).getDataType),
+          timeFormat,
+          dateFormat)
+      }
+    } else {
+      updatedPartitions.map(_._2)
+    }
   } else {
     Array.empty
   }
@@ -238,9 +276,42 @@ private class CarbonOutputWriter(path: String,
     recordWriter.close(context)
     val loadModel = recordWriter.getLoadModel
     val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId)
+    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
+    var timeStampformatString = loadModel.getTimestampformat
+    if (timeStampformatString.isEmpty) {
+      timeStampformatString = loadModel.getDefaultTimestampFormat
+    }
+    val timeFormat = new SimpleDateFormat(timeStampformatString)
+    var dateFormatString = loadModel.getDateFormat
+    if (dateFormatString.isEmpty) {
+      dateFormatString = loadModel.getDefaultDateFormat
+    }
+    val dateFormat = new SimpleDateFormat(dateFormatString)
+    val serializeFormat =
+      loadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+    val badRecordAction = loadModel.getBadRecordsAction.split(",")(1)
+    val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean
     // write partition info to new file.
     val partitonList = new util.ArrayList[String]()
-    partitions.foreach(partitonList.add)
+    val splitPartitions = partitions.map{ p =>
+      val value = p.substring(p.indexOf("=") + 1, p.length)
+      val col = p.substring(0, p.indexOf("="))
+      (col, value)
+    }.toMap
+    val updatedPartitions =
+      if (staticPartition) {
+        splitPartitions
+      } else {
+        CarbonScalaUtil.updatePartitions(
+          splitPartitions,
+          table,
+          timeFormat,
+          dateFormat,
+          serializeFormat,
+          badRecordAction,
+          isEmptyBadRecord)
+      }
+    updatedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
     new PartitionMapFileStore().writePartitionMapFile(
       segmentPath,
       loadModel.getTaskNo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index dae6249..3d84dd3 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogUtils, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -37,7 +37,8 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -135,6 +136,26 @@ class CarbonSessionCatalog(
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
 
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      // Get the properties from thread local
+      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      if (carbonSessionInfo != null) {
+        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+        super.createPartitions(tableName, updatedParts, ignoreIfExists)
+      } else {
+        super.createPartitions(tableName, parts, ignoreIfExists)
+      }
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.
@@ -143,7 +164,8 @@ class CarbonSessionCatalog(
    * @param identifier
    * @return
    */
-  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+  def getPartitionsAlternate(
+      partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier) = {
     val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f10c412/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index c8ea275..f563007 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -45,8 +45,9 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -142,6 +143,26 @@ class CarbonSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }
 
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      // Get the properties from thread local
+      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      if (carbonSessionInfo != null) {
+        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+        super.createPartitions(tableName, updatedParts, ignoreIfExists)
+      } else {
+        super.createPartitions(tableName, parts, ignoreIfExists)
+      }
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.

