carbondata-commits mailing list archives

From qiang...@apache.org
Subject [3/4] carbondata git commit: [CARBONDATA-1815][PreAgg] Add AtomicRunnableCommand abstraction
Date Wed, 29 Nov 2017 10:41:14 GMT
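
The commands in this change stop mixing RunnableCommand with a process trait and instead extend dedicated base classes (DataCommand below, and the AtomicRunnableCommand named in the subject). Those base classes are defined in another part of this commit; a minimal sketch, assuming only the shape implied by the overrides in the diff below, could look like:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand

// Sketch only: the real definitions live elsewhere in this commit.
trait DataProcessOperation {
  def processData(sparkSession: SparkSession): Seq[Row]
}

abstract class DataCommand extends RunnableCommand with DataProcessOperation {
  // run() delegates to processData(), so subclasses implement only the data step
  override def run(sparkSession: SparkSession): Seq[Row] = processData(sparkSession)
}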
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
new file mode 100644
index 0000000..9d21468
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.{CausedBy, FileUtils}
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.format
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+
+case class CarbonLoadDataCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    factPathFromUser: String,
+    dimFilesPath: Seq[DataLoadTableFileMapping],
+    options: scala.collection.immutable.Map[String, String],
+    isOverwriteTable: Boolean,
+    var inputSqlString: String = null,
+    dataFrame: Option[DataFrame] = None,
+    updateModel: Option[UpdateTableModel] = None,
+    var tableInfoOp: Option[TableInfo] = None)
+  extends RunnableCommand with DataProcessOperation {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    if (dataFrame.isDefined && updateModel.isEmpty) {
+      val rdd = dataFrame.get.rdd
+      if (rdd.partitions == null || rdd.partitions.length == 0) {
+        LOGGER.warn("Data loading skipped: the input DataFrame is empty, no data was loaded.")
+        return Seq.empty
+      }
+    }
+
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+
+    // get the value of 'spark.executor.cores' from the Spark conf; the default is 1
+    val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
+    // get the value of 'carbon.number.of.cores.while.loading' from the carbon properties,
+    // defaulting to the value of 'spark.executor.cores'
+    val numCoresLoading =
+      try {
+        CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores)
+      } catch {
+        case exc: NumberFormatException =>
+          LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+              + " is wrong. Falling back to the default value "
+              + sparkExecutorCores)
+          sparkExecutorCores
+      }
+
+    // update the property with the new value
+    carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
+
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    try {
+      val table = if (tableInfoOp.isDefined) {
+        CarbonTable.buildFromTableInfo(tableInfoOp.get)
+      } else {
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        if (relation == null) {
+          throw new NoSuchTableException(dbName, tableName)
+        }
+        if (null == relation.carbonTable) {
+          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+          throw new NoSuchTableException(dbName, tableName)
+        }
+        relation.carbonTable
+      }
+
+      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+      optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
+      val carbonLoadModel = new CarbonLoadModel()
+      val factPath = if (dataFrame.isDefined) {
+        ""
+      } else {
+        FileUtils.getPaths(
+          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf)
+      }
+      carbonLoadModel.setFactFilePath(factPath)
+      DataLoadingUtil.buildCarbonLoadModel(
+        table,
+        carbonProperty,
+        options,
+        optionsFinal,
+        carbonLoadModel,
+        hadoopConf
+      )
+      val operationContext = new OperationContext
+      val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+        new LoadTablePreExecutionEvent(sparkSession,
+          null,
+          carbonLoadModel,
+          factPath,
+          dataFrame.isDefined,
+          optionsFinal)
+      OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+
+      try {
+        // The system has to partition the data first and then trigger the data load
+        LOGGER.info(s"Initiating direct load for the table: $dbName.$tableName")
+        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
+        // add the start entry for the new load in the table status file
+        if (updateModel.isEmpty) {
+          CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
+        }
+        if (isOverwriteTable) {
+          LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+        }
+        if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
+            StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
+            StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+          LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
+          LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
+          carbonLoadModel.setUseOnePass(false)
+        }
+        // if the table is an aggregate table, disable single-pass load
+        if (carbonLoadModel.isAggLoadRequest) {
+          carbonLoadModel.setUseOnePass(false)
+        }
+        // Create the table and metadata folders if they do not exist
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
+        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+        val fileType = FileFactory.getFileType(metadataDirectoryPath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+          FileFactory.mkdirs(metadataDirectoryPath, fileType)
+        }
+        val partitionStatus = SegmentStatus.SUCCESS
+        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+        if (carbonLoadModel.getUseOnePass) {
+          loadDataUsingOnePass(
+            sparkSession,
+            carbonProperty,
+            carbonLoadModel,
+            columnar,
+            partitionStatus,
+            hadoopConf)
+        } else {
+          loadData(
+            sparkSession,
+            carbonLoadModel,
+            columnar,
+            partitionStatus,
+            hadoopConf)
+        }
+        val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+          new LoadTablePostExecutionEvent(sparkSession,
+            table.getCarbonTableIdentifier,
+            carbonLoadModel)
+        OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+      } catch {
+        case CausedBy(ex: NoRetryException) =>
+          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+        case ex: Exception =>
+          LOGGER.error(ex)
+          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+          throw ex
+      } finally {
+        // Clean up the intermediate partition files, whether the load succeeded or not
+        try {
+          val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
+                                  table.getDatabaseName + "/" +
+                                  table.getTableName + "/"
+          val fileType = FileFactory.getFileType(partitionLocation)
+          if (FileFactory.isFileExist(partitionLocation, fileType)) {
+            val file = FileFactory
+              .getCarbonFile(partitionLocation, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
+          }
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex)
+            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+                         "Problem deleting the partition folder")
+            throw ex
+        }
+
+      }
+    } catch {
+      case dle: DataLoadingException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+        throw dle
+      case mce: MalformedCarbonCommandException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+        throw mce
+    }
+    Seq.empty
+  }
+
+  private def loadDataUsingOnePass(
+      sparkSession: SparkSession,
+      carbonProperty: CarbonProperties,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: SegmentStatus,
+      hadoopConf: Configuration): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      .getCarbonTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
+    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dimensions = carbonTable.getDimensionByTableName(
+      carbonTable.getTableName).asScala.toArray
+    val colDictFilePath = carbonLoadModel.getColDictFilePath
+    if (!StringUtils.isEmpty(colDictFilePath)) {
+      carbonLoadModel.initPredefDictMap()
+      // generate predefined dictionary
+      GlobalDictionaryUtil.generatePredefinedColDictionary(
+        colDictFilePath,
+        carbonTableIdentifier,
+        dimensions,
+        carbonLoadModel,
+        sparkSession.sqlContext,
+        carbonLoadModel.getTablePath,
+        dictFolderPath)
+    }
+    if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+      carbonLoadModel.initPredefDictMap()
+      GlobalDictionaryUtil
+        .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+          carbonLoadModel,
+          carbonLoadModel.getTablePath,
+          carbonTableIdentifier,
+          dictFolderPath,
+          dimensions,
+          carbonLoadModel.getAllDictPath)
+    }
+    // set up the dictionary-server-based dictionary generator
+    val dictionaryServerPort = carbonProperty
+      .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    // start the dictionary server when single-pass load is used and a dimension with
+    // DICTIONARY (but not DIRECT_DICTIONARY) encoding is present.
+    val allDimensions =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      val dictionaryServer = DictionaryServer
+        .getInstance(dictionaryServerPort.toInt, carbonTable)
+      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
+    } else {
+      None
+    }
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getTablePath,
+      columnar,
+      partitionStatus,
+      server,
+      isOverwriteTable,
+      hadoopConf,
+      dataFrame,
+      updateModel)
+  }
+
+  private def loadData(
+      sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: SegmentStatus,
+      hadoopConf: Configuration): Unit = {
+    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+      val fields = dataFrame.get.schema.fields
+      import org.apache.spark.sql.functions.udf
+      // extract only the segment id from the tupleId
+      val getSegIdUDF = udf((tupleId: String) =>
+        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      // collect all fields except the tupleId field, as it is not required in the value
+      var otherFields = fields.toSeq
+        .filter(field => !field.name
+          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+        .map(field => new Column(field.name))
+
+      // extract the tupleId field, which will be used as a key
+      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      // use dataFrameWithoutTupleId as dictionaryDataFrame
+      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+      otherFields = otherFields :+ segIdColumn
+      // use dataFrameWithTupleId as loadDataFrame
+      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+    } else {
+      (dataFrame, dataFrame)
+    }
+
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getTablePath,
+      hadoopConf,
+      dictionaryDataFrame)
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getTablePath,
+      columnar,
+      partitionStatus,
+      None,
+      isOverwriteTable,
+      hadoopConf,
+      loadDataFrame,
+      updateModel)
+  }
+
+  private def updateTableMetadata(
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+    val sparkSession = sqlContext.sparkSession
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)
+
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    // read TableInfo
+    val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+    val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+    entry.setTime_stamp(System.currentTimeMillis())
+
+    // write TableInfo
+    metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
+      carbonTablePath.getCarbonTableIdentifier,
+      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
+
+    // update the schema modified time
+    metastore.updateAndTouchSchemasUpdatedTime()
+
+    val identifier = model.table.getCarbonTableIdentifier
+    // update CarbonDataLoadSchema
+    val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
+      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+}
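
For reference, a hypothetical invocation of the command above, built the way the SQL parser would construct it (database, table, path, and option values here are illustrative, not taken from this commit):

// Hypothetical usage; all values are illustrative.
CarbonLoadDataCommand(
  databaseNameOp = Some("default"),
  tableName = "sales",
  factPathFromUser = "/tmp/sales.csv",
  dimFilesPath = Seq.empty,
  options = Map("delimiter" -> ",", "header" -> "true"),
  isOverwriteTable = false
).run(sparkSession)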

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 32d6b80..7d00fa6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -18,21 +18,25 @@
 package org.apache.spark.sql.execution.command.management
 
 import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.types.{StringType, TimestampType}
 
 import org.apache.carbondata.api.CarbonStore
 
 case class CarbonShowLoadsCommand(
     databaseNameOp: Option[String],
     tableName: String,
-    limit: Option[String],
-    override val output: Seq[Attribute])
-  extends RunnableCommand with DataProcessCommand {
+    limit: Option[String])
+  extends DataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time", TimestampType, nullable = true)(),
+      AttributeReference("Merged To", StringType, nullable = false)(),
+      AttributeReference("File Format", StringType, nullable = false)())
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
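
Since output is now fixed by the command itself rather than supplied by the parser, the result schema of SHOW SEGMENTS is the same for every invocation. A sketch of what a caller would observe (the statement and table name are illustrative):

// Illustrative only: column names follow the AttributeReferences above.
spark.sql("SHOW SEGMENTS FOR TABLE default.sales").printSchema()
// root
//  |-- SegmentSequenceId: string (nullable = false)
//  |-- Status: string (nullable = false)
//  |-- Load Start Time: timestamp (nullable = false)
//  |-- Load End Time: timestamp (nullable = true)
//  |-- Merged To: string (nullable = false)
//  |-- File Format: string (nullable = false)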

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
deleted file mode 100644
index d6604d8..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Clean data in table
- * If table name is specified and forceTableClean is false, it will clean garbage
- * segment (MARKED_FOR_DELETE state).
- * If table name is specified and forceTableClean is true, it will delete all data
- * in the table.
- * If table name is not provided, it will clean garbage segment in all tables.
- */
-case class CleanFilesCommand(
-    databaseNameOp: Option[String],
-    tableName: Option[String],
-    forceTableClean: Boolean = false)
-  extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
-      } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
-      }
-    } else {
-      cleanGarbageDataInAllTables(sparkSession)
-    }
-    val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
-    Seq.empty
-  }
-
-  private def deleteAllData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String): Unit = {
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    CarbonStore.cleanFiles(
-      dbName,
-      tableName,
-      databaseLocation,
-      null,
-      forceTableClean)
-  }
-
-  private def cleanGarbageData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String): Unit = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
-
-    CarbonStore.cleanFiles(
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
-      CarbonProperties.getStorePath,
-      carbonTable,
-      forceTableClean)
-
-    val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
-  }
-
-  private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
-    // Waiting to implement
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
deleted file mode 100644
index 5b305ba..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-
-case class DeleteLoadByIdCommand(
-    loadIds: Seq[String],
-    databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val operationContext = new OperationContext
-
-    val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
-      DeleteSegmentByIdPreEvent(carbonTable,
-        loadIds,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentByIdPreEvent, operationContext)
-
-    CarbonStore.deleteLoadById(
-      loadIds,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
-      carbonTable
-    )
-
-    val deleteSegmentPostEvent: DeleteSegmentByIdPostEvent =
-      DeleteSegmentByIdPostEvent(carbonTable,
-        loadIds,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
deleted file mode 100644
index 00c35a5..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-
-case class DeleteLoadByLoadDateCommand(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    loadDate: String)
-  extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val operationContext = new OperationContext
-    val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
-      DeleteSegmentByDatePreEvent(carbonTable,
-        loadDate,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentByDatePreEvent, operationContext)
-
-    CarbonStore.deleteLoadByDate(
-      loadDate,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
-      carbonTable)
-    val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =
-      DeleteSegmentByDatePostEvent(carbonTable,
-        loadDate,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
-
-    Seq.empty
-  }
-}
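
Each of these deleted segment-management commands follows the same shape: fire a pre event, run the CarbonStore operation, then fire a post event on the same OperationContext. A minimal sketch of that pattern, assuming the events share the common Event supertype from org.apache.carbondata.events (the helper name withListenerEvents is hypothetical, not part of this commit):

import org.apache.carbondata.events.{Event, OperationContext, OperationListenerBus}

// Hypothetical helper illustrating the pre/post event pattern.
def withListenerEvents[T](pre: Event, post: Event)(action: => T): T = {
  val context = new OperationContext
  OperationListenerBus.getInstance.fireEvent(pre, context)
  val result = action            // e.g. CarbonStore.deleteLoadById(...)
  OperationListenerBus.getInstance.fireEvent(post, context)
  result
}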

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
deleted file mode 100644
index 845a64c..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{DataProcessCommand, RunnableCommand}
-
-import org.apache.carbondata.spark.util.CarbonSparkUtil
-
-case class LoadTableByInsertCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean)
-  extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val df = Dataset.ofRows(sparkSession, child)
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    val load = LoadTableCommand(
-      Some(relation.carbonRelation.databaseName),
-      relation.carbonRelation.tableName,
-      null,
-      Seq(),
-      scala.collection.immutable.Map("fileheader" -> header),
-      overwrite,
-      null,
-      Some(df)).run(sparkSession)
-    // updating relation metadata. This is in case of auto detect high cardinality
-    relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
-    load
-  }
-}
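
The "fileheader" option above is simply the comma-joined column names of the target relation; for a table with columns (id, name) it is the string "id,name". A small illustrative check (the schema is hypothetical):

// Illustrative only: mirrors the header construction above.
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val schema = StructType(Seq(StructField("id", StringType), StructField("name", StringType)))
val header = schema.fields.map(_.name).mkString(",")  // "id,name"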

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
deleted file mode 100644
index 0025011..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import scala.collection.JavaConverters._
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessCommand, RunnableCommand, UpdateTableModel}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.{CausedBy, FileUtils}
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format
-import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
-
-case class LoadTableCommand(
-    databaseNameOp: Option[String],
-    tableName: String,
-    factPathFromUser: String,
-    dimFilesPath: Seq[DataLoadTableFileMapping],
-    options: scala.collection.immutable.Map[String, String],
-    isOverwriteTable: Boolean,
-    var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None,
-    updateModel: Option[UpdateTableModel] = None,
-    var tableInfoOp: Option[TableInfo] = None)
-  extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    if (dataFrame.isDefined && updateModel.isEmpty) {
-      val rdd = dataFrame.get.rdd
-      if (rdd.partitions == null || rdd.partitions.length == 0) {
-        LOGGER.warn("DataLoading finished. No data was loaded.")
-        return Seq.empty
-      }
-    }
-
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
-    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
-    carbonProperty.addProperty("zookeeper.enable.lock", "false")
-
-    // get the value of 'spark.executor.cores' from spark conf, default value is 1
-    val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
-    // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,
-    // default value is the value of 'spark.executor.cores'
-    val numCoresLoading =
-      try {
-        CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores)
-      } catch {
-        case exc: NumberFormatException =>
-          LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
-              + " is wrong. Falling back to the default value "
-              + sparkExecutorCores)
-          sparkExecutorCores
-      }
-
-    // update the property with new value
-    carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
-
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    try {
-      val table = if (tableInfoOp.isDefined) {
-        CarbonTable.buildFromTableInfo(tableInfoOp.get)
-      } else {
-        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        if (relation == null) {
-          throw new NoSuchTableException(dbName, tableName)
-        }
-        if (null == relation.carbonTable) {
-          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
-          throw new NoSuchTableException(dbName, tableName)
-        }
-        relation.carbonTable
-      }
-
-      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
-      optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-
-      val carbonLoadModel = new CarbonLoadModel()
-      val factPath = if (dataFrame.isDefined) {
-        ""
-      } else {
-        FileUtils.getPaths(
-          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf)
-      }
-      carbonLoadModel.setFactFilePath(factPath)
-      DataLoadingUtil.buildCarbonLoadModel(
-        table,
-        carbonProperty,
-        options,
-        optionsFinal,
-        carbonLoadModel,
-        hadoopConf
-      )
-      val operationContext = new OperationContext
-      val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
-        new LoadTablePreExecutionEvent(sparkSession,
-          null,
-          carbonLoadModel,
-          factPath,
-          dataFrame.isDefined,
-          optionsFinal)
-      OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
-
-      try{
-        // First system has to partition the data first and then call the load data
-        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
-        // add the start entry for the new load in the table status file
-        if (updateModel.isEmpty) {
-          CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
-        }
-        if (isOverwriteTable) {
-          LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
-        }
-        if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
-            StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
-            StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
-          LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
-          LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
-          carbonLoadModel.setUseOnePass(false)
-        }
-        // if table is an aggregate table then disable single pass.
-        if (carbonLoadModel.isAggLoadRequest) {
-          carbonLoadModel.setUseOnePass(false)
-        }
-        // Create table and metadata folders if not exist
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
-        val fileType = FileFactory.getFileType(metadataDirectoryPath)
-        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
-          FileFactory.mkdirs(metadataDirectoryPath, fileType)
-        }
-        val partitionStatus = SegmentStatus.SUCCESS
-        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-        if (carbonLoadModel.getUseOnePass) {
-          loadDataUsingOnePass(
-            sparkSession,
-            carbonProperty,
-            carbonLoadModel,
-            columnar,
-            partitionStatus,
-            hadoopConf)
-        } else {
-          loadData(
-            sparkSession,
-            carbonLoadModel,
-            columnar,
-            partitionStatus,
-            hadoopConf)
-        }
-        val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
-          new LoadTablePostExecutionEvent(sparkSession,
-            table.getCarbonTableIdentifier,
-            carbonLoadModel)
-        OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
-      } catch {
-        case CausedBy(ex: NoRetryException) =>
-          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
-          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
-        case ex: Exception =>
-          LOGGER.error(ex)
-          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
-          throw ex
-      } finally {
-        // Once the data load is successful delete the unwanted partition files
-        try {
-          val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
-                                  table.getDatabaseName + "/" +
-                                  table.getTableName + "/"
-          val fileType = FileFactory.getFileType(partitionLocation)
-          if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory
-              .getCarbonFile(partitionLocation, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file)
-          }
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex)
-            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
-                         "Problem deleting the partition folder")
-            throw ex
-        }
-
-      }
-    } catch {
-      case dle: DataLoadingException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
-        throw dle
-      case mce: MalformedCarbonCommandException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
-        throw mce
-    }
-    Seq.empty
-  }
-
-  private def loadDataUsingOnePass(
-      sparkSession: SparkSession,
-      carbonProperty: CarbonProperties,
-      carbonLoadModel: CarbonLoadModel,
-      columnar: Boolean,
-      partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      .getCarbonTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
-    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
-    val dimensions = carbonTable.getDimensionByTableName(
-      carbonTable.getTableName).asScala.toArray
-    val colDictFilePath = carbonLoadModel.getColDictFilePath
-    if (!StringUtils.isEmpty(colDictFilePath)) {
-      carbonLoadModel.initPredefDictMap()
-      // generate predefined dictionary
-      GlobalDictionaryUtil.generatePredefinedColDictionary(
-        colDictFilePath,
-        carbonTableIdentifier,
-        dimensions,
-        carbonLoadModel,
-        sparkSession.sqlContext,
-        carbonLoadModel.getTablePath,
-        dictFolderPath)
-    }
-    if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
-      carbonLoadModel.initPredefDictMap()
-      GlobalDictionaryUtil
-        .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
-          carbonLoadModel,
-          carbonLoadModel.getTablePath,
-          carbonTableIdentifier,
-          dictFolderPath,
-          dimensions,
-          carbonLoadModel.getAllDictPath)
-    }
-    // dictionaryServerClient dictionary generator
-    val dictionaryServerPort = carbonProperty
-      .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-      getConf.get("spark.driver.host")
-    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-    // start dictionary server when use one pass load and dimension with DICTIONARY
-    // encoding is present.
-    val allDimensions =
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
-    val createDictionary = allDimensions.exists {
-      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-    }
-    val server: Option[DictionaryServer] = if (createDictionary) {
-      val dictionaryServer = DictionaryServer
-        .getInstance(dictionaryServerPort.toInt, carbonTable)
-      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-      sparkSession.sparkContext.addSparkListener(new SparkListener() {
-        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-          dictionaryServer.shutdown()
-        }
-      })
-      Some(dictionaryServer)
-    } else {
-      None
-    }
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
-      carbonLoadModel,
-      carbonLoadModel.getTablePath,
-      columnar,
-      partitionStatus,
-      server,
-      isOverwriteTable,
-      hadoopConf,
-      dataFrame,
-      updateModel)
-  }
-
-  private def loadData(
-      sparkSession: SparkSession,
-      carbonLoadModel: CarbonLoadModel,
-      columnar: Boolean,
-      partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
-    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-      val fields = dataFrame.get.schema.fields
-      import org.apache.spark.sql.functions.udf
-      // extracting only segment from tupleId
-      val getSegIdUDF = udf((tupleId: String) =>
-        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-      // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq
-        .filter(field => !field.name
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-        .map(field => new Column(field.name))
-
-      // extract tupleId field which will be used as a key
-      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
-      // use dataFrameWithoutTupleId as dictionaryDataFrame
-      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-      otherFields = otherFields :+ segIdColumn
-      // use dataFrameWithTupleId as loadDataFrame
-      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-    } else {
-      (dataFrame, dataFrame)
-    }
-
-    GlobalDictionaryUtil.generateGlobalDictionary(
-      sparkSession.sqlContext,
-      carbonLoadModel,
-      carbonLoadModel.getTablePath,
-      hadoopConf,
-      dictionaryDataFrame)
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
-      carbonLoadModel,
-      carbonLoadModel.getTablePath,
-      columnar,
-      partitionStatus,
-      None,
-      isOverwriteTable,
-      hadoopConf,
-      loadDataFrame,
-      updateModel)
-  }
-
-  private def updateTableMetadata(
-      carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-    val sparkSession = sqlContext.sparkSession
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)
-
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    // read TableInfo
-    val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-    val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-    entry.setTime_stamp(System.currentTimeMillis())
-
-    // write TableInfo
-    metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
-      carbonTablePath.getCarbonTableIdentifier,
-      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
-
-    // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime()
-
-    val identifier = model.table.getCarbonTableIdentifier
-    // update CarbonDataLoadSchema
-    val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
-      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
new file mode 100644
index 0000000..3d65862
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.FailureCauses
+
+/**
+ * IUD delete command: deletes the rows selected by the plan and
+ * triggers horizontal compaction of the resulting delete delta files.
+ */
+private[sql] case class CarbonProjectForDeleteCommand(
+    plan: LogicalPlan,
+    identifier: Seq[String],
+    timestamp: String)
+  extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
+    val dataFrame = Dataset.ofRows(sparkSession, plan)
+    val dataRdd = dataFrame.rdd
+
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.carbonTable
+
+    // trigger event for Delete from table
+    val operationContext = new OperationContext
+    val deleteFromTablePreEvent: DeleteFromTablePreEvent =
+      DeleteFromTablePreEvent(sparkSession, carbonTable)
+    OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
+
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      LOGGER.audit(s" Delete data request has been received " +
+                   s"for ${ relation.databaseName }.${ relation.tableName }.")
+      if (lockStatus) {
+        LOGGER.info("Successfully able to get the table metadata file lock")
+      } else {
+        throw new Exception("Table is locked for deletion. Please try after some time")
+      }
+      val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
+
+      // clean up any stale IUD delta files before executing the delete.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
+        isUpdateOperation = false, executorErrors)) {
+        // call IUD Compaction.
+        HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
+          isUpdateOperation = false)
+
+        // trigger post event for Delete from table
+        val deleteFromTablePostEvent: DeleteFromTablePostEvent =
+          DeleteFromTablePostEvent(sparkSession, carbonTable)
+        OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
+      }
+    } catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
+                     " Please check logs. " + e.getMessage)
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
+        // Start clean up:
+        // in case of failure, clean up all related delete delta files.
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        // Null check is required: for executor errors the message is sometimes null.
+        if (null != e.getMessage) {
+          sys.error("Delete data operation failed. " + e.getMessage)
+        } else {
+          sys.error("Delete data operation failed. Please check logs.")
+        }
+    } finally {
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+}
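Both mutation commands repeat the same locking discipline seen above: acquire the table metadata lock with retries, fail fast with a user-facing message if it cannot be taken, and release it in a finally block regardless of outcome. A minimal sketch of that pattern, with a hypothetical Lock trait standing in for the CarbonLockFactory lock objects:

object MetadataLockSketch {
  trait Lock {
    def lockWithRetries(): Boolean
    def unlock(): Unit
  }

  // Run body only while holding the lock; always release on exit,
  // mirroring the CarbonLockUtil.fileUnlock call in the finally block above.
  def withMetadataLock[T](lock: Lock)(body: => T): T = {
    if (!lock.lockWithRetries()) {
      throw new RuntimeException("Table is locked. Please try again later")
    }
    try body
    finally lock.unlock()
  }
}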

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
new file mode 100644
index 0000000..3aadec3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
+import org.apache.carbondata.processing.loading.FailureCauses
+
+private[sql] case class CarbonProjectForUpdateCommand(
+    plan: LogicalPlan,
+    tableIdentifier: Seq[String])
+  extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(CarbonProjectForUpdateCommand.getClass.getName)
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
+    val res = plan find {
+      case relation: LogicalRelation if relation.relation
+        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        true
+      case _ => false
+    }
+
+    if (res.isEmpty) {
+      return Seq.empty
+    }
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.carbonTable
+
+    // trigger event for Update table
+    val operationContext = new OperationContext
+    val updateTablePreEvent: UpdateTablePreEvent =
+      UpdateTablePreEvent(sparkSession, carbonTable)
+    OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    // get the current timestamp, which must be the same for the delete and the update.
+    val currentTime = CarbonUpdateUtil.readCurrentTime
+    var dataSet: DataFrame = null
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      if (lockStatus) {
+        logInfo("Successfully able to get the table metadata file lock")
+      } else {
+        throw new Exception("Table is locked for update. Please try again later")
+      }
+      // Build the Dataset for the plan, persisting it when configured.
+      dataSet = if (isPersistEnabled) {
+        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
+      } else {
+        Dataset.ofRows(sparkSession, plan)
+      }
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+
+      // clean up any stale IUD delta files before executing the update.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+      // do delete operation.
+      DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
+        currentTime + "", isUpdateOperation = true, executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
+
+      // do update operation.
+      performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
+
+      // Do IUD Compaction.
+      HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+
+      // trigger event for Update table
+      val updateTablePostEvent: UpdateTablePostEvent =
+        UpdateTablePostEvent(sparkSession, carbonTable)
+      OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
+    } catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error(
+          "Update operation succeeded, but horizontal compaction failed." +
+          " Please check logs. " + e)
+        // In case of failure , clean all related delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error("Exception in update operation" + e)
+        // Start clean up:
+        // in case of failure, clean up all related delete delta files.
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
+
+        // End clean up.
+        // sys.error throws, so only the first applicable message is raised.
+        if (null != e.getMessage) {
+          sys.error("Update operation failed. " + e.getMessage)
+        }
+        if (null != e.getCause && null != e.getCause.getMessage) {
+          sys.error("Update operation failed. " + e.getCause.getMessage)
+        }
+        sys.error("Update operation failed. Please check logs.")
+    } finally {
+      if (null != dataSet && isPersistEnabled) {
+        dataSet.unpersist()
+      }
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+
+  private def performUpdate(
+      dataFrame: Dataset[Row],
+      tableIdentifier: Seq[String],
+      plan: LogicalPlan,
+      sparkSession: SparkSession,
+      currentTime: Long,
+      executorErrors: ExecutionErrors): Unit = {
+
+    def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
+
+      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
+      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
+      (tableIdentifier.size > 1 &&
+       tableIdentifier(0) == dbName &&
+       tableIdentifier(1) == tableName) ||
+      (tableIdentifier(0) == tableName)
+    }
+    def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
+      var header = ""
+      var found = false
+
+      plan match {
+        case Project(pList, _) if (!found) =>
+          found = true
+          header = pList
+            .filter(field => !field.name
+              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+            .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
+              col.name
+                .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+            }
+            else {
+              col.name
+            }).mkString(",")
+      }
+      header
+    }
+    val ex = dataFrame.queryExecution.analyzed
+    val res = ex find {
+      case relation: LogicalRelation
+        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
+        true
+      case _ => false
+    }
+    val carbonRelation: CarbonDatasourceHadoopRelation = res match {
+      case Some(relation: LogicalRelation) =>
+        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      case _ => sys.error("")
+    }
+
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+
+    val header = getHeader(carbonRelation, plan)
+
+    CarbonLoadDataCommand(
+      Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
+      carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
+      null,
+      Seq(),
+      Map(("fileheader" -> header)),
+      false,
+      null,
+      Some(dataFrame),
+      Some(updateTableModel)).run(sparkSession)
+
+    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
+    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
+
+    Seq.empty
+
+  }
+}
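The performUpdate path above re-uses the load command: deleted rows are marked first, then the updated rows are loaded back with a "fileheader" derived from the projection. A self-contained sketch of that header derivation (the getHeader logic) follows; the two literal values are assumptions standing in for CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID and UPDATED_COL_EXTENSION.

object FileHeaderSketch {
  // Assumed values; the real constants live in CarbonCommonConstants.
  val TupleIdColumn = "tupleId"
  val UpdatedColExtension = "-updatedColumn"

  // Drop the implicit tupleId column and strip the updated-column suffix,
  // producing the comma-separated header passed as the "fileheader" option.
  def buildHeader(projectedColumns: Seq[String]): String =
    projectedColumns
      .filterNot(_.equalsIgnoreCase(TupleIdColumn))
      .map { name =>
        if (name.endsWith(UpdatedColExtension)) {
          name.substring(0, name.lastIndexOf(UpdatedColExtension))
        } else name
      }
      .mkString(",")
}

For example, buildHeader(Seq("a", "b-updatedColumn", "tupleId")) yields "a,b" under these assumptions.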

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index a875ad6..553663e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.execution.command.management.AlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -139,7 +139,7 @@ object HorizontalCompaction {
         Some(factTimeStamp),
         "")
 
-      AlterTableCompactionCommand(alterTableModel).run(sparkSession)
+      CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession)
     }
     catch {
       case e: Exception =>
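The hunk above only renames the call site: horizontal compaction keeps delegating to the same runnable command that backs ALTER TABLE ... COMPACT, so renaming that command in this commit has to touch the IUD path as well. A toy sketch of the delegation, with CompactionModel and Command as illustrative stand-ins rather than the real Carbon classes:

object CompactionDelegationSketch {
  // Illustrative stand-ins, not the real AlterTableModel / command classes.
  final case class CompactionModel(compactionType: String, factTimeStamp: Option[Long])

  trait Command { def run(): Unit }

  // One command implementation serves both the user-facing DDL path and the
  // internal IUD path, which is why the rename touches this file.
  final case class CarbonAlterTableCompaction(model: CompactionModel) extends Command {
    def run(): Unit = println(s"compacting: ${model.compactionType}")
  }

  def tryHorizontalCompaction(factTimeStamp: Long): Unit =
    CarbonAlterTableCompaction(
      CompactionModel("IUD_UPDDEL_DELTA", Some(factTimeStamp))).run()
}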

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
deleted file mode 100644
index 2b9caf7..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.mutation
-
-import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.FailureCauses
-/**
- * IUD update delete and compaction framework.
- *
- */
-private[sql] case class ProjectForDeleteCommand(
-    plan: LogicalPlan,
-    identifier: Seq[String],
-    timestamp: String) extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-      IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
-    val dataFrame = Dataset.ofRows(sparkSession, plan)
-    //    dataFrame.show(truncate = false)
-    //    dataFrame.collect().foreach(println)
-    val dataRdd = dataFrame.rdd
-
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
-
-    // trigger event for Delete from table
-    val operationContext = new OperationContext
-    val deleteFromTablePreEvent: DeleteFromTablePreEvent =
-      DeleteFromTablePreEvent(sparkSession, carbonTable)
-    OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
-
-    val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    var lockStatus = false
-    try {
-      lockStatus = metadataLock.lockWithRetries()
-      LOGGER.audit(s" Delete data request has been received " +
-                   s"for ${ relation.databaseName }.${ relation.tableName }.")
-      if (lockStatus) {
-        LOGGER.info("Successfully able to get the table metadata file lock")
-      } else {
-        throw new Exception("Table is locked for deletion. Please try after some time")
-      }
-      val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
-
-      // handle the clean up of IUD.
-      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
-        isUpdateOperation = false, executorErrors)) {
-        // call IUD Compaction.
-        HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
-          isUpdateOperation = false)
-
-        // trigger post event for Delete from table
-        val deleteFromTablePostEvent: DeleteFromTablePostEvent =
-          DeleteFromTablePostEvent(sparkSession, carbonTable)
-        OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
-      }
-    } catch {
-      case e: HorizontalCompactionException =>
-        LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
-                     " Please check logs. " + e.getMessage)
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
-      case e: Exception =>
-        LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
-        // ****** start clean up.
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
-        // clean up. Null check is required as for executor error some times message is null
-        if (null != e.getMessage) {
-          sys.error("Delete data operation is failed. " + e.getMessage)
-        }
-        else {
-          sys.error("Delete data operation is failed. Please check logs.")
-        }
-    } finally {
-      if (lockStatus) {
-        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
-      }
-    }
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
deleted file mode 100644
index 341f368..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.mutation
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.management.LoadTableCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
-import org.apache.carbondata.processing.loading.FailureCauses
-
-private[sql] case class ProjectForUpdateCommand(
-    plan: LogicalPlan, tableIdentifier: Seq[String])
-  extends RunnableCommand with DataProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
-    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
-    val res = plan find {
-      case relation: LogicalRelation if relation.relation
-        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        true
-      case _ => false
-    }
-
-    if (res.isEmpty) {
-      return Seq.empty
-    }
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
-
-    // trigger event for Update table
-    val operationContext = new OperationContext
-    val updateTablePreEvent: UpdateTablePreEvent =
-      UpdateTablePreEvent(sparkSession, carbonTable)
-    OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
-    val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    var lockStatus = false
-    // get the current time stamp which should be same for delete and update.
-    val currentTime = CarbonUpdateUtil.readCurrentTime
-    //    var dataFrame: DataFrame = null
-    var dataSet: DataFrame = null
-    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
-    try {
-      lockStatus = metadataLock.lockWithRetries()
-      if (lockStatus) {
-        logInfo("Successfully able to get the table metadata file lock")
-      }
-      else {
-        throw new Exception("Table is locked for updation. Please try after some time")
-      }
-      // Get RDD.
-
-      dataSet = if (isPersistEnabled) {
-        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
-          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
-      }
-      else {
-        Dataset.ofRows(sparkSession, plan)
-      }
-      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-
-      // handle the clean up of IUD.
-      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-      // do delete operation.
-      DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-        currentTime + "", isUpdateOperation = true, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executionErrors.errorMsg)
-      }
-
-      // do update operation.
-      performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executionErrors.errorMsg)
-      }
-
-      // Do IUD Compaction.
-      HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
-
-      // trigger event for Update table
-      val updateTablePostEvent: UpdateTablePostEvent =
-        UpdateTablePostEvent(sparkSession, carbonTable)
-      OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
-    } catch {
-      case e: HorizontalCompactionException =>
-        LOGGER.error(
-          "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
-        // In case of failure , clean all related delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
-      case e: Exception =>
-        LOGGER.error("Exception in update operation" + e)
-        // ****** start clean up.
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
-
-        // *****end clean up.
-        if (null != e.getMessage) {
-          sys.error("Update operation failed. " + e.getMessage)
-        }
-        if (null != e.getCause && null != e.getCause.getMessage) {
-          sys.error("Update operation failed. " + e.getCause.getMessage)
-        }
-        sys.error("Update operation failed. please check logs.")
-    }
-    finally {
-      if (null != dataSet && isPersistEnabled) {
-        dataSet.unpersist()
-      }
-      if (lockStatus) {
-        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
-      }
-    }
-    Seq.empty
-  }
-
-  private def performUpdate(
-      dataFrame: Dataset[Row],
-      tableIdentifier: Seq[String],
-      plan: LogicalPlan,
-      sparkSession: SparkSession,
-      currentTime: Long,
-      executorErrors: ExecutionErrors): Unit = {
-
-    def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
-
-      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
-      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
-      (tableIdentifier.size > 1 &&
-       tableIdentifier(0) == dbName &&
-       tableIdentifier(1) == tableName) ||
-      (tableIdentifier(0) == tableName)
-    }
-    def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
-      var header = ""
-      var found = false
-
-      plan match {
-        case Project(pList, _) if (!found) =>
-          found = true
-          header = pList
-            .filter(field => !field.name
-              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-            .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
-              col.name
-                .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
-            }
-            else {
-              col.name
-            }).mkString(",")
-      }
-      header
-    }
-    val ex = dataFrame.queryExecution.analyzed
-    val res = ex find {
-      case relation: LogicalRelation
-        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
-        true
-      case _ => false
-    }
-    val carbonRelation: CarbonDatasourceHadoopRelation = res match {
-      case Some(relation: LogicalRelation) =>
-        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-      case _ => sys.error("")
-    }
-
-    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
-
-    val header = getHeader(carbonRelation, plan)
-
-    LoadTableCommand(
-      Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
-      carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
-      null,
-      Seq(),
-      Map(("fileheader" -> header)),
-      false,
-      null,
-      Some(dataFrame),
-      Some(updateTableModel)).run(sparkSession)
-
-    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
-    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
-    Seq.empty
-
-  }
-}

