carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-1611][Streaming] Reject Update and Delete operation for streaming table
Date Tue, 07 Nov 2017 13:59:55 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master ae280e239 -> 74bd52b66


[CARBONDATA-1611][Streaming] Reject Update and Delete operation for streaming table

This closes #1447


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74bd52b6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74bd52b6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74bd52b6

Branch: refs/heads/master
Commit: 74bd52b66f7dae938c1d993e5dc3a7a225227866
Parents: ae280e2
Author: Jacky Li <jacky.likun@qq.com>
Authored: Sun Oct 29 21:31:21 2017 +0530
Committer: QiangCai <qiangcai@qq.com>
Committed: Tue Nov 7 21:58:48 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  8 +++
 .../mutation/ProjectForUpdateCommand.scala      | 12 +---
 .../strategy/StreamingTableStrategy.scala       | 62 ++++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     |  8 ++-
 .../TestStreamingTableOperation.scala           | 59 +++++++++++++++++++
 5 files changed, 136 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index a6738a3..e1a7143 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -657,4 +657,12 @@ public class CarbonTable implements Serializable {
   public TableInfo getTableInfo() {
     return tableInfo;
   }
+
+  /**
+   * Return true if this is a streaming table (table with property "streaming"="true")
+   */
+  public boolean isStreamingTable() {
+    String streaming = getTableInfo().getFactTable().getTableProperties().get("streaming");
+    return streaming != null && streaming.equalsIgnoreCase("true");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 5e9d31f..2088396 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -43,13 +43,6 @@ private[sql] case class ProjectForUpdateCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
-
-    //  sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
-    //  .EXECUTION_ID_KEY, null)
-    // DataFrame(sqlContext, plan).show(truncate = false)
-    // return Seq.empty
-
-
     val res = plan find {
       case relation: LogicalRelation if relation.relation
         .isInstanceOf[CarbonDatasourceHadoopRelation] =>
@@ -63,9 +56,6 @@ private[sql] case class ProjectForUpdateCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    //    val relation = CarbonEnv.get.carbonMetastore
-    //      .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
-    //      asInstanceOf[CarbonRelation]
     val carbonTable = relation.tableMeta.carbonTable
     val metadataLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
@@ -75,7 +65,7 @@ private[sql] case class ProjectForUpdateCommand(
     val currentTime = CarbonUpdateUtil.readCurrentTime
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
-    var isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
new file mode 100644
index 0000000..0f0bc24
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand,
ProjectForUpdateCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Strategy for streaming table, like blocking unsupported operation
+ */
+private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends SparkStrategy
{
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case update@ProjectForUpdateCommand(_, tableIdentifier) =>
+        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data
update")
+        ExecutedCommandExec(update) :: Nil
+      case delete@ProjectForDeleteCommand(_, tableIdentifier, _) =>
+        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data
delete")
+        ExecutedCommandExec(delete) :: Nil
+      case _ => Nil
+    }
+  }
+
+  /**
+   * Reject the given operation (e.g. update or delete) by throwing
+   * MalformedCarbonCommandException if the specified table is a streaming table
+   */
+  private def rejectIfStreamingTable(tableIdentifier: TableIdentifier, operation: String):
Unit = {
+    val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .tableMeta
+      .carbonTable
+      .isStreamingTable
+    if (streaming) {
+      throw new MalformedCarbonCommandException(
+        s"$operation is not allowed for streaming table")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 6892dad..9cad7b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
@@ -133,7 +133,11 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
   override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
   experimentalMethods.extraStrategies =
-    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+    Seq(
+      new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
   experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
 
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
new file mode 100644
index 0000000..2c1c6b8
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+    sql("CREATE DATABASE streaming")
+    sql("USE streaming")
+    sql(
+      """
+        | create table source(
+        |    c1 string,
+        |    c2 int,
+        |    c3 string,
+        |    c5 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
+  }
+
+
+  test("test blocking update and delete operation on streaming table") {
+    intercept[MalformedCarbonCommandException] {
+      sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").show()
+    }
+    intercept[MalformedCarbonCommandException] {
+      sql("""DELETE FROM source WHERE d.c1 = 'a'""").show()
+    }
+  }
+
+  override def afterAll {
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+  }
+}


Mime
View raw message