carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: compress CSV file using GZIP while loading
Date Tue, 06 Sep 2016 14:42:05 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master c7999c14e -> 005186223


compress CSV file using GZIP while loading


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/952ba386
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/952ba386
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/952ba386

Branch: refs/heads/master
Commit: 952ba38699367c7f336f4ed07ed606ad38d14e2c
Parents: c7999c1
Author: jackylk <jacky.likun@huawei.com>
Authored: Tue Sep 6 20:07:23 2016 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Sep 6 20:07:23 2016 +0530

----------------------------------------------------------------------
 .../datastorage/store/impl/FileFactory.java     |  41 +++---
 .../examples/DataFrameAPIExample.scala          |  11 +-
 .../apache/carbondata/examples/PerfTest.scala   |   1 -
 .../examples/util/InitForExamples.scala         |   1 -
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../carbondata/spark/csv/CarbonTextFile.scala   |   2 +
 .../carbondata/spark/csv/DefaultSource.scala    |  17 ++-
 .../org/apache/carbondata/spark/package.scala   |  40 +++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +
 .../spark/util/GlobalDictionaryUtil.scala       |   1 +
 .../spark/sql/CarbonDatasourceRelation.scala    |   2 +-
 .../scala/org/apache/spark/util/FileUtils.scala |   4 +-
 .../spark/src/test/resources/sample.csv.gz      | Bin 0 -> 106 bytes
 .../MultiFilesDataLoagdingTestCase.scala        |   2 +-
 .../dataload/SparkDatasourceSuite.scala         |  81 ++++++++++++
 .../dataload/TestLoadDataGeneral.scala          |  66 ++++++++++
 .../csvreaderstep/BoundedDataStream.java        | 126 +++++++++++++++++++
 .../csvreaderstep/CustomDataStream.java         | 126 -------------------
 .../csvreaderstep/UnivocityCsvParser.java       |  43 ++++---
 19 files changed, 374 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index d537d6e..854ec8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -27,6 +27,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
 
 public final class FileFactory {
   private static Configuration configuration = null;
@@ -117,36 +120,40 @@ public final class FileFactory {
 
   public static DataInputStream getDataInputStream(String path, FileType fileType)
       throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
-      case HDFS:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = FileSystem.get(configuration);
-        FSDataInputStream stream = fs.open(pt);
-        return new DataInputStream(new BufferedInputStream(stream));
-      default:
-        return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
-    }
+    return getDataInputStream(path, fileType, -1);
   }
 
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
     path = path.replace("\\", "/");
+    boolean gzip = path.endsWith(".gz");
+    InputStream stream;
     switch (fileType) {
       case LOCAL:
-        return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+        if (gzip) {
+          stream = new GZIPInputStream(new FileInputStream(path));
+        } else {
+          stream = new FileInputStream(path);
+        }
+        break;
       case HDFS:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = FileSystem.get(configuration);
-        FSDataInputStream stream = fs.open(pt, bufferSize);
-        return new DataInputStream(new BufferedInputStream(stream));
+        if (bufferSize == -1) {
+          stream = fs.open(pt);
+        } else {
+          stream = fs.open(pt, bufferSize);
+        }
+        if (gzip) {
+          GzipCodec codec = new GzipCodec();
+          stream = codec.createInputStream(stream);
+        }
+        break;
       default:
-        return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+        throw new UnsupportedOperationException("unsupported file system");
     }
+    return new DataInputStream(new BufferedInputStream(stream));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 57e0f3c..d2ee959 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.SaveMode
 
 import org.apache.carbondata.examples.util.InitForExamples
 
