carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3740] Add line separator option to load command to configure the line separator during csv parsing.
Date Wed, 01 Apr 2020 07:07:32 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1052bf6  [CARBONDATA-3740] Add line separator option to load command to configure
the line separator during csv parsing.
1052bf6 is described below

commit 1052bf6529ed8fa4328ee247ce94e64513ee745f
Author: jack86596 <jack86596@gmail.com>
AuthorDate: Wed Mar 11 15:36:45 2020 +0800

    [CARBONDATA-3740] Add line separator option to load command to configure the line separator
during csv parsing.
    
    Why is this PR needed?
    Sometimes the univocity parser will detect the line separator incorrectly. In this case,
the user should be able to set the line separator explicitly.
    Issue: During loading, if in the first line one field contains a '\r' character and
this '\r' appears before the first '\n',
    line separator detection will treat '\r' as the line separator. This is not the intention.
    Example:
    Data file has two lines, ^M is '\r':
    1,2^M,3
    4,5,6
    After loading, the records in the table will be:
    | 1 | 2 | null |
    | null | 3
    4 | 5 |
    The correct result should be:
    | 1 | 2^M | 3 |
    | 4 | 5 | 6 |
    
    What changes were proposed in this PR?
    Allow the user to specify the line separator explicitly in the load command by adding a
new option named "line_separator".
    
    Does this PR introduce any user interface change?
    Yes. A new load option "line_separator" is added.
    
    Is any new test case added?
    Yes.
    
    This closes #3664
