carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1924][PARTITION] Restrict streaming on Partitioned table and support PARTITION syntax to the LOAD TABLE command.
Date Thu, 21 Dec 2017 13:17:15 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master a89587e70 -> 577a8b0d5


[CARBONDATA-1924][PARTITION] Restrict streaming on Partitioned table and support PARTITION
syntax to the LOAD TABLE command.

This closes #1699


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/577a8b0d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/577a8b0d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/577a8b0d

Branch: refs/heads/master
Commit: 577a8b0d59d760a35626779fe45dd4d91c6b3328
Parents: a89587e
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue Dec 19 20:01:30 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Dec 21 18:46:50 2017 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala | 57 ++++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  6 +++
 .../management/CarbonLoadDataCommand.scala      | 10 +++-
 .../sql/parser/CarbonSpark2SqlParser.scala      | 22 +++++---
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  3 ++
 5 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 11e95d8..b7010e5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -17,6 +17,7 @@
 package org.apache.carbondata.spark.testsuite.standardpartition
 
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -200,6 +201,59 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     validateDataFiles("default_singlepasspartitionone", "0", 1)
   }
 
+  test("data loading for partition table for one static partition column with load syntax")
{
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitionone
PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select distinct empno from loadstaticpartitionone"), Seq(Row(1)))
+  }
+
+  test("overwrite partition table for one static partition column with load syntax") {
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, designation String,
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite
PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql("select count(*) from loadstaticpartitiononeoverwrite").collect()
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite
PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite
PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite
PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), rows)
+  }
+
+  test("Restrict streaming on partitioned table") {
+    intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE TABLE streamingpartitionedtable (empname String, designation String, doj
+          | Timestamp,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+          |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int,
+          |  utilization int,salary int)
+          | PARTITIONED BY (empno int)
+          | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true')
+        """.stripMargin)
+    }
+  }
+
 
   override def afterAll = {
     dropTable
@@ -215,6 +269,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists insertpartitionthree")
     sql("drop table if exists staticpartitionone")
     sql("drop table if exists singlepasspartitionone")
+    sql("drop table if exists loadstaticpartitionone")
+    sql("drop table if exists loadstaticpartitiononeoverwrite")
+    sql("drop table if exists streamingpartitionedtable")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 6e9b36c..129e6b3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -978,6 +978,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
       case _ => ("", "")
     }
 
+  protected lazy val partitions: Parser[(String, String)] =
+    (ident <~ "=") ~ stringLit ^^ {
+      case opt ~ optvalue => (opt.trim, optvalue)
+      case _ => ("", "")
+    }
+
   protected lazy val valueOptions: Parser[(Int, Int)] =
     (numericLit <~ ",") ~ numericLit ^^ {
       case opt ~ optvalue => (opt.toInt, optvalue.toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7492951..f96c0a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -558,8 +558,14 @@ case class CarbonLoadDataCommand(
         }
 
       // Only select the required columns
-      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
-        LogicalRDD(attributes, rdd)(sparkSession))
+      val output = if (partition.nonEmpty) {
+        relation.output.map{ attr =>
+          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+        }.filter(attr => partition.get(attr.name).isEmpty)
+      } else {
+        relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+      }
+      Project(output, LogicalRDD(attributes, rdd)(sparkSession))
     }
     val convertRelation = relation match {
       case l: LogicalRelation =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index dad0e3e..5d00a0c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -351,8 +351,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val loadDataNew: Parser[LogicalPlan] =
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
     (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    (PARTITION ~>"("~> repsep(partitions, ",") <~ ")").? ~
     (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
-      case filePath ~ isOverwrite ~ table ~ optionsList =>
+      case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList =>
         val (databaseNameOp, tableName) = table match {
           case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
         }
@@ -360,13 +361,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           validateOptions(optionsList)
         }
         val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+        val partitionSpec = partitions.getOrElse(List.empty[(String, String)]).toMap
         CarbonLoadDataCommand(
-          convertDbNameToLowerCase(databaseNameOp),
-          tableName,
-          filePath,
-          Seq(),
-          optionsMap,
-          isOverwrite.isDefined)
+          databaseNameOp = convertDbNameToLowerCase(databaseNameOp),
+          tableName = tableName,
+          factPathFromUser = filePath,
+          dimFilesPath = Seq(),
+          options = optionsMap,
+          isOverwriteTable = isOverwrite.isDefined,
+          inputSqlString = null,
+          dataFrame = None,
+          updateModel = None,
+          tableInfoOp = None,
+          internalOptions = Map.empty,
+          partition = partitionSpec.map { case (key, value) => (key, Some(value))})
     }
 
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 3597208..211e0ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -233,6 +233,9 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       case _ =>
         // ignore this case
     }
+    if (partitionFields.nonEmpty && options.isStreaming) {
+      operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
+    }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
     // prepare table model of the collected tokens


Mime
View raw message