carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-1843] Block CTAS and external table feature
Date Mon, 04 Dec 2017 04:16:39 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6ae1f1b61 -> 25c28242a


[CARBONDATA-1843] Block CTAS and external table feature

This closes #1604


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25c28242
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25c28242
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25c28242

Branch: refs/heads/master
Commit: 25c28242a81a33918e2bf4f84a3a93b1415c9c83
Parents: 6ae1f1b
Author: Jacky Li <jacky.likun@qq.com>
Authored: Sat Dec 2 22:46:56 2017 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Mon Dec 4 12:15:48 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/CarbonProperties.java  |  1 +
 .../TestDataWithDicExcludeAndInclude.scala      | 23 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  3 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  2 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 24 +++++++++++++++++---
 .../management/CarbonShowLoadsCommand.scala     |  4 ++--
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 16 +++++++++----
 .../src/main/spark2.1/CarbonSessionState.scala  |  3 ++-
 .../src/main/spark2.2/CarbonSessionState.scala  |  3 ++-
 9 files changed, 66 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index daad410..281ee15 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -989,4 +989,5 @@ public final class CarbonProperties {
   public Map<String, String> getAddedProperty() {
     return addedProperty;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index ed3ff24..484c304 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@ -17,7 +17,9 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import org.apache.spark.sql.AnalysisException
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
@@ -88,6 +90,27 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeAndAfterAll
     )
   }
 
+  test("test create external table should fail") {
+    assert(intercept[AnalysisException](
+      sql(
+        """
+          | CREATE EXTERNAL TABLE t1 (id string, value int)
+          | STORED BY 'carbondata'
+        """.stripMargin)
+    ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE"))
+  }
+
+  test("test CTAS should fail") {
+    assert(intercept[AnalysisException](
+      sql(
+        """
+          | CREATE TABLE t1 (id string, value int)
+          | STORED BY 'carbondata'
+          | AS SELECT 'ABC', 1 FROM t2
+        """.stripMargin)
+    ).message.contains("Operation not allowed: CREATE TABLE AS SELECT"))
+  }
+
   override def afterAll {
     dropTable
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 171b71b..11e5baf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -60,7 +60,8 @@ class CarbonScanRDD(
     var filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient serializedTableInfo: Array[Byte],
-    @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics)
+    @transient tableInfo: TableInfo,
+    inputMetricsStats: InitInputMetrics)
   extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 70645f6..31ff323 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -73,7 +73,7 @@ case class CarbonDatasourceHadoopRelation(
     requiredColumns.foreach(projection.addColumn)
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
-      sqlContext.sparkContext,
+      sparkSession.sparkContext,
       projection,
       filterExpression.orNull,
       identifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 288c66e..89cbbe4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.events._
@@ -58,7 +59,17 @@ class CarbonSession(@transient val sc: SparkContext,
    */
   @transient
  override lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext))
+    existingSharedState match {
+      case Some(_) =>
+        val ss = existingSharedState.get
+        if (ss == null) {
+          new SharedState(sparkContext)
+        } else {
+          ss
+        }
+      case None =>
+        new SharedState(sparkContext)
+    }
   }
 
   override def newSession(): SparkSession = {
@@ -170,8 +181,15 @@ object CarbonSession {
         }
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
-        CommonUtil.cleanInProgressSegments(
-          carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), sparkContext)
+        try {
+          CommonUtil.cleanInProgressSegments(
+            carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), sparkContext)
+        } catch {
+          case e: Throwable =>
+            // catch all exceptions to avoid CarbonSession initialization failure
+          LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+            .error(e, "Failed to clean in progress segments")
+        }
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 7d00fa6..c6898b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -33,8 +33,8 @@ case class CarbonShowLoadsCommand(
   override def output: Seq[Attribute] = {
     Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
       AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = true)(),
+      AttributeReference("Load Start Time (GMT+0)", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time (GMT+0)", TimestampType, nullable = true)(),
       AttributeReference("Merged To", StringType, nullable = false)(),
       AttributeReference("File Format", StringType, nullable = false)())
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ac38ec9..07dc672 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
+import org.antlr.v4.runtime.tree.TerminalNode
 import org.apache.spark.sql.{CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -31,6 +32,8 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -142,7 +145,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       partitionColumns: ColTypeListContext,
       columns : ColTypeListContext,
       tablePropertyList : TablePropertyListContext,
-      tableComment : Option[String]) : LogicalPlan = {
+      tableComment : Option[String],
+      ctas: TerminalNode) : LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
     val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
@@ -158,7 +162,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     if (bucketSpecContext != null) {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
     }
-
+    if (external) {
+      operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader)
+    }
+    if (ctas != null && columns != null) {
+      operationNotAllowed("CREATE TABLE AS SELECT", tableHeader)
+    }
 
     val cols = Option(columns).toSeq.flatMap(visitColTypeList)
     val properties = getPropertyKeyValues(tablePropertyList)
@@ -173,8 +182,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
                           duplicateColumns.mkString("[", ",", "]"), columns)
     }
 
-
-
     val tableProperties = mutable.Map[String, String]()
     properties.foreach{property => tableProperties.put(property._1, property._2)}
 
@@ -195,6 +202,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
           partitionColumns)
       }
     }
+
     val fields = parser.getFields(cols ++ partitionByStructFields)
     val options = new CarbonOption(properties)
     // validate tblProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 0d6ba66..b6df33e 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -251,7 +251,8 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
           ctx.partitionColumns,
           ctx.columns,
           ctx.tablePropertyList,
-          Option(ctx.STRING()).map(string))
+          Option(ctx.STRING()).map(string),
+          ctx.AS)
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25c28242/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 7acce97..b3792cd 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -248,7 +248,8 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
           ctx.partitionColumns,
           ctx.columns,
           ctx.tablePropertyList,
-          Option(ctx.STRING()).map(string))
+          Option(ctx.STRING()).map(string),
+          ctx.AS)
     } else {
       super.visitCreateHiveTable(ctx)
     }


Mime
View raw message