carbondata-commits mailing list archives

From gvram...@apache.org
Subject [06/22] incubator-carbondata git commit: IUD Update and delete DML support
Date Fri, 06 Jan 2017 13:57:06 GMT
IUD Update and delete DML support
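
This patch wires UPDATE and DELETE DML through the Carbon SQL parser, analysis rules and optimizer. The statement shapes it enables, taken from the test suites added below (the iud/iud_db databases and the dest/source2 tables are test fixtures):

    sql("update iud.dest d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'")
    sql("update iud.dest d set (d.c3, d.c5) = (select s.c33, s.c55 from iud.source2 s where d.c1 = s.c11)")
    sql("delete from iud_db.dest d where d.c1 = 'a'")
    sql("delete from iud_db.dest where c1 IN (select c11 from source2)")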


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4d688f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4d688f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4d688f2a

Branch: refs/heads/master
Commit: 4d688f2a1837f2abaa45adb16f53c992fb139d3c
Parents: a2a081d
Author: vinodkc <vinod.kc.in@gmail.com>
Authored: Fri Dec 30 14:19:16 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +
 .../spark/sql/CarbonCatalystOperators.scala     |  24 ++
 .../org/apache/spark/sql/CarbonContext.scala    |   5 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  13 +-
 .../sql/execution/command/IUDCommands.scala     |  55 +++
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 172 ++++++++
 .../sql/hive/CarbonPreInsertionCasts.scala      |  59 ---
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  28 +-
 .../spark/src/test/resources/IUD/dest.csv       |   6 +
 .../spark/src/test/resources/IUD/other.csv      |   3 +
 .../spark/src/test/resources/IUD/sample.csv     |   4 +
 .../src/test/resources/IUD/sample_updated.csv   |   2 +
 .../spark/src/test/resources/IUD/source2.csv    |   3 +
 .../spark/src/test/resources/IUD/update01.csv   |   6 +
 .../iud/DeleteCarbonTableTestCase.scala         | 130 ++++++
 .../iud/UpdateCarbonTableTestCase.scala         | 394 +++++++++++++++++++
 17 files changed, 844 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 09cde7b..dfb2925 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -109,11 +109,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val RELATION = carbonKeyWord("RELATION")
   protected val SCHEMA = carbonKeyWord("SCHEMA")
   protected val SCHEMAS = carbonKeyWord("SCHEMAS")
+  protected val SET = Keyword("SET")
   protected val SHOW = carbonKeyWord("SHOW")
   protected val TABLES = carbonKeyWord("TABLES")
   protected val TABLE = carbonKeyWord("TABLE")
   protected val TERMINATED = carbonKeyWord("TERMINATED")
   protected val TYPE = carbonKeyWord("TYPE")
+  protected val UPDATE = carbonKeyWord("UPDATE")
   protected val USE = carbonKeyWord("USE")
   protected val WHERE = carbonKeyWord("WHERE")
   protected val WITH = carbonKeyWord("WITH")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 01904ba..c21a8f5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.hive.HiveContext
@@ -106,6 +107,29 @@ case class DropDatabase(dbName: String, isCascade: Boolean, sql: String)
   }
 }
 
+case class ProjectForUpdate(
+    table: UnresolvedRelation,
+    columns: List[String],
+    children: Seq[LogicalPlan] ) extends LogicalPlan with Command {
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
+case class UpdateTable(
+    table: UnresolvedRelation,
+    columns: List[String],
+    selectStmt: String,
+    filer: String) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq.empty
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
+case class DeleteRecords(
+    statement: String,
+    table: UnresolvedRelation) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq.empty
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
 /**
  * A logical plan representing insertion into Hive table.
  * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable

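ProjectForUpdate, UpdateTable and DeleteRecords are deliberately unresolved placeholder plans: they declare no output, and UpdateTable/DeleteRecords carry the original statement text (selectStmt/filer, statement) so that CarbonIUDAnalysisRule can re-parse and rewrite it during analysis. A hypothetical illustration of the node the new parser rules are expected to build for a simple update (the rule bodies are in a later hunk of this patch; the Seq-based table identifier relies on CarbonTableIdentifierImplicit; the statement fragments shown are assumptions):

    // Hypothetical only -- shapes taken from the case classes above.
    val dest = UnresolvedRelation(Seq("iud", "dest"), Some("d"))
    val update = UpdateTable(
      table      = dest,
      columns    = List("c2"),
      selectStmt = "select d.c2 + 1 from iud.dest d",  // assumed fragment
      filer      = "where d.c1 = 'a'")                 // field is spelled "filer" in this commit
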
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index efaa57b..51b1943 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -77,6 +77,7 @@ class CarbonContext(
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.CreateTables ::
+        CarbonIUDAnalysisRule ::
         CarbonPreInsertionCasts ::
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
@@ -183,3 +184,7 @@ object CarbonContext {
     cache(sc) = cc
   }
 }
+
+object SQLParser {
+  def parse(sql: String, sqlContext: SQLContext): LogicalPlan = sqlContext.parseSql(sql)
+}
\ No newline at end of file
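
The new SQLParser helper simply re-enters the context's own parser. CarbonIUDAnalysisRule (below) uses it to turn the statement fragments carried by UpdateTable and DeleteRecords back into logical plans, for example:

    // Re-parse a statement fragment into an unanalyzed logical plan.
    val selectPlan = SQLParser.parse("select c11 from iud.source2", sqlContext)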

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index f7f0802..36cd6f2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.sql.hive.CarbonMetastore
+import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonMetastore}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -41,6 +41,7 @@ object CarbonEnv {
       val cc = sqlContext.asInstanceOf[CarbonContext]
       val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
       carbonEnv = CarbonEnv(catalog)
+      CarbonIUDAnalysisRule.init(sqlContext)
       initialized = true
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 7318e27..73c44d6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql
 
+import java.util.regex.{Matcher, Pattern}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.LinkedHashSet
 import scala.collection.mutable.Map
 import scala.language.implicitConversions
+import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
@@ -27,11 +31,16 @@ import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveQlWrapper
 
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -59,8 +68,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
   override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    createDatabase | dropDatabase | loadManagement | describeTable |
-    showLoads | alterTable | createTable
+    createDatabase | dropDatabase | loadManagement | describeTable | showLoads | alterTable |
+    updateTable | deleteRecords |  createTable
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew

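startCommand now routes to updateTable and deleteRecords ahead of createTable; their rule bodies (which use the SET and UPDATE keywords added to CarbonDDLSqlParser above) sit in a part of this file outside this hunk. A self-contained sketch of the "UPDATE <table> SET (<cols>) = <rest>" grammar shape, written with the standard Scala parser combinators this class builds on -- the names and structure are illustrative assumptions, not the commit's actual rules:

    import scala.util.parsing.combinator.RegexParsers

    object UpdateGrammarSketch extends RegexParsers {
      private val ident = "[a-zA-Z_][a-zA-Z0-9_.]*".r

      // UPDATE <table> SET ( c1, c2, ... ) = <rest of statement>
      def updateStmt: Parser[(String, List[String], String)] =
        ("(?i)update".r ~> ident) ~
          ("(?i)set".r ~> "(" ~> repsep(ident, ",") <~ ")" <~ "=") ~
          ".*".r ^^ { case table ~ cols ~ rest => (table, cols, rest) }

      def main(args: Array[String]): Unit =
        // Success((iud.dest,List(c2, c5),(c2 + 1, 'xyx') where c1 = 'e'),...)
        println(parseAll(updateStmt, "update iud.dest set (c2, c5) = (c2 + 1, 'xyx') where c1 = 'e'"))
    }
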
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
new file mode 100644
index 0000000..a7d831d
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.RunnableCommand
+
+/**
+ * IUD update, delete and compaction framework.
+ *
+ */
+
+private[sql] case class ProjectForDeleteCommand(
+     plan: LogicalPlan,
+     tableIdentifier: Seq[String],
+     timestamp: String) extends RunnableCommand {
+
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
+  var horizontalCompactionFailed = false
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    DataFrame(sqlContext, plan).show(truncate = false)
+    Seq.empty
+  }
+}
+
+private[sql] case class ProjectForUpdateCommand(
+    plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand {
+  val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
+      .EXECUTION_ID_KEY, null)
+    DataFrame(sqlContext, plan).show(truncate = false)
+    Seq.empty
+  }
+}

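Both commands implement Spark 1.x's RunnableCommand, whose single run(sqlContext) hook Spark invokes eagerly through ExecutedCommand; in this commit the bodies only materialize the rewritten plan with show(), so the actual write path is outside this diff. ProjectForUpdateCommand also clears the thread-local spark.sql.execution.id property first -- presumably because Spark 1.x's SQLExecution refuses to start a nested execution while an id is already set. A simplified sketch of the contract (the real trait also extends LogicalPlan with Command):

    import org.apache.spark.sql.{Row, SQLContext}

    // Simplified stand-in for org.apache.spark.sql.execution.RunnableCommand.
    trait RunnableCommandSketch {
      def run(sqlContext: SQLContext): Seq[Row]
    }

    // Side effects happen in run(); any rows returned become the command's result.
    case class ClearExecutionIdCommand() extends RunnableCommandSketch {
      override def run(sqlContext: SQLContext): Seq[Row] = {
        sqlContext.sparkContext.setLocalProperty("spark.sql.execution.id", null)
        Seq.empty
      }
    }
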
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
new file mode 100644
index 0000000..15ac2f5
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+ * Insert into carbon table from another source
+ */
+object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+           if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
+        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
+    }
+
+    def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
+      : LogicalPlan = {
+      if (relation.carbonRelation.output.size > CarbonCommonConstants
+        .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+        sys
+          .error("Maximum supported column by carbon is:" + CarbonCommonConstants
+            .DEFAULT_MAX_NUMBER_OF_COLUMNS
+          )
+      }
+      if (child.output.size >= relation.carbonRelation.output.size ) {
+        InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
+      } else {
+         sys.error("Cannot insert into target table because column number are different")
+      }
+    }
+  }
+
+object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
+
+  var sqlContext: SQLContext = _
+
+  def init(sqlContext: SQLContext) {
+    this.sqlContext = sqlContext
+  }
+
+  private def processUpdateQuery(
+                   table: UnresolvedRelation,
+                   columns: List[String],
+                   selectStmt: String,
+                   filter: String): LogicalPlan = {
+    var includedDestColumns = false
+    var includedDestRelation = false
+    var addedTupleId = false
+
+    def prepareTargetReleation(relation: UnresolvedRelation): Subquery = {
+      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+        Seq.empty, isDistinct = false), "tupleId")())
+      val projList = Seq(
+        UnresolvedAlias(UnresolvedStar(table.alias))/*, tupleId*/)
+    // include tuple id and rest of the required columns in subquery
+      Subquery(table.alias.getOrElse(""), Project(projList, relation))
+    }
+    // get the un-analyzed logical plan
+    val targetTable = prepareTargetReleation(table)
+    val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform {
+      case Project(projectList, child) if (!includedDestColumns) =>
+        includedDestColumns = true
+        if (projectList.size != columns.size) {
+          sys.error("Number of source and destination columns are not matching")
+        }
+        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) =>
+          attr match {
+            case UnresolvedAlias(child) =>
+              UnresolvedAlias(Alias(child, col + "-updatedColumn")())
+            case _ => attr
+          }
+        }
+        val list = Seq(
+          UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList
+        Project(list, child)
+      case Filter(cond, child) if (!includedDestRelation) =>
+        includedDestRelation = true
+        Filter(cond, Join(child, targetTable, Inner, None))
+      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
+        t != table.tableIdentifier) =>
+        includedDestRelation = true
+        Join(r, targetTable, Inner, None)
+    }
+    val updatedSelectPlan = if (!includedDestRelation) {
+      // special case to handle self join queries
+      // e.g. update tableName SET (column1) = (column1+1)
+      selectPlan transform {
+        case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
+          addedTupleId == false) =>
+          addedTupleId = true
+          targetTable
+      }
+    } else {
+      selectPlan
+    }
+    val finalPlan = if (filter.length > 0) {
+      val alias = table.alias.getOrElse("")
+      var transformed: Boolean = false
+      // Create a dummy projection to include filter conditions
+      SQLParser.parse("select * from  " +
+        table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext)  transform {
+        case UnresolvedRelation(t, Some(a)) if (
+          !transformed && t == table.tableIdentifier && a == alias) =>
+          transformed = true
+          // Add the filter condition of update statement  on destination table
+          Subquery(alias, updatedSelectPlan)
+      }
+    } else {
+      updatedSelectPlan
+    }
+    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
+    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
+    val destinationTable = UnresolvedRelation(tidSeq, table.alias)
+    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
+  }
+
+  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
+    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
+    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
+    var addedTupleId = false
+    val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform {
+      case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
+        addedTupleId == false) =>
+        addedTupleId = true
+        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+          Seq.empty, isDistinct = false), "tupleId")())
+        val projList = Seq(
+          UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
+        // include tuple id in subquery
+        Project(projList, relation)
+    }
+    ProjectForDeleteCommand(
+      selectPlan,
+      tidSeq,
+      System.currentTimeMillis().toString)
+  }
+
+  override def apply(logicalplan: LogicalPlan): LogicalPlan = {
+
+    logicalplan transform {
+      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
+      case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
+    }
+  }
+}

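A detail worth noting in CarbonIUDAnalysisRule: the mutable flags (includedDestColumns, includedDestRelation, addedTupleId, transformed) guard each transform so that only the first matching node is rewritten -- transform would otherwise rewrite every occurrence, which matters for self joins where the target table appears more than once. A self-contained sketch of that transform-once idiom on a toy tree, with Leaf/Branch standing in for logical plan nodes:

    // Toy pre-order transform, mirroring Catalyst's LogicalPlan.transform.
    sealed trait Node {
      def transform(rule: PartialFunction[Node, Node]): Node =
        rule.applyOrElse(this, identity[Node]) match {
          case Branch(l, r) => Branch(l.transform(rule), r.transform(rule))
          case leaf         => leaf
        }
    }
    case class Leaf(name: String) extends Node
    case class Branch(left: Node, right: Node) extends Node

    object TransformOnce {
      def main(args: Array[String]): Unit = {
        // "dest" appears twice, as in a self-join update.
        val plan: Node = Branch(Leaf("dest"), Branch(Leaf("dest"), Leaf("other")))
        var added = false
        val rewritten = plan.transform {
          case Leaf("dest") if !added =>
            added = true                 // later matches now fail the guard
            Leaf("dest+tupleId")
        }
        println(rewritten)  // Branch(Leaf(dest+tupleId),Branch(Leaf(dest),Leaf(other)))
      }
    }
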
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonPreInsertionCasts.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonPreInsertionCasts.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonPreInsertionCasts.scala
deleted file mode 100644
index 381697b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonPreInsertionCasts.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.Cast
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-           if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
-        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
-    }
-
-    def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
-      : LogicalPlan = {
-      if (relation.carbonRelation.output.size > CarbonCommonConstants
-        .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-        sys
-          .error("Maximum supported column by carbon is:" + CarbonCommonConstants
-            .DEFAULT_MAX_NUMBER_OF_COLUMNS
-          )
-      }
-      if (child.output.size >= relation.carbonRelation.output.size ) {
-        InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
-      } else {
-         sys.error("Cannot insert into target table because column number are different")
-      }
-    }
-  }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 481ce54..34763ae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.optimizer
 
 import java.util
 
+import org.apache.spark.sql.execution.command.ProjectForUpdateCommand
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -72,8 +74,9 @@ object CarbonOptimizer {
 class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
   extends Rule[LogicalPlan] with PredicateHelper {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    if (relations.nonEmpty && !isOptimized(plan)) {
+  def apply(logicalPlan: LogicalPlan): LogicalPlan = {
+    if (relations.nonEmpty && !isOptimized(logicalPlan)) {
+      val plan = processPlan(logicalPlan)
       LOGGER.info("Starting to optimize plan")
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
@@ -85,7 +88,26 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
       result
     } else {
       LOGGER.info("Skip CarbonOptimizer")
-      plan
+      logicalPlan
+    }
+  }
+
+  private def processPlan(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
+        var isTransformed = false
+        val newPlan = updatePlan transform {
+          case Project(pList, child) if (!isTransformed) =>
+            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
+              .splitAt(pList.size - cols.size)
+            val diff = cols.diff(dest.map(_.name))
+            if (diff.size > 0) {
+              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
+            }
+            isTransformed = true
+            Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
+        }
+        ProjectForUpdateCommand(newPlan, table.tableIdentifier)
     }
   }
 

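processPlan performs the final projection surgery for updates: the combined project list built by the analysis rule is split into the original destination columns and the trailing "-updatedColumn" expressions, the SET column names are validated against the destination schema, and each updated column is replaced by its new expression. A self-contained sketch with plain strings standing in for NamedExpressions:

    object ProjectionSplitSketch {
      def main(args: Array[String]): Unit = {
        // Project list after CarbonIUDAnalysisRule: the full destination row,
        // followed by the renamed SET expressions.
        val pList = Seq("c1", "c2", "c3", "c5", "c2-updatedColumn", "c5-updatedColumn")
        val cols  = List("c2", "c5")                    // columns named in SET (...)
        val (dest, source) = pList.splitAt(pList.size - cols.size)
        val diff = cols.diff(dest)                      // unknown SET columns, if any
        require(diff.isEmpty, s"Unknown column(s) ${diff.mkString(",")}")
        // Keep the untouched destination columns, append the updated expressions.
        println(dest.filterNot(cols.contains) ++ source)
        // List(c1, c3, c2-updatedColumn, c5-updatedColumn)
      }
    }
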
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/dest.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/dest.csv b/integration/spark/src/test/resources/IUD/dest.csv
new file mode 100644
index 0000000..4ef9aa5
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/dest.csv
@@ -0,0 +1,6 @@
+c1,c2,c3,c5
+a,1,aa,aaa
+b,2,bb,bbb
+c,3,cc,ccc
+d,4,dd,ddd
+e,5,ee,eee
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/other.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/other.csv b/integration/spark/src/test/resources/IUD/other.csv
new file mode 100644
index 0000000..69a0e1c
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/other.csv
@@ -0,0 +1,3 @@
+c1,c2,c3,c5
+a,1,MGM,Disco
+b,2,RGK,Music

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/sample.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/sample.csv b/integration/spark/src/test/resources/IUD/sample.csv
new file mode 100644
index 0000000..d364136
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/sample.csv
@@ -0,0 +1,4 @@
+ID,country,name,phonetype,serialname,salary
+4,china,ravz,jio,ASD66902,15003
+1,india,ravi,airtel,ASD90633,15004
+6,usa,manu,alkatel,ASD59961,15005
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/sample_updated.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/sample_updated.csv b/integration/spark/src/test/resources/IUD/sample_updated.csv
new file mode 100644
index 0000000..b0cf165
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/sample_updated.csv
@@ -0,0 +1,2 @@
+part0/segment_0/part-0-0-1475753917000.carbondata/0/0,4,china,ravz,Aircel,ASD66902,200000
+part0/segment_0/part-0-0-1475753917000.carbondata/0/2,6,usa,manu,Aircel,ASD59961,15005
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/source2.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/source2.csv b/integration/spark/src/test/resources/IUD/source2.csv
new file mode 100644
index 0000000..cc86bdb
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/source2.csv
@@ -0,0 +1,3 @@
+c11,c22,c33,c55,c66
+a,1,MGM,Disco,10
+b,2,RGK,Music,8

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/resources/IUD/update01.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/IUD/update01.csv b/integration/spark/src/test/resources/IUD/update01.csv
new file mode 100644
index 0000000..35596cb
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/update01.csv
@@ -0,0 +1,6 @@
+imei,age,task,num,level,name
+imei0,2147,9279,100.05,100.055,fegt
+imei1,-2148,-9807,10.05,100.05,lrhkr
+imei2,2147,9279,100.05,100.055,dfegt
+imei3,-217,-9206,100.005,100.05,lrhkr
+imei4,10,0,15.5,45,Lily
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
new file mode 100644
index 0000000..169249c
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+
+    sql("use default")
+    sql("drop database  if exists iud_db cascade")
+    sql("create database  iud_db")
+
+    sql("""create table iud_db.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source2.csv' INTO table iud_db.source2""")
+    sql("use iud_db")
+  }
+  test("delete data from carbon table with alias [where clause ]") {
+
+  }
+  /*test("delete data from carbon table with alias [where clause ]") {
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest d where d.c1 = 'a'""")
+    // sql("""select c2 from iud_db.dest""").show()
+    checkAnswer(
+      sql("""select c2 from iud_db.dest"""),
+      Seq(Row(2), Row(3),Row(4), Row(5))
+    )
+  }
+  test("delete data from  carbon table[where clause ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from dest where c2 = 2""")
+    // sql("""select c1 from dest""").show()
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where IN  ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from dest where c1 IN ('d', 'e')""")
+    // sql("""select c1 from dest""").show()
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq(Row("a"), Row("b"),Row("c"))
+    )
+  }
+
+  test("delete data from  carbon table[with alias No where clause]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest a""")
+    // sql("""select c1 from iud_db.dest""").show()
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq()
+    )
+  }
+  test("delete data from  carbon table[No alias No where clause]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from dest""").show()
+    // sql("""select c1 from dest""").show()
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq()
+    )
+  }
+  test("delete data from  carbon table[where IN (sub query) ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    //sql("""delete from  iud_db.dest where c1 IN (select c11 from source2)""").show()
+    sql("""delete from  iud_db.dest where c1 IN (select c11 from source2)""").show(truncate = false)
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where IN (sub query with where clause) ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from  iud_db.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show()
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where numeric condition  ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
+    sql("""delete from  iud_db.dest where c2 >= 4""").show()
+    checkAnswer(
+      sql("""select count(*) from iud_db.dest"""),
+      Seq(Row(3))
+    )
+  }*/
+  override def afterAll {
+    sql("use default")
+    sql("drop database  if exists iud_db cascade")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4d688f2a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
new file mode 100644
index 0000000..805444f
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+
+    sql("drop database if exists iud cascade")
+    sql("create database  iud")
+    sql("use iud")
+    sql("""create table iud.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest""")
+    sql("""create table iud.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source2.csv' INTO table iud.source2""")
+    sql("""create table iud.other (c1 string,c2 int) STORED BY 'org.apache.carbondata.format'""")
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/other.csv' INTO table iud.other""")
+    sql("""create table iud.hdest (c1 string,c2 int,c3 string,c5 string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.hdest""")
+    sql("""CREATE TABLE iud.update_01(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/update01.csv' INTO TABLE iud.update_01 OPTIONS('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE') """)
+   /* CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")*/
+  }
+
+  test("test update operation with 0 rows updation.") {
+  }
+
+/*
+
+  test("test update operation with 0 rows updation.") {
+    sql("""drop table iud.zerorows""").show
+    sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.zerorows""")
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a' and exists( select * from iud.other o where o.c2 > 1)""").show()
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show()
+     checkAnswer(
+      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+      Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+    )
+    sql("""drop table iud.zerorows""").show
+
+
+  }
+
+
+    test("update carbon table[select from source table with where and exist]") {
+      sql("""drop table iud.dest11""").show
+      sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+      sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest11""")
+      sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where  exists( select * from iud.other o where o.c2 > 1)""").show()
+      checkAnswer(
+        sql("""select c3,c5 from iud.dest11"""),
+        Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music"))
+      )
+      sql("""drop table iud.dest11""").show
+   }
+
+  test("update carbon table[using destination table columns with where and exist]") {
+    sql("""drop table iud.dest22""").show
+    sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest22""")
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(1))
+    )
+    sql("""update dest22 d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a' and exists( select * from iud.other o where o.c2 > 1)""").show()
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(2))
+    )
+    sql("""drop table iud.dest22""").show
+   }
+
+    test("update carbon table without alias in set columns") {
+      sql("""drop table iud.dest33""").show
+      sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+      sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest33""")
+     sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a' and exists( select * from iud.other o where o.c2 > 1)""").show()
+      checkAnswer(
+        sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+        Seq(Row("MGM","Disco"))
+      )
+      sql("""drop table iud.dest33""").show
+  }
+
+   test("update carbon table without alias in set three columns") {
+     sql("""drop table iud.dest44""").show
+     sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest44""")
+     sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+     checkAnswer(
+       sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""),
+       Seq(Row("a","MGM","Disco"))
+     )
+     sql("""drop table iud.dest44""").show
+   }
+
+    test("update carbon table[single column select from source with where and exist]") {
+      sql("""drop table iud.dest55""").show
+      sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+      sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest55""")
+     sql("""update iud.dest55 d set (c3)  = (select s.c33 from iud.source2 s where d.c1 = s.c11) where exists( select * from iud.other o where o.c2 > 1)""").show()
+      checkAnswer(
+        sql("""select c1,c3 from iud.dest55 """),
+        Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+      )
+      sql("""drop table iud.dest55""").show
+   }
+
+  test("update carbon table[single column SELECT from source with where and exist]") {
+    sql("""drop table iud.dest55""").show
+    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest55""")
+    sql("""update iud.dest55 d set (c3)  = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where exists( select * from iud.other o where o.c2 > 1)""").show()
+    checkAnswer(
+      sql("""select c1,c3 from iud.dest55 """),
+      Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+    )
+    sql("""drop table iud.dest55""").show
+  }
+
+   test("update carbon table[using destination table columns without where clause]") {
+     sql("""drop table iud.dest66""").show
+     sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest66""")
+     sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest66 """),
+       Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez"))
+     )
+     sql("""drop table iud.dest66""").show
+   }
+
+   test("update carbon table[using destination table columns with where clause]") {
+       sql("""drop table iud.dest77""").show
+       sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+       sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest77""")
+       sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show()
+       checkAnswer(
+         sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""),
+         Seq(Row(5,"dddz"))
+       )
+       sql("""drop table iud.dest77""").show
+   }
+
+   test("update carbon table[using destination table( no alias) columns without where clause]") {
+     sql("""drop table iud.dest88""").show
+     sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest88""")
+     sql("""update iud.dest88  set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest88 """),
+       Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey"))
+     )
+     sql("""drop table iud.dest88""").show
+   }
+
+   test("update carbon table[using destination table columns with hard coded value ]") {
+     sql("""drop table iud.dest99""").show
+     sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest99""")
+     sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest99 """),
+       Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx"))
+     )
+     sql("""drop table iud.dest99""").show
+   }
+
+   test("update carbon tableusing destination table columns with hard coded value and where condition]") {
+     sql("""drop table iud.dest110""").show
+     sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest110""")
+     sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest110 where c1 = 'e' """),
+       Seq(Row(6,"xyx"))
+     )
+     sql("""drop table iud.dest110""").show
+   }
+
+   test("update carbon table[using source  table columns with where and exist and no destination table condition]") {
+     sql("""drop table iud.dest120""").show
+     sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest120""")
+     sql("""update iud.dest120 d  set (c3, c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11)""").show()
+     checkAnswer(
+       sql("""select c3,c5 from iud.dest120 """),
+       Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee"))
+     )
+     sql("""drop table iud.dest120""").show
+   }
+   test("update carbon table[using destination table where and exist]") {
+     sql("""drop table iud.dest130""").show
+     sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest130""")
+     sql("""update iud.dest130 dd  set (c2, c5 ) = (c2 + 1, "xyx")  where dd.c1 = 'a' and exists( select * from iud.other o where o.c2 > 1)""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest130 where c1 = 'a' """),
+       Seq(Row(2,"xyx"))
+     )
+     sql("""drop table iud.dest130""").show
+   }
+
+   test("update carbon table[using destination table (concat) where and exist]") {
+     sql("""drop table iud.dest140""").show
+     sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest140""")
+     sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))  where d.c1 = 'a' and exists( select * from iud.other o where o.c2 > 1)""").show()
+     checkAnswer(
+       sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""),
+       Seq(Row(2,"aaaz"))
+     )
+     sql("""drop table iud.dest140""").show
+   }
+
+   test("update carbon table[using destination table (concat) with  where") {
+     sql("""drop table iud.dest150""").show
+     sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest150""")
+     sql("""update iud.dest150 d set (c5) = (concat(c5 , "z"))  where d.c1 = 'b'""").show()
+     checkAnswer(
+       sql("""select c5 from iud.dest150 where c1 = 'b' """),
+       Seq(Row("bbbz"))
+     )
+     sql("""drop table iud.dest150""").show
+   }
+
+  test("update table with data for datatype mismatch with column ") {
+    sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""")
+    checkAnswer(
+      sql("""select * from iud.update_01 where imei = 'skt'"""),
+      Seq()
+    )
+  }
+
+   test("update carbon table-error[more columns in source table not allowed") {
+     try {
+       sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show()
+       assert(false)
+     } catch {
+       case ex: Exception =>
+         assertResult("Number of source and destination columns are not matching")(ex.getMessage)
+     }
+   }
+   test("update carbon table-error[no set columns") {
+     try {
+       sql("""update iud.dest d set () = ()""").show()
+       assert(false)
+     } catch {
+       case ex: org.apache.spark.sql.AnalysisException =>
+         assert(true)
+     }
+   }
+   test("update carbon table-error[no set columns with updated column") {
+     try {
+       sql("""update iud.dest d set  = (c1+1)""").show()
+       assert(false)
+     } catch {
+       case ex: org.apache.spark.sql.AnalysisException =>
+         assert(true)
+     }
+   }
+   test("update carbon table-error[one set column with two updated column") {
+     try {
+       sql("""update iud.dest  set c2 = (c2 + 1, concat(c5 , "z") )""").show()
+       assert(false)
+     } catch {
+       case ex: org.apache.spark.sql.AnalysisException =>
+         assert(true)
+     }
+   }
+
+  test("""update carbon [special characters  in value- test parsing logic ]""") {
+    sql("""drop table iud.dest160""").show
+    sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest160""")
+    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+    sql("""update iud.dest160 set(c1) =  ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show()
+    sql("""update iud.dest160 set(c1) =("\\")""").show()
+    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""update iud.dest160 d set (c3,c5)      =     (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show()
+    sql("""drop table iud.dest160""").show
+  }
+
+  test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") {
+    sql("""drop table iud.dest170""").show
+    sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest170""")
+    sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)""").show
+    checkAnswer(
+      sql("""select c3 from  iud.dest170 as d where  d.c2 between 1 and 3 and exists( select * from iud.other o where o.c1 = d.c1)"""),
+      Seq(Row("MGM"), Row("RGK"))
+    )
+    sql("""drop table iud.dest170""").show
+  }
+
+  test("""update carbon [self join select query ]""") {
+    sql("""drop table iud.dest171""").show
+    sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest171""")
+    sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from iud.dest171 s where d.c2 = s.c2)""").show
+    sql("""drop table iud.dest172""").show
+    sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest172""")
+    sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show
+    checkAnswer(
+      sql("""select c3 from  iud.dest171"""),
+      sql("""select c3 from  iud.dest172""")
+    )
+    sql("""drop table iud.dest171""").show
+    sql("""drop table iud.dest172""").show
+  }
+
+  test("update carbon table-error[closing bracket missed") {
+    try {
+      sql("""update iud.dest d set (c2) = (194""").show()
+      assert(false)
+    } catch {
+      case ex: Exception =>
+        assert(true)
+    }
+  }
+
+  test("update carbon table-error[starting bracket missed") {
+    try {
+      sql("""update iud.dest d set (c2) = 194)""").show()
+      assert(false)
+    } catch {
+      case ex: Exception =>
+        assert(true)
+    }
+  }
+
+  test("update carbon table-error[missing starting and closing bracket") {
+    try {
+      sql("""update iud.dest d set (c2) = 194""").show()
+      assert(false)
+    } catch {
+      case ex: Exception =>
+        assert(true)
+    }
+  }
+
+  test("test create table with column name as tupleID"){
+    try {
+      sql("CREATE table carbontable (empno int, tupleID String, " +
+          "designation String, doj Timestamp, workgroupcategory int, " +
+          "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
+          "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
+          "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " +
+          "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," +
+          "'DICTIONARY_EXCLUDE'='empname')")
+    }catch  {
+      case ex: Exception => assert(true)
+    }
+  }
+*/
+
+  override def afterAll {
+    sql("use default")
+    sql("drop database  if exists iud cascade")
+  /*  CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")*/
+  }
+}
\ No newline at end of file