+// scalastyle:off println
 object DataFrameAPIExample {
 
   def main(args: Array[String]) {
@@ -28,7 +29,8 @@ object DataFrameAPIExample {
     val sc = cc.sc
 
     import cc.implicits._
-    // create a dataframe
+
+    // create a dataframe, it can be from parquet or hive table
     val df = sc.parallelize(1 to 1000)
       .map(x => ("a", "b", x))
       .toDF("c1", "c2", "c3")
@@ -47,10 +49,7 @@ object DataFrameAPIExample {
       .load()
 
     val count = in.where($"c3" > 500).select($"*").count()
-
-    // scalastyle:off println
     println(s"count using dataframe.read: $count")
-    // scalastyle:on println
 
     // use SQL to read
     cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
@@ -58,8 +57,10 @@ object DataFrameAPIExample {
 
     // also support a implicit function for easier access
     import org.apache.carbondata.spark._
-    df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
+    df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+
     cc.sql("SELECT count(*) FROM carbon2 WHERE c3 > 100").show
     cc.sql("DROP TABLE IF EXISTS carbon2")
   }
 }
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
index a18a1e9..cfc5814 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 
 import scala.util.Random
 
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
 import org.apache.spark.sql.types.{DataTypes, StructType}
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
index 34d7736..21377d7 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.examples.util
 
 import java.io.File
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.CarbonContext
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index d4282b2..9115d14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -36,4 +36,6 @@ class CarbonOption(options: Map[String, String]) {
       "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
   }
 
+  def compress: String = options.getOrElse("compress", "false")
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
index 2968ae6..c703a81 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
@@ -36,6 +36,8 @@ private[csv] object CarbonTextFile {
     val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
     hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
     hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
+    hadoopConfiguration.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")
+
     CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
     new NewHadoopRDD[LongWritable, Text](
       sc,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
index 33455b7..cd76651 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
@@ -16,11 +16,11 @@
  */
 package com.databricks.spark.csv.newapi
 
-import com.databricks.spark.csv.CarbonCsvRelation
-import com.databricks.spark.csv.CsvSchemaRDD
-import com.databricks.spark.csv.util.{ ParserLibs, TextFile, TypeCast }
+import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
+import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{ DataFrame, SaveMode, SQLContext }
+import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
@@ -164,9 +164,16 @@ class DefaultSource
     } else {
       true
     }
+
+    val codec: Class[_ <: CompressionCodec] =
+      parameters.getOrElse("codec", "none") match {
+        case "gzip" => classOf[GzipCodec]
+        case _ => null
+      }
+
     if (doSave) {
       // Only save data when the save mode is not ignore.
-      data.saveAsCsvFile(path, parameters)
+      data.saveAsCsvFile(path, parameters, codec)
     }
 
     createRelation(sqlContext, parameters, data.schema)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
index 973ec96..b46af53 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
@@ -18,12 +18,13 @@
 package org.apache.carbondata
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
 
-package object spark {
+package object spark extends Logging {
 
   implicit class toCarbonDataFrame(dataFrame: DataFrame) {
 
@@ -43,38 +44,44 @@ package object spark {
 
       // temporary solution: write to csv file, then load the csv into carbon
       val tempCSVFolder = s"$storePath/$dbName/$tableName/tempCSV"
-      dataFrame.write
-        .format(csvPackage)
-        .option("header", "true")
-        .mode(SaveMode.Overwrite)
-        .save(tempCSVFolder)
+      var writer: DataFrameWriter =
+        dataFrame.write
+                 .format(csvPackage)
+                 .option("header", "false")
+                 .mode(SaveMode.Overwrite)
+
+      if (options.compress.equals("true")) {
+        writer = writer.option("codec", "gzip")
+      }
+
+      writer.save(tempCSVFolder)
 
       val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
       val tempCSVPath = new Path(tempCSVFolder)
       val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
 
-      try {
-        cc.sql(makeCreateTableString(dataFrame.schema, options))
-
-        // add 'csv' as file extension to all generated part file
+      def countSize(): Double = {
+        var size: Double = 0
         val itor = fs.listFiles(tempCSVPath, true)
         while (itor.hasNext) {
           val f = itor.next()
           if (f.getPath.getName.startsWith("part-")) {
-            val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
-            if (!fs.rename(f.getPath, new Path(newPath))) {
-              cc.sql(s"DROP TABLE ${ options.tableName }")
-              throw new RuntimeException("File system rename failed when loading data into
carbon")
-            }
+            size += f.getLen
           }
         }
+        size
+      }
+
+      try {
+        cc.sql(makeCreateTableString(dataFrame.schema, options))
+        logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
         cc.sql(makeLoadString(tableName, tempCSVFolder))
       } finally {
         fs.delete(tempCSVPath, true)
       }
     }
 
-    private def csvPackage: String = "com.databricks.spark.csv"
+    private def csvPackage: String = "com.databricks.spark.csv.newapi"
 
     private def convertToCarbonType(sparkType: DataType): String = {
       sparkType match {
@@ -107,6 +114,7 @@ package object spark {
       s"""
           LOAD DATA INPATH '$csvFolder'
           INTO TABLE $tableName
+          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
       """
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8f4bd06..ef2920f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -887,6 +887,8 @@ object CarbonDataRDDFactory extends Logging {
           val filePaths = carbonLoadModel.getFactFilePath
           hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
           hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+          hadoopConfiguration.set("io.compression.codecs",
+            "org.apache.hadoop.io.compress.GzipCodec")
 
           configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index d68fa86..3580810 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -364,6 +364,7 @@ object GlobalDictionaryUtil extends Logging {
       .option("escape", carbonLoadModel.getEscapeChar)
       .option("ignoreLeadingWhiteSpace", "false")
       .option("ignoreTrailingWhiteSpace", "false")
+      .option("codec", "gzip")
       .load(carbonLoadModel.getFactFilePath)
     df
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 3183695..6240c7c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -98,7 +98,7 @@ class CarbonSource
         sys.error(s"ErrorIfExists mode, path $storePath already exists.")
       case (SaveMode.Overwrite, true) =>
         val cc = CarbonContext.getInstance(sqlContext.sparkContext)
-        cc.sql(s"DROP CUBE IF EXISTS ${ options.dbName }.${ options.tableName }")
+        cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index 558bb1c..b683629 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -43,10 +43,8 @@ object FileUtils extends Logging {
       } else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
           fileName.startsWith(CarbonCommonConstants.POINT)) {
         logWarning(s"skip invisible input file: $path")
-      } else if (fileName.toLowerCase().endsWith(".csv")) {
-        stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
       } else {
-        logWarning(s"skip input file: $path, because this path doesn't end with '.csv'")
+        stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/resources/sample.csv.gz
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/sample.csv.gz b/integration/spark/src/test/resources/sample.csv.gz
new file mode 100644
index 0000000..80513b8
Binary files /dev/null and b/integration/spark/src/test/resources/sample.csv.gz differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
index 827aa31..d44c73d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
@@ -49,7 +49,7 @@ class MultiFilesDataLoagdingTestCase extends QueryTest with BeforeAndAfterAll
{
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table multifile")
     checkAnswer(
       sql("select count(empno) from multifile"),
-      Seq(Row(8))
+      Seq(Row(10))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
new file mode 100644
index 0000000..73aa2b0
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, DataFrame, SaveMode}
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
+
+  var currentDirectory: String = _
+  var df: DataFrame = _
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS carbon1")
+
+    currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+    import implicits._
+    df = sc.parallelize(1 to 1000)
+        .map(x => ("a", "b", x))
+        .toDF("c1", "c2", "c3")
+  }
+
+  test("read and write using CarbonContext") {
+    // save dataframe to carbon file
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon1")
+        .mode(SaveMode.Overwrite)
+        .save()
+
+    val in = read
+        .format("carbondata")
+        .option("tableName", "carbon1")
+        .load()
+
+    assert(in.where("c3 > 500").count() == 500)
+    sql("DROP TABLE IF EXISTS carbon1")
+  }
+
+  test("saveAsCarbon API") {
+    import org.apache.carbondata.spark._
+    df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
+
+    checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  test("saveAsCarbon API using compression") {
+    import org.apache.carbondata.spark._
+    df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+
+    checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def afterAll {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
new file mode 100644
index 0000000..9280447
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
+
+  var currentDirectory: String = _
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS loadtest")
+    sql(
+      """
+        | CREATE TABLE loadtest(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+  }
+
+  test("test data loading CSV file") {
+    val testData = currentDirectory + "/src/test/resources/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(4))
+    )
+  }
+
+  test("test data loading GZIP compressed CSV file") {
+    val testData = currentDirectory + "/src/test/resources/sample.csv.gz"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(8))
+    )
+  }
+
+  override def afterAll {
+    sql("DROP TABLE loadtest")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
new file mode 100644
index 0000000..a9406df
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.csvreaderstep;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Custom reader class to read the data from file it will take care of reading
+ * till the limit assigned to this class
+ */
+public class BoundedDataStream extends InputStream {
+
+  /**
+   * byte value of the new line character
+   */
+  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+  /**
+   * number of extra character to read
+   */
+  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+  /**
+   * number of bytes remaining
+   */
+  private long remaining;
+  /**
+   * to check whether end of line is found
+   */
+  private boolean endOfLineFound = false;
+
+  private DataInputStream in;
+
+  public BoundedDataStream(DataInputStream in, long limit) {
+    this.in = in;
+    this.remaining = limit;
+  }
+
+  /**
+   * Below method will be used to read the data from file
+   *
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read() throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      int var1 = this.in.read();
+      if (var1 >= 0) {
+        --this.remaining;
+      }
+
+      return var1;
+    }
+  }
+
+  /**
+   * Below method will be used to read the data from file. If limit reaches in
+   * that case it will read until new line character is reached
+   *
+   * @param buffer
+   *          buffer in which data will be read
+   * @param offset
+   *          from position to buffer will be filled
+   * @param length
+   *          number of character to be read
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      if (this.remaining < length) {
+        length = (int) this.remaining;
+      }
+
+      length = this.in.read(buffer, offset, length);
+      if (length >= 0) {
+        this.remaining -= length;
+        if (this.remaining == 0 && !endOfLineFound) {
+          endOfLineFound = true;
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        } else if (endOfLineFound) {
+          int end = offset + length;
+          for (int i = offset; i < end; i++) {
+            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+              this.remaining = 0;
+              return (i - offset) + 1;
+            }
+          }
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        }
+      }
+      return length;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
deleted file mode 100644
index 0023dda..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.csvreaderstep;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Custom reader class to read the data from file it will take care of reading
- * till the limit assigned to this class
- */
-public class CustomDataStream extends InputStream {
-
-  /**
-   * byte value of the new line character
-   */
-  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
-  /**
-   * number of extra character to read
-   */
-  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
-  /**
-   * number of bytes remaining
-   */
-  private long remaining;
-  /**
-   * to check whether end of line is found
-   */
-  private boolean endOfLineFound = false;
-
-  private DataInputStream in;
-
-  public CustomDataStream(DataInputStream in, long limit) {
-    this.in = in;
-    this.remaining = limit;
-  }
-
-  /**
-   * Below method will be used to read the data from file
-   *
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read() throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      int var1 = this.in.read();
-      if (var1 >= 0) {
-        --this.remaining;
-      }
-
-      return var1;
-    }
-  }
-
-  /**
-   * Below method will be used to read the data from file. If limit reaches in
-   * that case it will read until new line character is reached
-   *
-   * @param buffer
-   *          buffer in which data will be read
-   * @param offset
-   *          from position to buffer will be filled
-   * @param length
-   *          number of character to be read
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read(byte[] buffer, int offset, int length) throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      if (this.remaining < length) {
-        length = (int) this.remaining;
-      }
-
-      length = this.in.read(buffer, offset, length);
-      if (length >= 0) {
-        this.remaining -= length;
-        if (this.remaining == 0 && !endOfLineFound) {
-          endOfLineFound = true;
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        } else if (endOfLineFound) {
-          int end = offset + length;
-          for (int i = offset; i < end; i++) {
-            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
-              this.remaining = 0;
-              return (i - offset) + 1;
-            }
-          }
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        }
-      }
-      return length;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (in != null) {
-      in.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index 630586a..49f17dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
@@ -40,6 +42,8 @@ import org.apache.hadoop.util.LineReader;
  */
 public class UnivocityCsvParser {
 
+  private LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
+
   /**
    * Max number of columns that will be parsed for a row by univocity parsing
    */
@@ -137,25 +141,28 @@ public class UnivocityCsvParser {
     // if already one input stream is open first we need to close and then
     // open new stream
     close();
-    // get the block offset
-    long startOffset = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockOffset();
-    FileType fileType = FileFactory
-        .getFileType(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath());
-    // calculate the end offset the block
-    long endOffset =
-        this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength() + startOffset;
-
-    // create a input stream for the block
-    DataInputStream dataInputStream = FileFactory
-        .getDataInputStream(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath(),
-            fileType, bufferSize, startOffset);
-    // if start offset is not 0 then reading then reading and ignoring the extra line
-    if (startOffset != 0) {
-      LineReader lineReader = new LineReader(dataInputStream, 1);
-      startOffset += lineReader.readLine(new Text(), 0);
+
+    String path = this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath();
+    FileType fileType = FileFactory.getFileType(path);
+
+    if (path.endsWith(".gz")) {
+      DataInputStream dataInputStream = FileFactory.getDataInputStream(path, fileType, bufferSize);
+      inputStreamReader = new BufferedReader(new InputStreamReader(dataInputStream));
+    } else {
+      long startOffset = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockOffset();
+      long blockLength = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength();
+      long endOffset = blockLength + startOffset;
+
+      DataInputStream dataInputStream =
+          FileFactory.getDataInputStream(path, fileType, bufferSize, startOffset);
+      // if start offset is not 0 then reading then reading and ignoring the extra line
+      if (startOffset != 0) {
+        LineReader lineReader = new LineReader(dataInputStream, 1);
+        startOffset += lineReader.readLine(new Text(), 0);
+      }
+      inputStreamReader = new BufferedReader(new InputStreamReader(
+          new BoundedDataStream(dataInputStream, endOffset - startOffset)));
     }
-    inputStreamReader = new BufferedReader(new InputStreamReader(
-        new CustomDataStream(dataInputStream, endOffset - startOffset)));
   }
 
   /**



Mime
View raw message