carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [8/9] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
Date Thu, 19 Oct 2017 10:33:13 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
deleted file mode 100644
index 824730f..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.File
-
-import scala.util.Random
-
-import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
-import org.apache.spark.sql.types.{DataTypes, StructType}
-
-import org.apache.carbondata.examples.PerfTest._
-import org.apache.carbondata.examples.util.ExampleUtils
-
-// scalastyle:off println
-
-/**
- * represent one query
- */
-class Query(val queryType: String, val queryNo: Int, val sqlString: String) {
-
-  /**
-   * run the query in a batch and calculate average time
-   *
-   * @param sqlContext context to run the query
-   * @param runs how many times to run the query
-   * @param datasource datasource to run
-   */
-  def run(sqlContext: SQLContext, runs: Int, datasource: String): QueryResult = {
-    // run repeated and calculate average time elapsed
-    require(runs >= 1)
-    val sqlToRun = makeSQLString(datasource)
-
-    val firstTime = withTime {
-      sqlContext.sql(sqlToRun).collect
-    }
-
-    var totalTime: Long = 0
-    var result: Array[Row] = null
-    (1 to (runs - 1)).foreach { x =>
-      totalTime += withTime {
-        result = sqlContext.sql(sqlToRun).collect
-      }
-    }
-
-    val avgTime = totalTime / (runs - 1)
-    QueryResult(datasource, result, avgTime, firstTime)
-  }
-
-  private def makeSQLString(datasource: String): String = {
-    sqlString.replaceFirst("tableName", PerfTest.makeTableName(datasource))
-  }
-
-}
-
-/**
- * query performance result
- */
-case class QueryResult(datasource: String, result: Array[Row], avgTime: Long, firstTime: Long)
-
-class QueryRunner(sqlContext: SQLContext, dataFrame: DataFrame, datasources: Seq[String]) {
-
-  /**
-   * run a query on each datasource
-   */
-  def run(query: Query, runs: Int): Seq[QueryResult] = {
-    var results = Seq[QueryResult]()
-    datasources.foreach { datasource =>
-      val result = query.run(sqlContext, runs, datasource)
-      results :+= result
-    }
-    checkResult(results)
-    results
-  }
-
-  private def checkResult(results: Seq[QueryResult]): Unit = {
-    results.foldLeft(results.head) { (last, cur) =>
-      if (last.result.sortBy(_.toString()).sameElements(cur.result.sortBy(_.toString()))) cur
-      else sys.error(s"result is not the same between " +
-          s"${last.datasource} and " +
-          s"${cur.datasource}")
-    }
-  }
-
-  private def loadToNative(datasource: String): Unit = {
-    val savePath = PerfTest.savePath(datasource)
-    println(s"loading data into $datasource, path: $savePath")
-    dataFrame.write
-        .mode(SaveMode.Overwrite)
-        .format(datasource)
-        .save(savePath)
-    sqlContext.read
-        .format(datasource)
-        .load(savePath)
-        .registerTempTable(PerfTest.makeTableName(datasource))
-  }
-
-  /**
-   * load data to each datasource
-   */
-  def loadData: Seq[QueryResult] = {
-    // load data into all datasources
-    var results = Seq[QueryResult]()
-    datasources.foreach { datasource =>
-      val time = withTime {
-        datasource match {
-          case "parquet" =>
-            dataFrame.sqlContext.setConf(s"spark.sql.$datasource.compression.codec", "snappy")
-            loadToNative(datasource)
-          case "orc" =>
-            dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("orc.compress", "SNAPPY")
-            loadToNative(datasource)
-          case "carbon" =>
-            sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName(datasource)}")
-            println(s"loading data into $datasource, path: " +
-                s"${dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath}")
-            dataFrame.write
-                .format("org.apache.spark.sql.CarbonSource")
-                .option("tableName", PerfTest.makeTableName(datasource))
-                .mode(SaveMode.Overwrite)
-                .save()
-          case _ => sys.error("unsupported data source")
-        }
-      }
-      println(s"load data into $datasource completed, time taken ${time/1000000}ms")
-      results :+= QueryResult(datasource, null, time, time)
-    }
-    results
-  }
-
-  def shutDown(): Unit = {
-    // drop all tables and temp files
-    datasources.foreach {
-        case datasource @ ("parquet" | "orc") =>
-          val f = new File(PerfTest.savePath(datasource))
-          if (f.exists()) f.delete()
-        case "carbon" =>
-          sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName("carbon")}")
-        case _ => sys.error("unsupported data source")
-    }
-  }
-}
-
-/**
- * template for table data generation
- *
- * @param dimension number of dimension columns and their cardinality
- * @param measure number of measure columns
- */
-case class TableTemplate(dimension: Seq[(Int, Int)], measure: Int)
-
-/**
- * utility to generate random data according to template
- */
-class TableGenerator(sqlContext: SQLContext) {
-
-  /**
-   * generate a dataframe from random data
-   */
-  def genDataFrame(template: TableTemplate, rows: Int): DataFrame = {
-    val measures = template.measure
-    val dimensions = template.dimension.foldLeft(0) {(x, y) => x + y._1}
-    val cardinality = template.dimension.foldLeft(Seq[Int]()) {(x, y) =>
-      x ++ (1 to y._1).map(z => y._2)
-    }
-    print(s"generating data: $rows rows of $dimensions dimensions and $measures measures. ")
-    println("cardinality for each dimension: " + cardinality.mkString(", "))
-
-    val dimensionFields = (1 to dimensions).map { id =>
-      DataTypes.createStructField(s"c$id", DataTypes.StringType, false)
-    }
-    val measureFields = (dimensions + 1 to dimensions + measures).map { id =>
-      DataTypes.createStructField(s"c$id", DataTypes.IntegerType, false)
-    }
-    val schema = StructType(dimensionFields ++ measureFields)
-    val data = sqlContext.sparkContext.parallelize(1 to rows).map { x =>
-      val random = new Random()
-      val dimSeq = (1 to dimensions).map { y =>
-        s"P${y}_${random.nextInt(cardinality(y - 1))}"
-      }
-      val msrSeq = (1 to measures).map { y =>
-        random.nextInt(10)
-      }
-      Row.fromSeq(dimSeq ++ msrSeq)
-    }
-    val df = sqlContext.createDataFrame(data, schema)
-    df.write.mode(SaveMode.Overwrite).parquet(PerfTest.savePath("temp"))
-    sqlContext.parquetFile(PerfTest.savePath("temp"))
-  }
-}
-
-object PerfTest {
-
-  private val olap: Seq[String] = Seq(
-    """SELECT c3, c4, sum(c8) FROM tableName
-      |WHERE c1 = 'P1_23' and c2 = 'P2_43'
-      |GROUP BY c3, c4""".stripMargin,
-
-    """SELECT c2, c3, sum(c9) FROM tableName
-      |WHERE c1 = 'P1_432' and c4 = 'P4_3' and c5 = 'P5_2'
-      |GROUP by c2, c3 """.stripMargin,
-
-    """SELECT c2, count(distinct c1), sum(c8) FROM tableName
-      |WHERE c3="P3_4" and c5="P5_4"
-      |GROUP BY c2 """.stripMargin,
-
-    """SELECT c2, c5, count(distinct c1), sum(c7) FROM tableName
-      |WHERE c4="P4_4" and c5="P5_7" and c8>4
-      |GROUP BY c2, c5 """.stripMargin
-  )
-
-  private val point: Seq[String] = Seq(
-    """SELECT c4 FROM tableName
-      |WHERE c1="P1_43" """.stripMargin,
-
-    """SELECT c3 FROM tableName
-      |WHERE c1="P1_542" and c2="P2_23" """.stripMargin,
-
-    """SELECT c3, c5 FROM tableName
-      |WHERE c1="P1_52" and c7=4""".stripMargin,
-
-    """SELECT c4, c9 FROM tableName
-      |WHERE c1="P1_43" and c8<3""".stripMargin
-  )
-
-  private val filter: Seq[String] = Seq(
-    """SELECT * FROM tableName
-      |WHERE c2="P2_43" """.stripMargin,
-
-    """SELECT * FROM tableName
-      |WHERE c3="P3_3"  """.stripMargin,
-
-    """SELECT * FROM tableName
-      |WHERE c2="P2_32" and c3="P3_23" """.stripMargin,
-
-    """SELECT * FROM tableName
-      |WHERE c3="P3_28" and c4="P4_3" """.stripMargin
-  )
-
-  private val scan: Seq[String] = Seq(
-    """SELECT sum(c7), sum(c8), avg(c9), max(c10) FROM tableName """.stripMargin,
-
-    """SELECT sum(c7) FROM tableName
-      |WHERE c2="P2_32" """.stripMargin,
-
-    """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName
-      |WHERE c4="P4_4" """.stripMargin,
-
-    """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName
-      |WHERE c2="P2_75" and c6<5 """.stripMargin
-  )
-
-  def main(args: Array[String]) {
-    val cc = ExampleUtils.createCarbonContext("PerfTest")
-
-    // prepare performance queries
-    var workload = Seq[Query]()
-    olap.zipWithIndex.foreach(x => workload :+= new Query("OLAP Query", x._2, x._1))
-    point.zipWithIndex.foreach(x => workload :+= new Query("Point Query", x._2, x._1))
-    filter.zipWithIndex.foreach(x => workload :+= new Query("Filter Query", x._2, x._1))
-    scan.zipWithIndex.foreach(x => workload :+= new Query("Scan Query", x._2, x._1))
-
-    // prepare data
-    val rows = 3 * 1000 * 1000
-    val dimension = Seq((1, 1 * 1000), (1, 100), (1, 50), (2, 10)) // cardinality for each column
-    val measure = 5 // number of measure
-    val template = TableTemplate(dimension, measure)
-    val df = new TableGenerator(cc).genDataFrame(template, rows)
-    println("generate data completed")
-
-    // run all queries against all data sources
-    val datasource = Seq("parquet", "orc", "carbon")
-    val runner = new QueryRunner(cc, df, datasource)
-
-    val results = runner.loadData
-    println(s"load performance: ${results.map(_.avgTime / 1000000L).mkString(", ")}")
-
-    var parquetTime: Double = 0
-    var orcTime: Double = 0
-    var carbonTime: Double = 0
-
-    println(s"query id: ${datasource.mkString(", ")}, result in millisecond")
-    workload.foreach { query =>
-      // run 4 times each round, will print performance of first run and avg time of last 3 runs
-      print(s"${query.queryType} ${query.queryNo}: ")
-      val results = runner.run(query, 4)
-      print(s"${results.map(_.avgTime / 1000000L).mkString(", ")} ")
-      println(s"[sql: ${query.sqlString.replace('\n', ' ')}]")
-      parquetTime += results(0).avgTime
-      orcTime += results(1).avgTime
-      carbonTime += results(2).avgTime
-    }
-
-    println(s"Total time: ${parquetTime / 1000000}, ${orcTime / 1000000}, " +
-        s"${carbonTime / 1000000} = 1 : ${parquetTime / orcTime} : ${parquetTime / carbonTime}")
-    runner.shutDown()
-  }
-
-  def makeTableName(datasource: String): String = {
-    s"${datasource}_perftest_table"
-  }
-
-  def savePath(datasource: String): String =
-      s"${ExampleUtils.currentPath}/target/perftest/${datasource}"
-
-  def withTime(body: => Unit): Long = {
-    val start = System.nanoTime()
-    body
-    System.nanoTime() - start
-  }
-
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
deleted file mode 100644
index 3ab61bf..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples.util
-
-import java.io.DataOutputStream
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
-
-import org.apache.spark.SparkContext
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
-
-object AllDictionaryUtil {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  def extractDictionary(sc: SparkContext,
-                        srcData: String,
-                        outputPath: String,
-                        fileHeader: String,
-                        dictCol: String): Unit = {
-    val fileHeaderArr = fileHeader.split(",")
-    val isDictCol = new Array[Boolean](fileHeaderArr.length)
-    for (i <- 0 until fileHeaderArr.length) {
-      if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) {
-        isDictCol(i) = true
-      } else {
-        isDictCol(i) = false
-      }
-    }
-    val dictionaryRdd = sc.textFile(srcData).flatMap(x => {
-      val tokens = x.split(",")
-      val result = new ArrayBuffer[(Int, String)]()
-      for (i <- 0 until isDictCol.length) {
-        if (isDictCol(i)) {
-          try {
-            result += ((i, tokens(i)))
-          } catch {
-            case ex: ArrayIndexOutOfBoundsException =>
-              LOGGER.error("Read a bad record: " + x)
-          }
-        }
-      }
-      result
-    }).groupByKey().flatMap(x => {
-      val distinctValues = new HashSet[(Int, String)]()
-      for (value <- x._2) {
-        distinctValues.add(x._1, value)
-      }
-      distinctValues
-    })
-    val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect()
-    saveToFile(dictionaryValues, outputPath)
-  }
-
-  def cleanDictionary(outputPath: String): Unit = {
-    try {
-      val fileType = FileFactory.getFileType(outputPath)
-      val file = FileFactory.getCarbonFile(outputPath, fileType)
-      if (file.exists()) {
-        file.delete()
-      }
-    } catch {
-      case ex: Exception =>
-        LOGGER.error("Clean dictionary catching exception:" + ex)
-    }
-  }
-
-  def saveToFile(contents: Array[String], outputPath: String): Unit = {
-    var writer: DataOutputStream = null
-    try {
-      val fileType = FileFactory.getFileType(outputPath)
-      val file = FileFactory.getCarbonFile(outputPath, fileType)
-      if (!file.exists()) {
-        file.createNewFile()
-      }
-      writer = FileFactory.getDataOutputStream(outputPath, fileType)
-      for (content <- contents) {
-        writer.writeBytes(content + "\n")
-      }
-    } catch {
-      case ex: Exception =>
-        LOGGER.error("Save dictionary to file catching exception:" + ex)
-    } finally {
-      if (writer != null) {
-        try {
-          writer.close()
-        } catch {
-          case ex: Exception =>
-            LOGGER.error("Close output stream catching exception:" + ex)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
deleted file mode 100644
index f98ec3b..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples.util
-
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{CarbonContext, SaveMode}
-
-import org.apache.carbondata.core.util.CarbonProperties
-
-// scalastyle:off println
-
-object ExampleUtils {
-
-  def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
-      .getCanonicalPath
-  val storeLocation = currentPath + "/target/store"
-
-  def createCarbonContext(appName: String): CarbonContext = {
-    val sc = new SparkContext(new SparkConf()
-        .setAppName(appName)
-        .setMaster("local[2]"))
-    sc.setLogLevel("ERROR")
-
-    println(s"Starting $appName using spark version ${sc.version}")
-
-    val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore")
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.storelocation", storeLocation)
-    cc
-  }
-
-  /**
-   * This func will write a sample CarbonData file containing following schema:
-   * c1: String, c2: String, c3: Double
-   * Returns table path
-   */
-  def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
-    cc.sql(s"DROP TABLE IF EXISTS $tableName")
-    writeDataframe(cc, tableName, numRows, SaveMode.Overwrite)
-    s"$storeLocation/default/$tableName"
-  }
-
-  /**
-   * This func will append data to the CarbonData file
-   * Returns table path
-   */
-  def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
-    writeDataframe(cc, tableName, numRows, SaveMode.Append)
-    s"$storeLocation/default/$tableName"
-  }
-
-  /**
-   * create a new dataframe and write to CarbonData file, based on save mode
-   */
-  private def writeDataframe(
-      cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = {
-    // use CarbonContext to write CarbonData files
-    import cc.implicits._
-    val sc = cc.sparkContext
-    val df = sc.parallelize(1 to numRows, 2)
-        .map(x => ("a", "b", x))
-        .toDF("c1", "c2", "c3")
-
-    // save dataframe directly to carbon file without tempCSV
-    df.write
-      .format("carbondata")
-      .option("tableName", tableName)
-      .option("compress", "true")
-      .option("tempCSV", "false")
-      .mode(mode)
-      .save()
-  }
-
-  def cleanSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = {
-    cc.sql(s"DROP TABLE IF EXISTS $tableName")
-  }
-}
-// scalastyle:on println
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 94af8ec..af25771 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -38,20 +38,6 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-spark2</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive-thriftserver_2.10</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-repl_2.10</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_2.10</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/hive/pom.xml
----------------------------------------------------------------------
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 13bd581..e0ad499 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -67,20 +67,6 @@
             <groupId>org.apache.carbondata</groupId>
             <artifactId>carbondata-spark2</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-hive-thriftserver_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-repl_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-sql_2.10</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
@@ -116,12 +102,6 @@
             <groupId>org.apache.carbondata</groupId>
             <artifactId>carbondata-hadoop</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-              <exclusion>
-                <groupId>org.apache.spark</groupId>
-                <artifactId>spark-sql_2.10</artifactId>
-              </exclusion>
-            </exclusions>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index e11ce4c..13d351d 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -244,10 +244,6 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_2.10</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -371,10 +367,6 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_2.10</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 9728a5c..e529035 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -40,12 +40,6 @@
       <artifactId>carbondata-spark-common</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive-thriftserver_2.10</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -167,16 +161,6 @@
           <artifactId>carbondata-spark</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -188,16 +172,6 @@
           <artifactId>carbondata-spark</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -212,16 +186,6 @@
           <artifactId>carbondata-spark2</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 7c67000..b2ee316 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -91,12 +91,6 @@
       <artifactId>carbondata-spark-common</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive-thriftserver_2.10</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -341,16 +335,6 @@
           <artifactId>carbondata-spark</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -362,16 +346,6 @@
           <artifactId>carbondata-spark</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -386,16 +360,6 @@
           <artifactId>carbondata-spark2</artifactId>
           <version>${project.version}</version>
           <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-hive-thriftserver_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.spark</groupId>
-              <artifactId>spark-repl_2.10</artifactId>
-            </exclusion>
-          </exclusions>
         </dependency>
       </dependencies>
     </profile>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 82ff7a4..d40e213 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -38,16 +38,6 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-processing</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive-thriftserver_2.10</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_2.10</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
----------------------------------------------------------------------
diff --git a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties b/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
deleted file mode 100644
index 61856cf..0000000
--- a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#
-carbon.spark.interface = {0}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
deleted file mode 100644
index 5060809..0000000
--- a/integration/spark/pom.xml
+++ /dev/null
@@ -1,194 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.carbondata</groupId>
-    <artifactId>carbondata-parent</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>carbondata-spark</artifactId>
-  <name>Apache CarbonData :: Spark</name>
-
-  <properties>
-    <dev.path>${basedir}/../../dev</dev.path>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-processing</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-compiler</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-reflect</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>2.2.1</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
-    <resources>
-      <resource>
-        <directory>src/resources</directory>
-      </resource>
-      <resource>
-        <directory>.</directory>
-        <includes>
-          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
-        </includes>
-      </resource>
-    </resources>
-    <plugins>
-      <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.18</version>
-        <!-- Note config is repeated in scalatest config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-          </systemProperties>
-          <failIfNoTests>false</failIfNoTests>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>sdvtest</id>
-      <properties>
-        <maven.test.skip>true</maven.test.skip>
-      </properties>
-    </profile>
-  </profiles>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
deleted file mode 100644
index 92c8402..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.readsupport;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Timestamp;
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
-
-  @Override public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
-    //can initialize and generate schema here.
-  }
-
-  @Override public Row readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
-      if (data[i] == null) {
-        continue;
-      }
-      if (dictionaries[i] != null) {
-        data[i] = DataTypeUtil
-            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
-                (CarbonDimension) carbonColumns[i]);
-        if (data[i] == null) {
-          continue;
-        }
-        if (dataTypes[i] == DataTypes.STRING) {
-          data[i] = UTF8String.fromString(data[i].toString());
-        } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
-          data[i] = new Timestamp((long) data[i]);
-        } else if (dataTypes[i] == DataTypes.DATE) {
-          data[i] = new Date((long) data[i]);
-        } else if (dataTypes[i] == DataTypes.LONG) {
-          data[i] = data[i];
-        }
-      }
-      else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        //convert the long to timestamp in case of direct dictionary column
-        if (DataTypes.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i] / 1000L);
-        } else if (DataTypes.DATE == carbonColumns[i].getDataType()) {
-          data[i] = new Date((long) data[i]);
-        }
-      }
-    }
-    return new GenericRow(data);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
deleted file mode 100644
index 7881b93..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.LoadTable
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
-
-class CarbonDataFrameWriter(val dataFrame: DataFrame) {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-    checkContext()
-    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
-
-    // create a new table using dataframe's schema and write its content into the table
-    cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
-    writeToCarbonFile(parameters)
-  }
-
-  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-    // append the data as a new load
-    checkContext()
-    writeToCarbonFile(parameters)
-  }
-
-  private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-    val options = new CarbonOption(parameters)
-    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
-    if (options.tempCSV) {
-      loadTempCSV(options, cc)
-    } else {
-      loadDataFrame(options, cc)
-    }
-  }
-
-  /**
-   * Firstly, saving DataFrame to CSV files
-   * Secondly, load CSV files
-   * @param options
-   * @param cc
-   */
-  private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
-    // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.get.carbonMetastore.storePath
-    val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
-      .append("tempCSV")
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
-    writeToTempCSVFile(tempCSVFolder, options)
-
-    val tempCSVPath = new Path(tempCSVFolder)
-    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
-    def countSize(): Double = {
-      var size: Double = 0
-      val itor = fs.listFiles(tempCSVPath, true)
-      while (itor.hasNext) {
-        val f = itor.next()
-        if (f.getPath.getName.startsWith("part-")) {
-          size += f.getLen
-        }
-      }
-      size
-    }
-
-    LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
-
-    try {
-      cc.sql(makeLoadString(tempCSVFolder, options))
-    } finally {
-      fs.delete(tempCSVPath, true)
-    }
-  }
-
-  private def checkContext(): Unit = {
-    // To avoid derby problem, dataframe need to be writen and read using CarbonContext
-    require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
-      "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
-    )
-  }
-
-  private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
-
-    val strRDD = dataFrame.rdd.mapPartitions { case iter =>
-      new Iterator[String] {
-        override def hasNext = iter.hasNext
-
-        def convertToCSVString(seq: Seq[Any]): String = {
-          val build = new java.lang.StringBuilder()
-          if (seq.head != null) {
-            build.append(seq.head.toString)
-          }
-          val itemIter = seq.tail.iterator
-          while (itemIter.hasNext) {
-            build.append(CarbonCommonConstants.COMMA)
-            val value = itemIter.next()
-            if (value != null) {
-              build.append(value.toString)
-            }
-          }
-          build.toString
-        }
-
-        override def next: String = {
-          convertToCSVString(iter.next.toSeq)
-        }
-      }
-    }
-
-    if (options.compress) {
-      strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
-    } else {
-      strRDD.saveAsTextFile(tempCSVFolder)
-    }
-  }
-
-  /**
-   * Loading DataFrame directly without saving DataFrame to CSV files.
-   * @param options
-   * @param cc
-   */
-  private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
-    val header = dataFrame.columns.mkString(",")
-    LoadTable(
-      Some(options.dbName),
-      options.tableName,
-      null,
-      Seq(),
-      Map("fileheader" -> header) ++ options.toMap,
-      isOverwriteExist = false,
-      null,
-      Some(dataFrame),
-      None).run(cc)
-  }
-
-  private def convertToCarbonType(sparkType: DataType): String = {
-    sparkType match {
-      case StringType => CarbonType.STRING.getName
-      case IntegerType => CarbonType.INT.getName
-      case ShortType => "smallint"
-      case LongType => "bigint"
-      case FloatType => CarbonType.DOUBLE.getName
-      case DoubleType => CarbonType.DOUBLE.getName
-      case TimestampType => CarbonType.TIMESTAMP.getName
-      case DateType => CarbonType.DATE.getName
-      case decimal: DecimalType => s"${CarbonType.DECIMAL.getName} (${decimal.precision}" +
-                                   s", ${decimal.scale})"
-      case other => sys.error(s"unsupported type: $other")
-    }
-  }
-
-  private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
-    val properties = Map(
-      "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
-      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
-    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
-    val carbonSchema = schema.map { field =>
-      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
-    }
-    s"""
-          CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
-          (${ carbonSchema.mkString(", ") })
-          STORED BY '${ CarbonContext.datasourceName }'
-          ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""}
-      """
-  }
-
-  private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
-    s"""
-          LOAD DATA INPATH '$csvFolder'
-          INTO TABLE ${options.dbName}.${options.tableName}
-          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
-          'SINGLE_PASS' = '${options.singlePass}')
-      """
-  }
-
-
-}


Mime
View raw message