---
 .../apache/carbondata/core/util/CarbonUtil.java    |  2 +-
 .../carbondata/core/util/CarbonUtilTest.java       |  4 +-
 docs/dml-of-carbondata.md                          |  8 +++
 .../hadoop/api/CarbonTableOutputFormat.java        |  2 +-
 .../carbondata/hadoop/testutil/StoreCreator.java   |  1 +
 .../presto/util/CarbonDataStoreCreator.scala       |  1 +
 .../apache/carbondata/spark/util/CommonUtil.scala  |  4 ++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |  1 +
 .../spark/sql/catalyst/CarbonParserUtil.scala      | 13 ++++-
 .../command/management/CommonLoadUtils.scala       |  2 +-
 .../streaming/CarbonAppendableStreamSink.scala     |  2 +-
 .../test/resources/carriage_return_in_string.csv   |  2 +
 .../spark/testsuite/dataload/TestLoadOptions.scala | 68 +++++++++++++++++++++-
 .../processing/loading/DataLoadProcessBuilder.java | 12 ++--
 .../loading/csvinput/CSVInputFormat.java           | 14 ++++-
 .../processing/loading/model/CarbonLoadModel.java  | 26 ++++++---
 .../loading/model/CarbonLoadModelBuilder.java      |  8 ++-
 17 files changed, 145 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e56b8b1..b3d9339 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -565,7 +565,7 @@ public final class CarbonUtil {
   /**
    * From beeline if a delimiter is passed as \001, in code we get it as
    * escaped string as \\001. So this method will unescape the slash again and
-   * convert it back t0 \001
+   * convert it back to \001
    *
    * @param parseStr
    * @return
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 8378c4f..09bf55b 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -287,8 +287,8 @@ public class CarbonUtilTest {
   }
 
   @Test public void testToUnescapeChar() {
-    String[] input = { "\\001", "\\t", "\\r", "\\b", "\\n", "\\f" };
-    String[] output = { "\001", "\t", "\r", "\b", "\n", "\f" };
+    String[] input = { "\\001", "\\t", "\\r", "\\b", "\\n", "\\f", "\\r\\n", "\\\\" };
+    String[] output = { "\001", "\t", "\r", "\b", "\n", "\f", "\r\n", "\\" };
     for (int i = 0; i < input.length; i++) {
       assertEquals(CarbonUtil.unescapeChar(input[i]), output[i]);
     }
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 49e5664..2b26957 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -50,6 +50,7 @@ CarbonData DML statements are documented here,which includes:
 | ------------------------------------------------------- | ------------------------------------------------------------
|
 | [DELIMITER](#delimiter)                                 | Character used to separate the
data in the input csv file    |
 | [QUOTECHAR](#quotechar)                                 | Character used to quote the data
in the input csv file       |
+| [LINE_SEPARATOR](#line_separator)                       | Characters used to specify the
line separator in the input csv file. If not provide, csv parser will detect it automatically.
| 
 | [COMMENTCHAR](#commentchar)                             | Character used to comment the
rows in the input csv file. Those rows will be skipped from processing |
 | [HEADER](#header)                                       | Whether the input csv files have
header row                  |
 | [FILEHEADER](#fileheader)                               | If header is not present in the
input csv, what is the column names to be used for data read from input csv |
@@ -86,6 +87,13 @@ CarbonData DML statements are documented here,which includes:
     OPTIONS('QUOTECHAR'='"')
     ```
 
+  - ##### LINE_SEPARATOR:
+    Line separator Characters can be provided in the load command.
+
+    ```
+    OPTIONS('LINE_SEPARATOR'='\n')
+    ```
+
   - ##### COMMENTCHAR:
     Comment Characters can be provided in the load command if user wants to comment lines.
     ```
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index cb985c8..200eb44 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -382,7 +382,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
Obje
                 CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
                 CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
 
-    model.setTimestampformat(
+    model.setTimestampFormat(
         conf.get(
             TIMESTAMP_FORMAT,
             carbonProperty.getProperty(
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 32de3c2..c6ba4ec 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -407,6 +407,7 @@ public class StoreCreator {
     CSVInputFormat.setNumberOfColumns(
         configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
     CSVInputFormat.setMaxColumns(configuration, "10");
+    CSVInputFormat.setLineSeparator(configuration, loadModel.getLineSeparator());
 
     TaskAttemptContextImpl hadoopAttemptContext =
         new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP,
0, 0));
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 8f1ef0a..05eed08 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -270,6 +270,7 @@ object CarbonDataStoreCreator {
     CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar)
     CSVInputFormat.setHeaderExtractionEnabled(configuration, true)
     CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar)
+    CSVInputFormat.setLineSeparator(configuration, loadModel.getLineSeparator)
     CSVInputFormat.setReadBufferSize(
       configuration,
       CarbonProperties.getInstance.getProperty(
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 5528184..d2c53f6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -506,6 +506,10 @@ object CommonUtil {
     CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
         CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+    val lineSeparator = carbonLoadModel.getLineSeparator
+    if (lineSeparator != null) {
+      CSVInputFormat.setLineSeparator(configuration, lineSeparator)
+    }
   }
 
   def configSplitMaxSize(context: SparkContext, filePaths: String,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7fe2328..c7d4cc1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -102,6 +102,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
   protected val PARTITIONER = carbonKeyWord("PARTITIONER")
   protected val PARTITIONS = carbonKeyWord("PARTITIONS")
   protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
+  protected val LINE_SEPARATOR = carbonKeyWord("LINE_SEPARATOR")
   protected val RELATION = carbonKeyWord("RELATION")
   protected val SCHEMA = carbonKeyWord("SCHEMA")
   protected val SCHEMAS = carbonKeyWord("SCHEMAS")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 4ff66e0..6b92ffa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -927,7 +927,8 @@ object CarbonParserUtil {
       "LOAD_MIN_SIZE_INMB",
       "SCALE_FACTOR",
       "BINARY_DECODER",
-      "SORT_SCOPE"
+      "SORT_SCOPE",
+      "LINE_SEPARATOR"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
@@ -960,6 +961,16 @@ object CarbonParserUtil {
       }
     }
 
+    // Validate LINE_SEPARATOR length
+    if (options.exists(_._1.equalsIgnoreCase("LINE_SEPARATOR"))) {
+      val line_separator: String = CarbonUtil.unescapeChar(
+        options.get("line_separator").get.head._2)
+      if (line_separator.isEmpty || line_separator.length > 2) {
+        throw new MalformedCarbonCommandException(
+          "LINE_SEPARATOR can be only one or two characters.")
+      }
+    }
+
     // Validate ESCAPECHAR length
     if (options.exists(_._1.equalsIgnoreCase("ESCAPECHAR"))) {
       val escapeChar: String = options.get("escapechar").get.head._2
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f355795..05e3b0b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -774,7 +774,7 @@ object CommonLoadUtils {
 
   def getTimeAndDateFormatFromLoadModel(loadModel: CarbonLoadModel): (SimpleDateFormat,
     SimpleDateFormat) = {
-    var timeStampformatString = loadModel.getTimestampformat
+    var timeStampformatString = loadModel.getTimestampFormat
     if (timeStampformatString.isEmpty) {
       timeStampformatString = loadModel.getDefaultTimestampFormat
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 039c946..3eeaa54 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -87,7 +87,7 @@ class CarbonAppendableStreamSink(
       carbonLoadModel.getSerializationNullFormat().split(",")(1))
     conf.set(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      carbonLoadModel.getTimestampformat())
+      carbonLoadModel.getTimestampFormat())
     conf.set(
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       carbonLoadModel.getDateFormat())
diff --git a/integration/spark/src/test/resources/carriage_return_in_string.csv b/integration/spark/src/test/resources/carriage_return_in_string.csv
new file mode 100644
index 0000000..c48c51c
--- /dev/null
+++ b/integration/spark/src/test/resources/carriage_return_in_string.csv
@@ -0,0 +1,2 @@
+1,2
,3
+4,5,6
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
index 4e7c296..88d7829 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
-class TestLoadOptions extends QueryTest with BeforeAndAfterAll{
+class TestLoadOptions extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
     sql("drop table if exists TestLoadTableOptions")
@@ -35,7 +35,6 @@ class TestLoadOptions extends QueryTest with BeforeAndAfterAll{
     sql("drop table if exists TestLoadTableOptions")
   }
 
-
   test("test load data with more than one char in quotechar option") {
     val errorMessage = intercept[MalformedCarbonCommandException] {
       sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE " +
@@ -77,4 +76,69 @@ class TestLoadOptions extends QueryTest with BeforeAndAfterAll{
       Row(1, "2015/7/23", "ind", "aaa1", "phone197", "ASD69643a", 15000))
   }
 
+  test("test load data with different line separator option value") {
+    // line separator as '\n'
+    sql("drop table if exists carriage_return_in_string")
+    sql("""CREATE TABLE IF NOT EXISTS carriage_return_in_string(ID BigInt, name String,
+          |city String) STORED AS carbondata""".stripMargin.replace('\n', ' '))
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO TABLE
+         |carriage_return_in_string OPTIONS ('fileheader'='id, name, city', 'line_separator'='\\n')"""
+        .stripMargin.replace('\n', ' '));
+    checkAnswer(sql("select * from carriage_return_in_string where id = 1"),
+      Row(1, "2\r", "3"))
+
+    // without line separator
+    sql("drop table if exists carriage_return_in_string")
+    sql("""CREATE TABLE IF NOT EXISTS carriage_return_in_string(ID BigInt, name String,
+          |city String) STORED AS carbondata""".stripMargin.replace('\n', ' '))
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO TABLE
+           |carriage_return_in_string OPTIONS ('fileheader'='id, name, city')"""
+      .stripMargin.replace('\n', ' '));
+    checkAnswer(sql("select * from carriage_return_in_string where id = 1"),
+      Row(1, "2", null))
+
+    // line separator as '\r\n'
+    sql("drop table if exists carriage_return_in_string")
+    sql("""CREATE TABLE IF NOT EXISTS carriage_return_in_string(ID BigInt, name String,
+          |city String) STORED AS carbondata""".stripMargin.replace('\n', ' '))
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO TABLE
+           |carriage_return_in_string OPTIONS ('fileheader'='id, name, city',
+           |'line_separator'='\\r\\n')""".stripMargin.replace('\n', ' '));
+    checkAnswer(sql("select * from carriage_return_in_string where id = 1"),
+      Row(1, "2\r", "3\n4"))
+
+    // line separator as 'ab'
+    sql("drop table if exists carriage_return_in_string")
+    sql("""CREATE TABLE IF NOT EXISTS carriage_return_in_string(ID BigInt, name String,
+          |city String) STORED AS carbondata""".stripMargin.replace('\n', ' '))
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO TABLE
+           |carriage_return_in_string OPTIONS ('fileheader'='id, name, city',
+           |'line_separator'='ab')""".stripMargin.replace('\n', ' '));
+    checkAnswer(sql("select * from carriage_return_in_string where id = 1"),
+      Row(1, "2\r", "3\n4"))
+    sql("drop table if exists carriage_return_in_string")
+  }
+
+  test("test load data with invalidated line separator option value") {
+    sql("drop table if exists carriage_return_in_string")
+    sql("""CREATE TABLE IF NOT EXISTS carriage_return_in_string(ID BigInt, name String,
+          |city String) STORED AS carbondata""".stripMargin.replace('\n', ' '))
+
+    // line separator as ''
+    var exception = intercept[MalformedCarbonCommandException] {
+      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO
TABLE
+           |carriage_return_in_string OPTIONS ('fileheader'='id, name, city',
+           |'line_separator'='')""".stripMargin.replace('\n', ' '));
+    }
+    assert(exception.getMessage.contains("LINE_SEPARATOR can be only one or two characters."))
+
+    // line separator as '\r\na'
+    exception = intercept[MalformedCarbonCommandException] {
+      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/carriage_return_in_string.csv' INTO
TABLE
+             |carriage_return_in_string OPTIONS ('fileheader'='id, name, city',
+             |'line_separator'='\\r\\na')""".stripMargin.replace('\n', ' '));
+    }
+    assert(exception.getMessage.contains("LINE_SEPARATOR can be only one or two characters."))
+    sql("drop table if exists carriage_return_in_string")
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 966f828..a412f9a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -288,8 +288,8 @@ public final class DataLoadProcessBuilder {
         dataField.setDateFormat(loadModel.getDateFormat());
         column.setDateFormat(loadModel.getDateFormat());
       } else if (column.getDataType() == DataTypes.TIMESTAMP) {
-        dataField.setTimestampFormat(loadModel.getTimestampformat());
-        column.setTimestampFormat(loadModel.getTimestampformat());
+        dataField.setTimestampFormat(loadModel.getTimestampFormat());
+        column.setTimestampFormat(loadModel.getTimestampFormat());
       }
       if (column.isComplex()) {
         complexDataFields.add(dataField);
@@ -299,7 +299,7 @@ public final class DataLoadProcessBuilder {
           if (childDimension.getDataType() == DataTypes.DATE) {
             childDimension.setDateFormat(loadModel.getDateFormat());
           } else if (childDimension.getDataType() == DataTypes.TIMESTAMP) {
-            childDimension.setTimestampFormat(loadModel.getTimestampformat());
+            childDimension.setTimestampFormat(loadModel.getTimestampFormat());
           }
         }
       } else {
@@ -339,7 +339,7 @@ public final class DataLoadProcessBuilder {
           if (childDimension.getDataType() == DataTypes.DATE) {
             childDimension.setDateFormat(loadModel.getDateFormat());
           } else if (childDimension.getDataType() == DataTypes.TIMESTAMP) {
-            childDimension.setTimestampFormat(loadModel.getTimestampformat());
+            childDimension.setTimestampFormat(loadModel.getTimestampFormat());
           }
         }
         if (partitionColumnSchemaList.size() != 0 && partitionColumnSchemaList
@@ -353,8 +353,8 @@ public final class DataLoadProcessBuilder {
           dataField.setDateFormat(loadModel.getDateFormat());
           column.setDateFormat(loadModel.getDateFormat());
         } else if (column.getDataType() == DataTypes.TIMESTAMP) {
-          dataField.setTimestampFormat(loadModel.getTimestampformat());
-          column.setTimestampFormat(loadModel.getTimestampformat());
+          dataField.setTimestampFormat(loadModel.getTimestampFormat());
+          column.setTimestampFormat(loadModel.getTimestampFormat());
         }
         if (partitionColumnSchemaList.size() != 0 && partitionColumnSchemaList
             .contains(column.getColumnSchema())) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index 306627c..21987d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -76,6 +76,7 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
   public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
   public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns";
+  public static final String LINE_SEPARATOR = "carbon.csvinputformat.line.separator";
   /**
    * support only one column index
    */
@@ -198,11 +199,22 @@ public class CSVInputFormat extends FileInputFormat<NullWritable,
StringArrayWri
     configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
   }
 
