carbondata-commits mailing list archives

From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: quotechar and newLine
Date Thu, 01 Sep 2016 07:31:15 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master faba524b2 -> 30988c501


quotechar and newLine: handle quote characters and embedded newlines in CSV fields during data load


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1a38a49b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1a38a49b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1a38a49b

Branch: refs/heads/master
Commit: 1a38a49bf6daa2a5f392865f17537cab144853f3
Parents: faba524
Author: Jay357089 <liujunjie9@huawei.com>
Authored: Mon Aug 29 22:55:22 2016 +0800
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Sep 1 12:51:02 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 +++++++++++++++++++
 .../spark/csv/CarbonCsvRelation.scala           |   2 +-
 .../src/test/resources/dataWithSingleQuote.csv  |   7 +
 .../TestLoadDataWithSingleQuotechar.scala       |  60 ++++++
 4 files changed, 250 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a38a49b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
new file mode 100644
index 0000000..551fc9c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.sql.readers
+
+/**
+ * Parser for parsing lines in bulk. Use this when efficiency is desired.
+ *
+ * @param iter iterator over lines in the file
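+ * @param split index of the input split (RDD partition) being parsed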
+ * @param fieldSep the delimiter used to separate fields in a line
+ * @param lineSep the delimiter used to separate lines
+ * @param quote character used to quote fields
+ * @param escape character used to escape the quote character
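+ * @param commentMarker character marking a line as a comment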
+ * @param ignoreLeadingSpace ignore white space before a field
+ * @param ignoreTrailingSpace ignore white space after a field
+ * @param headers headers for the columns
+ * @param inputBufSize size of buffer to use for parsing input, tune for performance
+ * @param maxCols maximum number of columns allowed, for safety against bad inputs
+ */
+class CarbonBulkCsvReader (iter: Iterator[String],
+    split: Int,
+    fieldSep: Char = ',',
+    lineSep: String = "\n",
+    quote: Char = '"',
+    escape: Char = '\\',
+    commentMarker: Char = '#',
+    ignoreLeadingSpace: Boolean = true,
+    ignoreTrailingSpace: Boolean = true,
+    headers: Seq[String],
+    inputBufSize: Int = 128,
+    maxCols: Int = 20480)
+  extends CsvReader(fieldSep,
+      lineSep,
+      quote,
+      escape,
+      commentMarker,
+      ignoreLeadingSpace,
+      ignoreTrailingSpace,
+      headers,
+      inputBufSize,
+      maxCols)
+    with Iterator[Array[String]] {
+
+  private val reader = new CarbonStringIteratorReader(iter)
+  parser.beginParsing(reader)
+  private var nextRecord = parser.parseNext()
+
+  /**
+   * get the next parsed line.
+   *
+   * @return array of strings where each string is a field in the CSV record
+   */
+  def next: Array[String] = {
+    val curRecord = nextRecord
+    if(curRecord != null) {
+      nextRecord = parser.parseNext()
+    } else {
+      throw new NoSuchElementException("next record is null")
+    }
+    curRecord
+  }
+
+  def hasNext: Boolean = nextRecord != null
+
+}
+
+/**
+ * A Reader that "reads" from a sequence of lines. Spark's textFile method removes the newline
+ * at the end of each line, but the Univocity parser requires a Reader that provides access to
+ * the data to be parsed and needs those newlines to be present.
+ * @param iter iterator over RDD[String]
+ */
+private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
+
+  private var next: Long = 0
+  private var length: Long = 0  // length of input so far
+  private var start: Long = 0
+  private var str: String = null   // current string from iter
+
+  /**
+   * fetch the next string from iter once we are done with the current one;
+   * pretend there is a newline at the end of every string we get from iter
+   */
+  private def refill(): Unit = {
+    if (length == next) {
+      if (iter.hasNext) {
+        str = iter.next
+        start = length
+        // allow one extra character per line (except the last) to account for the stripped '\n'
+        if (iter.hasNext) {
+          length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
+        } else {
+          length += str.length
+        }
+      } else {
+        str = null
+      }
+    }
+  }
+
+  /**
+   * read the next character; if at the end of the current string, pretend there is a newline
+   */
+  override def read(): Int = {
+    refill()
+    if(next >= length) {
+      -1
+    } else {
+      val cur = next - start
+      next += 1
+      if (cur == str.length) '\n' else str.charAt(cur.toInt)
+    }
+  }
+
+  /**
+   * read from str into cbuf
+   */
+  def read(cbuf: Array[Char], off: Int, len: Int): Int = {
+    refill()
+    var n = 0
+    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
+      ((off + len) > cbuf.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException()
+    } else if (len == 0) {
+      n = 0
+    } else {
+      if (next >= length) {   // end of input
+        n = -1
+      } else {
+        n = Math.min(length - next, len).toInt // lesser of the input available or the buffer size
+        // add a '\n' to every line except the last one
+        if (n == length - next && iter.hasNext) {
+          str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
+          cbuf(off + n - 1) = '\n'
+        } else {
+          str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
+        }
+        next += n
+        if (n < len) {
+          val m = read(cbuf, off + n, len - n)  // more room in cbuf, fetch more input from iter
+          if(m != -1) n += m
+        }
+      }
+    }
+    n
+  }
+
+  override def skip(ns: Long): Long = {
+    throw new IllegalArgumentException("Skip not implemented")
+  }
+
+  override def ready: Boolean = {
+    refill()
+    true
+  }
+
+  override def markSupported: Boolean = false
+
+  override def mark(readAheadLimit: Int): Unit = {
+    throw new IllegalArgumentException("Mark not implemented")
+  }
+
+  override def reset(): Unit = {
+    throw new IllegalArgumentException("Mark and hence reset not implemented")
+  }
+
+  def close(): Unit = { }
+}
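
The trick above in miniature (a hedged sketch, standard library only, not part of
the commit): SparkContext.textFile strips line terminators, so a quote-aware CSV
parser fed those lines one at a time can never see a quoted field that spans
physical lines. CarbonStringIteratorReader streams the lines back with '\n'
re-inserted, which is semantically what this snippet does eagerly:

    // what the streaming Reader above produces, materialized for illustration
    val fromTextFile = Iterator("2,\"Tony", "3,Lily\"")  // newlines already stripped
    val restored = fromTextFile.mkString("\n")           // re-insert them between lines
    println(restored)
    // 2,"Tony
    // 3,Lily"
    // -> one logical CSV record: id = 2, name = "Tony\n3,Lily"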

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a38a49b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
index ae527ff..6a8021a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
@@ -191,7 +191,7 @@ case class CarbonCsvRelation protected[spark] (
         val escapeVal = if (escape == null) '\\' else escape.charValue()
         val commentChar: Char = if (comment == null) '\0' else comment
 
-        new BulkCsvReader(iter, split,
+        new CarbonBulkCsvReader(iter, split,
           headers = header, fieldSep = delimiter,
           quote = quote, escape = escapeVal, commentMarker = commentChar,
           ignoreLeadingSpace = ignoreLeadingWhiteSpace,

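For context, a minimal usage sketch of the swapped-in reader (hedged: the values
below are illustrative stand-ins for what the relation passes in, not an excerpt
from this commit). Since CarbonBulkCsvReader is itself an Iterator[Array[String]],
the relation can consume parsed records directly:

    import com.databricks.spark.sql.readers.CarbonBulkCsvReader

    // `lines` stands in for one RDD partition's iterator of raw CSV lines
    val lines: Iterator[String] = Seq("1,\"Tom", "Junior\"", "2,Tony").iterator
    val reader = new CarbonBulkCsvReader(lines, split = 0,
      headers = Seq("id", "name"), fieldSep = ',',
      quote = '"', escape = '\\', commentMarker = '#')
    reader.foreach(tokens => println(tokens.mkString("|")))
    // 1|Tom
    // Junior          <- "Tom\nJunior" is one field spanning two physical lines
    // 2|Tony
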
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a38a49b/integration/spark/src/test/resources/dataWithSingleQuote.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/dataWithSingleQuote.csv b/integration/spark/src/test/resources/dataWithSingleQuote.csv
new file mode 100644
index 0000000..3ba5469
--- /dev/null
+++ b/integration/spark/src/test/resources/dataWithSingleQuote.csv
@@ -0,0 +1,7 @@
+id,name
+1,Tom
+2,"Tony
+3,Lily"
+4,Games"
+5,"prival\"
+6,"hello\"
\ No newline at end of file
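
How the unbalanced quotes group these physical lines into records (a hedged
sketch over the first five lines; it assumes the univocity-parsers library on
the classpath, the parser the reader above delegates to, and is not code from
this commit):

    import java.io.StringReader
    import scala.collection.JavaConverters._
    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    val settings = new CsvParserSettings
    settings.setLineSeparatorDetectionEnabled(true)
    settings.getFormat.setQuote('"')
    settings.getFormat.setQuoteEscape('\\')
    val parser = new CsvParser(settings)
    val csv = "id,name\n1,Tom\n2,\"Tony\n3,Lily\"\n4,Games\""
    parser.parseAll(new StringReader(csv)).asScala
      .foreach(row => println(row.mkString("|")))
    // id|name
    // 1|Tom
    // 2|Tony          <- the quote opened after "2," only closes on the next
    // 3,Lily             line, so "Tony\n3,Lily" parses as a single field
    // 4|Games"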

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a38a49b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
new file mode 100644
index 0000000..bc39df3
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for data loading when there is a single quote character in the fact data.
+ */
+class TestLoadDataWithSingleQuotechar extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS carbontable")
+    sql(
+      "CREATE TABLE carbontable (id Int, name String) STORED BY 'carbondata'")
+  }
+
+  test("test data loading with single quote char") {
+    try {
+      sql(
+        "LOAD DATA LOCAL INPATH './src/test/resources/dataWithSingleQuote.csv' INTO TABLE
" +
+          "carbontable OPTIONS('DELIMITER'= ',')")
+      checkAnswer(
+        sql("SELECT * from carbontable"),
+        Seq(Row("Tom",1),
+          Row("Tony\n3,Lily",2),
+          Row("Games\"",4),
+          Row("prival\"\n6,\"hello\"",5)
+        )
+      )
+    } catch {
+      case e: Throwable =>
+        assert(false, "data load/query with single quote char failed: " + e.getMessage)
+    }
+  }
+
+  override def afterAll {
+    sql("DROP TABLE carbontable")
+  }
+}

