carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: insertinto for spark2
Date Sat, 03 Dec 2016 01:52:54 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master d5f409840 -> 5e0a07221


insertinto for spark2


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c1882f29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c1882f29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c1882f29

Branch: refs/heads/master
Commit: c1882f29239f3d4fdf77a8ea51f8a5abe3fc955b
Parents: d5f4098
Author: QiangCai <qiangcai@qq.com>
Authored: Sat Dec 3 01:53:32 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sat Dec 3 09:43:03 2016 +0800

----------------------------------------------------------------------
 examples/spark2/src/main/resources/data.csv     |   3 +-
 .../carbondata/examples/CarbonExample.scala     | 114 +++++++++++--------
 .../sql/CarbonDatasourceHadoopRelation.scala    |  18 ++-
 .../execution/command/carbonTableSchema.scala   |  26 ++++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   2 +-
 5 files changed, 109 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
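
Taken together, the patch wires Spark 2's InsertableRelation contract into the CarbonData datasource: the relation gains an insert() method, a new LoadTableByInsert command funnels the inserted rows through the existing LoadTable path, and the example is reworked to exercise the flow end to end. A minimal sketch of what this enables (table names follow the updated example; session options and paths are illustrative):

    import org.apache.spark.sql.SparkSession

    // Sketch of the flow this commit enables, not a verbatim excerpt:
    // stage rows in a plain Hive table, then INSERT them into a Carbon table.
    val spark = SparkSession.builder()
      .master("local")
      .appName("InsertIntoSketch")
      .enableHiveSupport()
      .config("carbon.storelocation", "/tmp/carbondata/store") // illustrative path
      .getOrCreate()

    spark.sql("INSERT INTO TABLE carbon_table SELECT * FROM csv_table")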


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c1882f29/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 83ea3b3..fcdf3c1 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,4 +1,3 @@
-shortField,intField,bigintField,doubleField,stringField,timestampField
 1,10,100,48.4,spark,2015/4/23
 5,17,140,43.4,spark,2015/7/27
 1,11,100,44.4,flink,2015/5/23
@@ -8,4 +7,4 @@ shortField,intField,bigintField,doubleField,stringField,timestampField
 2,10,100,43.4,impala,2015/7/23
 1,10,100,43.4,spark,2015/5/23
 4,16,130,42.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/7/23
+1,10,100,43.4,spark,2015/7/23
\ No newline at end of file
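
The header row comes out of data.csv because nothing reads it anymore: the example now loads the file into a plain Hive text table, which would treat a header as an ordinary data row, and the Carbon load gets its column list from the table schema instead, via the fileheader option built in LoadTableByInsert (further down). Roughly:

    // Sketch of how the insert path rebuilds the header the CSV used to
    // carry (see LoadTableByInsert below); names come from the table schema.
    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
    // -> "shortField,intField,bigintField,doubleField,stringField,timestampField"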

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c1882f29/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index d3a7e86..59cc4e9 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -17,31 +17,49 @@
 
 package org.apache.spark.sql.examples
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.TableLoader
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 object CarbonExample {
 
   def main(args: Array[String]): Unit = {
-    // to run the example, plz change this path to your local machine path
-    val rootPath = "/home/david/Documents/incubator-carbondata"
+    val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
     val spark = SparkSession
         .builder()
         .master("local")
         .appName("CarbonExample")
         .enableHiveSupport()
-        .config(CarbonCommonConstants.STORE_LOCATION,
-          s"$rootPath/examples/spark2/target/store")
+        .config("carbon.kettle.home",
+          s"$rootPath/processing/carbonplugins")
+        .config("carbon.storelocation", storeLocation)
+        .config("spark.sql.warehouse.dir", warehouse)
+        .config("javax.jdo.option.ConnectionURL",
+          s"jdbc:derby:;databaseName=$metastoredb;create=true")
         .getOrCreate()
     spark.sparkContext.setLogLevel("WARN")
 
-    // Drop table
-//    spark.sql("DROP TABLE IF EXISTS carbon_table")
-//    spark.sql("DROP TABLE IF EXISTS csv_table")
-//
-//    // Create table
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // Create table
     spark.sql(
       s"""
          | CREATE TABLE carbon_table(
@@ -49,47 +67,47 @@ object CarbonExample {
          |    intField int,
          |    bigintField long,
          |    doubleField double,
-         |    stringField string
+         |    stringField string,
+         |    timestampField timestamp
          | )
          | USING org.apache.spark.sql.CarbonSource
        """.stripMargin)
 
-    val prop = s"$rootPath/conf/dataload.properties.template"
-    val tableName = "carbon_table"
+    // val prop = s"$rootPath/conf/dataload.properties.template"
+    // val tableName = "carbon_table"
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-    TableLoader.main(Array[String](prop, tableName, path))
-
-//    spark.sql(
-//      s"""
-//         | CREATE TABLE csv_table
-//         | (ID int,
-//         | date timestamp,
-//         | country string,
-//         | name string,
-//         | phonetype string,
-//         | serialname string,
-//         | salary int)
-//       """.stripMargin)
-//
-//    spark.sql(
-//      s"""
-//         | LOAD DATA LOCAL INPATH '$csvPath'
-//         | INTO TABLE csv_table
-//       """.stripMargin)
-
-//    spark.sql(
-//      s"""
-//         | INSERT INTO TABLE carbon_table
-//         | SELECT * FROM csv_table
-//       """.stripMargin)
-
-    // Perform a query
-//    spark.sql("""
-//           SELECT country, count(salary) AS amount
-//           FROM carbon_table
-//           WHERE country IN ('china','france')
-//           GROUP BY country
-//           """).show()
+    // TableLoader.main(Array[String](prop, tableName, path))
+
+    spark.sql(
+      s"""
+         | CREATE TABLE csv_table
+         | (  shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField string)
+         |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE csv_table
+       """.stripMargin)
+
+    spark.sql("""
+             SELECT *
+             FROM csv_table
+              """).show
+
+    spark.sql(
+      s"""
+         | INSERT INTO TABLE carbon_table
+         | SELECT shortField, intField, bigintField, doubleField, stringField,
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+         | FROM csv_table
+       """.stripMargin)
 
     spark.sql("""
              SELECT *
@@ -115,7 +133,7 @@ object CarbonExample {
       """.stripMargin).show
 
     // Drop table
-//    spark.sql("DROP TABLE IF EXISTS carbon_table")
-//    spark.sql("DROP TABLE IF EXISTS csv_table")
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+    spark.sql("DROP TABLE IF EXISTS csv_table")
   }
 }
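
Note that the staging table keeps timestampField as a string (Hive's LOAD DATA does no parsing), so the INSERT converts it with from_unixtime(unix_timestamp(...)). The same conversion through the DataFrame API, as a sketch rather than part of the commit (insertInto resolves through the same InsertableRelation hook added below):

    import org.apache.spark.sql.functions.{col, from_unixtime, unix_timestamp}

    // Sketch: DataFrame-API equivalent of the SQL insert above. Parse the
    // 'yyyy/M/dd' strings staged in csv_table and cast them to timestamps.
    val converted = spark.table("csv_table")
      .withColumn("timestampField",
        from_unixtime(unix_timestamp(col("timestampField"), "yyyy/M/dd"))
          .cast("timestamp"))
    converted.write.insertInto("carbon_table")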

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c1882f29/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 3b951ba..0ab5b6c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.{ExecutedCommandExec, LoadTableByInsert}
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.scan.expression.Expression
@@ -41,7 +43,7 @@ case class CarbonDatasourceHadoopRelation(
     paths: Array[String],
     parameters: Map[String, String],
     tableSchema: Option[StructType])
-  extends BaseRelation {
+  extends BaseRelation with InsertableRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
   lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -75,4 +77,16 @@ case class CarbonDatasourceHadoopRelation(
       absIdentifier, carbonTable)
   }
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    if (carbonRelation.output.size > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      sys.error("Maximum supported column by carbon is:" +
+          CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
+    }
+    if (data.logicalPlan.output.size >= carbonRelation.output.size) {
+      LoadTableByInsert(this, data.logicalPlan).run(sparkSession)
+    } else {
+      sys.error("Cannot insert into target table because column number are different")
+    }
+  }
 }
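
For reference, InsertableRelation in org.apache.spark.sql.sources is a one-method contract: when Spark plans an INSERT against a relation that mixes it in, it hands the source query over as a DataFrame. A bare-bones sketch of the contract, independent of the CarbonData specifics above (the column-count check is illustrative):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}

    // Minimal sketch of the contract this commit implements.
    trait SketchInsertable extends BaseRelation with InsertableRelation {
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        require(data.schema.length == schema.length, // illustrative check
          s"expected ${schema.length} columns, got ${data.schema.length}")
        // hand the rows off to the storage engine here
      }
    }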

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c1882f29/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 794de02..135a5f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -25,6 +25,7 @@ import scala.language.implicitConversions
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types.TimestampType
@@ -47,7 +48,8 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
+
 
 /**
  * Command for the compaction in alter table command
@@ -290,6 +292,28 @@ case class DeleteLoadsByLoadDate(
 
 }
 
+case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation,
+                                          child: LogicalPlan) extends RunnableCommand {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val df = Dataset.ofRows(sparkSession, child)
+    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
+    val load = LoadTable(
+      Some(relation.carbonRelation.databaseName),
+      relation.carbonRelation.tableName,
+      null,
+      Seq(),
+      scala.collection.immutable.Map(("fileheader" -> header)),
+      false,
+      null,
+      Some(df)).run(sparkSession)
+    // update the relation metadata, in case high-cardinality auto-detection changed it
+    relation.carbonRelation.metaData =
+        CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+    load
+  }
+}
+
 case class LoadTable(
     databaseNameOp: Option[String],
     tableName: String,
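
The key move in LoadTableByInsert is Dataset.ofRows, which wraps the analyzed child plan back into a DataFrame so the existing LoadTable command can consume it (the Some(df) argument above). Because Dataset.ofRows is private[sql], the command has to live under the org.apache.spark.sql package, as this file does. The pattern in isolation, with illustrative names:

    package org.apache.spark.sql.execution.command

    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch of the LoadTableByInsert pattern: a RunnableCommand that turns
    // its child logical plan into a DataFrame and delegates to a loader.
    case class InsertSketch(child: LogicalPlan) extends RunnableCommand {
      def run(sparkSession: SparkSession): Seq[Row] = {
        val df = Dataset.ofRows(sparkSession, child) // private[sql] helper
        // delegate to the real loader here, e.g. LoadTable(..., Some(df))
        Seq.empty[Row]
      }
    }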

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c1882f29/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 98b481e..2fde552 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -665,7 +665,7 @@ object CarbonMetastoreTypes extends RegexParsers {
 case class CarbonRelation(
     databaseName: String,
     tableName: String,
-    metaData: CarbonMetaData,
+    var metaData: CarbonMetaData,
     tableMeta: TableMeta,
     alias: Option[String])
   extends LeafNode with MultiInstanceRelation {