+  public static void setLineSeparator(Configuration configuration, String lineSeparator)
{
+    if (lineSeparator != null && !lineSeparator.isEmpty()) {
+      configuration.set(LINE_SEPARATOR, lineSeparator);
+    }
+  }
+
   public static CsvParserSettings extractCsvParserSettings(Configuration job) {
     CsvParserSettings parserSettings = new CsvParserSettings();
     parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
     parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
-    parserSettings.setLineSeparatorDetectionEnabled(true);
+    String lineSeparator = job.get(LINE_SEPARATOR);
+    if (lineSeparator != null) {
+      parserSettings.getFormat().setLineSeparator(lineSeparator);
+    } else {
+      parserSettings.setLineSeparatorDetectionEnabled(true);
+    }
     parserSettings.setNullValue("");
     parserSettings.setEmptyValue("");
     parserSettings.setIgnoreLeadingWhitespaces(false);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d0883d3..9bcec33 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -93,7 +93,9 @@ public class CarbonLoadModel implements Serializable {
    */
   private String commentChar;
 
-  private String timestampformat;
+  private String lineSeparator;
+
+  private String timestampFormat;
 
   private String dateFormat;
 
@@ -403,8 +405,9 @@ public class CarbonLoadModel implements Serializable {
     copy.badRecordsAction = badRecordsAction;
     copy.escapeChar = escapeChar;
     copy.quoteChar = quoteChar;
+    copy.lineSeparator = lineSeparator;
     copy.commentChar = commentChar;
-    copy.timestampformat = timestampformat;
+    copy.timestampFormat = timestampFormat;
     copy.dateFormat = dateFormat;
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
@@ -457,8 +460,9 @@ public class CarbonLoadModel implements Serializable {
     copyObj.badRecordsAction = badRecordsAction;
     copyObj.escapeChar = escapeChar;
     copyObj.quoteChar = quoteChar;
+    copyObj.lineSeparator = lineSeparator;
     copyObj.commentChar = commentChar;
-    copyObj.timestampformat = timestampformat;
+    copyObj.timestampFormat = timestampFormat;
     copyObj.dateFormat = dateFormat;
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
@@ -649,6 +653,14 @@ public class CarbonLoadModel implements Serializable {
     this.quoteChar = quoteChar;
   }
 
+  public String getLineSeparator() {
+    return lineSeparator;
+  }
+
+  public void setLineSeparator(String lineSeparator) {
+    this.lineSeparator = lineSeparator;
+  }
+
   public String getCommentChar() {
     return commentChar;
   }
@@ -753,12 +765,12 @@ public class CarbonLoadModel implements Serializable {
     this.badRecordsLocation = badRecordsLocation;
   }
 
-  public String getTimestampformat() {
-    return timestampformat;
+  public String getTimestampFormat() {
+    return timestampFormat;
   }
 
-  public void setTimestampformat(String timestampformat) {
-    this.timestampformat = timestampformat;
+  public void setTimestampFormat(String timestampFormat) {
+    this.timestampFormat = timestampFormat;
   }
 
   public String getSkipEmptyLine() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 20bed85..59e6345 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -104,7 +104,7 @@ public class CarbonLoadModelBuilder {
               CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
     }
     model.setDateFormat(dateFormat);
-    model.setTimestampformat(timestampFormat);
+    model.setTimestampFormat(timestampFormat);
     validateAndSetColumnCompressor(model);
     validateAndSetBinaryDecoder(model);
     return model;
@@ -182,6 +182,10 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
     carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
     carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+    String lineSeparator = CarbonUtil.unescapeChar(options.get("line_separator"));
+    if (lineSeparator != null) {
+      carbonLoadModel.setLineSeparator(lineSeparator);
+    }
 
     // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
     // we should use table schema to generate file header.
@@ -215,7 +219,7 @@ public class CarbonLoadModelBuilder {
 
     String binaryDecoder = options.get("binary_decoder");
     carbonLoadModel.setBinaryDecoder(binaryDecoder);
-    carbonLoadModel.setTimestampformat(timestampformat);
+    carbonLoadModel.setTimestampFormat(timestampformat);
     carbonLoadModel.setDateFormat(dateFormat);
     carbonLoadModel.setDefaultTimestampFormat(
         CarbonProperties.getInstance().getProperty(


Mime
View raw message