carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [12/22] incubator-carbondata git commit: IUD horizontal compaction of update and delete delta files support
Date Fri, 06 Jan 2017 13:57:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a7d831d..f426d65 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -17,11 +17,42 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.carbondata.common.logging.LogServiceFactory
+import java.io.IOException
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.update.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
+import org.apache.carbondata.core.updatestatus.{SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.spark.util.QueryPlanUtil
+import org.apache.spark.sql.catalyst.TableIdentifier
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.path.{CarbonTablePath, CarbonStorePath}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.spark.load.{CarbonLoaderUtil, FailureCauses}
+import org.apache.carbondata.spark.merger.CarbonDataMergerUtil._
+import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionType, CarbonDataMergerUtilResult}
+import org.apache.carbondata.spark.DeleteDelataResultImpl
+
 
 /**
  * IUD update delete and compaction framework.
@@ -30,14 +61,73 @@ import org.apache.spark.sql.execution.RunnableCommand
 
 private[sql] case class ProjectForDeleteCommand(
      plan: LogicalPlan,
-     tableIdentifier: Seq[String],
+     identifier: Seq[String],
      timestamp: String) extends RunnableCommand {
 
   val LOG = LogServiceFactory.getLogService(this.getClass.getName)
   var horizontalCompactionFailed = false
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    DataFrame(sqlContext, plan).show(truncate = false)
+
+    val dataFrame = DataFrame(sqlContext, plan)
+    val dataRdd = dataFrame.rdd
+
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      LOG.audit(s" Delete data request has been received " +
+                s"for ${ relation.databaseName }.${ relation.tableName }.")
+      if (lockStatus) {
+        LOG.info("Successfully able to get the table metadata file lock")
+      }
+      else {
+        throw new Exception("Table is locked for deletion. Please try after some time")
+      }
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getStorePath,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+      var executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
+
+        // handle the clean up of IUD.
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+          if (deleteExecution
+            .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation,
+              false, executorErrors)) {
+            // call IUD Compaction.
+            IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false)
+          }
+    } catch {
+      case e: HorizontalCompactionException =>
+          LOG.error("Delete operation passed. Exception in Horizontal Compaction." +
+              " Please check logs. " + e.getMessage)
+          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOG.error("Exception in Delete data operation " + e.getMessage)
+        // ****** start clean up.
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        // clean up. Null check is required because an executor error's message can be null
+        if (null != e.getMessage) {
+          sys.error("Delete data operation is failed. " + e.getMessage)
+        }
+        else {
+          sys.error("Delete data operation is failed. Please check logs.")
+        }
+    } finally {
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
     Seq.empty
   }
 }
@@ -47,9 +137,721 @@ private[sql] case class ProjectForUpdateCommand(
   val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
-      .EXECUTION_ID_KEY, null)
-    DataFrame(sqlContext, plan).show(truncate = false)
+
+    val res = plan find {
+      case relation: LogicalRelation if (relation.relation
+        .isInstanceOf[CarbonDatasourceRelation]) =>
+        true
+      case _ => false
+    }
+
+    if (!res.isDefined) {
+      return Seq.empty
+    }
+
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    // get the current time stamp, which should be the same for delete and update.
+    val currentTime = CarbonUpdateUtil.readCurrentTime
+    var dataFrame: DataFrame = null
+    val isPersistEnabledUserValue = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.isPersistEnabled,
+        CarbonCommonConstants.defaultValueIsPersistEnabled)
+   var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
+    if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
+      isPersistEnabled = false
+    }
+    else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
+      isPersistEnabled = true
+    }
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      if (lockStatus) {
+        logInfo("Successfully able to get the table metadata file lock")
+      }
+      else {
+        throw new Exception("Table is locked for updation. Please try after some time")
+      }
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getStorePath,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+        // Build the DataFrame for the plan; persist it when persistence is enabled.
+        dataFrame = if (isPersistEnabled) {
+          DataFrame(sqlContext, plan)
+            .persist(StorageLevel.MEMORY_AND_DISK)
+        }
+        else {
+          DataFrame(sqlContext, plan)
+        }
+        var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+
+
+        // handle the clean up of IUD.
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+        // do delete operation.
+        deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd,
+          currentTime + "",
+        relation, isUpdateOperation = true, executionErrors)
+
+        if(executionErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executionErrors.errorMsg)
+        }
+
+        // do update operation.
+        UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan,
+          sqlContext, currentTime, executionErrors)
+
+        if(executionErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executionErrors.errorMsg)
+        }
+
+        // Do IUD Compaction.
+        IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true)
+    }
+
+    catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error(
+            "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
+        // In case of failure , clean all related delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error("Exception in update operation" + e)
+        // ****** start clean up.
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
+
+        // *****end clean up.
+        if (null != e.getMessage) {
+          sys.error("Update operation failed. " + e.getMessage)
+        }
+        if (null != e.getCause && null != e.getCause.getMessage) {
+          sys.error("Update operation failed. " + e.getCause.getMessage)
+        }
+        sys.error("Update operation failed. please check logs.")
+    }
+    finally {
+      if (null != dataFrame && isPersistEnabled) {
+        dataFrame.unpersist()
+      }
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+}
+
+object IUDCommon {
+
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * Performs horizontal compaction. tryHorizontalCompaction is called after an
+   * Update or Delete statement completes. When it is called after an Update
+   * statement, Update Compaction followed by Delete Compaction is processed;
+   * when it is called after a Delete statement, only Delete Compaction is
+   * processed.
+   *
+   * @param sqlContext
+   * @param carbonRelation
+   * @param isUpdateOperation
+   */
+  def tryHorizontalCompaction(sqlContext: SQLContext,
+      carbonRelation: CarbonRelation,
+      isUpdateOperation: Boolean): Unit = {
+
+    var ishorizontalCompaction = isHorizontalCompactionEnabled()
+
+    if (ishorizontalCompaction == false) {
+      return
+    }
+
+    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+    val carbonTable = carbonRelation.tableMeta.carbonTable
+    val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName)
+    val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val updateTimeStamp = System.currentTimeMillis()
+    // Make sure that the update and delete timestamps are not the same; this is
+    // required to commit to the status metadata and for cleanup
+    val deleteTimeStamp = updateTimeStamp + 1
+
+    // get the valid segments
+    var segLists = getValidSegmentList(absTableIdentifier)
+
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Avoid reading the Table Status file from disk every time: load it into memory
+    // once at the start and pass it along to the routines. The constructor of
+    // SegmentUpdateStatusManager reads the Table Status file and the Table Update Status
+    // file and saves their content in segmentDetails and updateDetails respectively.
+    val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+      absTableIdentifier)
+
+    if (isUpdateOperation == true) {
+
+      // This is an update operation: perform update compaction first.
+      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      performUpdateDeltaCompaction(sqlContext,
+        compactionTypeIUD,
+        carbonTable,
+        absTableIdentifier,
+        segmentUpdateStatusManager,
+        updateTimeStamp,
+        segLists)
+    }
+
+    // After Update Compaction perform delete compaction
+    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
+    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Delete Compaction
+    performDeleteDeltaCompaction(sqlContext,
+      compactionTypeIUD,
+      carbonTable,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      deleteTimeStamp,
+      segLists)
+  }
+
+  /**
+   * Update Delta Horizontal Compaction.
+   *
+   * @param sqlContext
+   * @param compactionTypeIUD
+   * @param carbonTable
+   * @param absTableIdentifier
+   * @param segLists
+   */
+  private def performUpdateDeltaCompaction(sqlContext: SQLContext,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    // get the valid segments qualified for update compaction.
+    val validSegList = getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (validSegList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].")
+    LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].")
+
+
+    if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION ||
+        compactionTypeIUD == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+
+      try {
+        // Update Compaction.
+        if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION) {
+          val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
+            carbonTable.getFactTableName,
+            Some(segmentUpdateStatusManager),
+            CompactionType.IUD_FACTFILE_COMPACTION.toString,
+            Some(factTimeStamp),
+            "")
+
+          AlterTableCompaction(altertablemodel).run(sqlContext)
+          return
+        }
+        else {
+          val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
+            carbonTable.getFactTableName,
+            Some(segmentUpdateStatusManager),
+            CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+            Some(factTimeStamp),
+            "")
+
+          AlterTableCompaction(altertablemodel).run(sqlContext)
+        }
+      }
+      catch {
+        case e: Exception =>
+          val msg = if (null != e.getMessage) {
+            e.getMessage
+          } else {
+            "Please check logs for more info"
+          }
+          throw new HorizontalCompactionException(
+            s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
+      }
+    }
+    LOG.info(s"Horizontal Update Compaction operation completed for [${db}.${table}].")
+    LOG.audit(s"Horizontal Update Compaction operation completed for [${db}.${table}].")
+  }
+
+  /**
+   * Delete Delta Horizontal Compaction.
+   *
+   * @param sqlContext
+   * @param compactionTypeIUD
+   * @param carbonTable
+   * @param absTableIdentifier
+   * @param segLists
+   */
+  private def performDeleteDeltaCompaction(sqlContext: SQLContext,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    val deletedBlocksList = getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (deletedBlocksList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
+    LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
+
+    try {
+
+      // Delete Compaction RDD
+      val rdd1 = sqlContext.sparkContext
+        .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size())
+
+      val timestamp = factTimeStamp
+      val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
+      val result = rdd1.mapPartitions(iter =>
+        new Iterator[Seq[CarbonDataMergerUtilResult]] {
+          override def hasNext: Boolean = iter.hasNext
+
+          override def next(): Seq[CarbonDataMergerUtilResult] = {
+            val segmentAndBlocks = iter.next
+            val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/"))
+            val blockName = segmentAndBlocks
+              .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
+
+            val result = compactBlockDeleteDeltaFiles(segment, blockName,
+              absTableIdentifier,
+              updateStatusDetails,
+              timestamp)
+
+            result.asScala.toList
+
+          }
+        }).collect
+
+      val resultList = ListBuffer[CarbonDataMergerUtilResult]()
+      result.foreach(x => {
+        x.foreach(y => {
+          resultList += y
+        })
+      })
+
+      val updateStatus = updateStatusFile(resultList.toList.asJava,
+        carbonTable,
+        timestamp.toString,
+        segmentUpdateStatusManager)
+      if (updateStatus == false) {
+        LOG.audit(s"Delete Compaction data operation is failed for [${db}.${table}].")
+        LOG.error("Delete Compaction data operation is failed.")
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [${db}.${table}] ." +
+          s" Please check logs for more info.", factTimeStamp)
+      }
+      else {
+        LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
+        LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
+      }
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
+        }
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
+    }
+  }
+}
+
+class HorizontalCompactionException(
+    message: String,
+    // required for cleanup
+    val compactionTimeStamp: Long) extends Exception(message) {
+}
+
+object deleteExecution {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    if (tableIdentifier.size > 1) {
+      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
+    } else {
+      TableIdentifier(tableIdentifier(0), None)
+    }
+  }
+
+  def deleteDeltaExecution(identifier: Seq[String],
+                           sqlContext: SQLContext,
+                           dataRdd: RDD[Row],
+                           timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean,
+                           executorErrors: ExecutionErrors): Boolean = {
+
+    var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null
+    val tableName = getTableIdentifier(identifier).table
+    val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext)
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation1(getTableIdentifier(identifier))(sqlContext).
+      asInstanceOf[CarbonRelation]
+
+    val storeLocation = relation.tableMeta.storePath
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
+        AbsoluteTableIdentifier(storeLocation,
+          relation.tableMeta.carbonTableIdentifier)
+    var tablePath = CarbonStorePath
+      .getCarbonTablePath(storeLocation,
+        absoluteTableIdentifier.getCarbonTableIdentifier())
+    var tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath
+    val totalSegments = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length
+    var factPath = tablePath.getFactDir
+
+    var carbonTable = relation.tableMeta.carbonTable
+    var deleteStatus = true
+    val deleteRdd = if (isUpdateOperation) {
+      val schema =
+        org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+          org.apache.spark.sql.types.StringType)))
+      val rdd = dataRdd
+        .map(row => Row(row.get(row.fieldIndex(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+      sqlContext.createDataFrame(rdd, schema).rdd
+    } else {
+      dataRdd
+    }
+
+    val (carbonInputFormat, job) =
+      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+
+    val keyRdd = deleteRdd.map({ row =>
+      val tupleId: String = row
+        .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+      (key, row)
+    }).groupByKey()
+
+    // if no loads are present then no need to do anything.
+    if (keyRdd.partitions.size == 0) {
+      return true
+    }
+
+    var blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
+    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+    CarbonUpdateUtil
+      .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
+
+    val rowContRdd = sqlContext.sparkContext
+      .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
+        keyRdd.partitions.size)
+
+    val rdd = rowContRdd.join(keyRdd)
+
+    res = rdd.mapPartitionsWithIndex(
+      (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
+        Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
+
+          var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]()
+          while (records.hasNext) {
+            val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+            result = result ++
+              deleteDeltaFunc(index,
+                key,
+                groupedRows.toIterator,
+                timestamp,
+                rowCountDetailsVO)
+
+          }
+          result
+        }
+    ).collect()
+
+    // if no loads are present then no need to do anything.
+    if (res.isEmpty) {
+      return true
+    }
+
+    // update new status file
+    checkAndUpdateStatusFiles
+
+    // all or none: update the status files only if the complete delete operation is successful.
+    def checkAndUpdateStatusFiles: Unit = {
+      val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+      val segmentDetails = new util.HashSet[String]()
+      res.foreach(resultOfSeg => resultOfSeg.foreach(
+        resultOfBlock => {
+          if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
+            blockUpdateDetailsList.add(resultOfBlock._2._1)
+            segmentDetails.add(resultOfBlock._2._1.getSegmentName)
+            // if this block is invalid then decrement block count in map.
+            if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
+              CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+                blockMappingVO.getSegmentNumberOfBlockMapping)
+            }
+          }
+          else {
+            deleteStatus = false
+            // In case of failure , clean all related delete delta files
+            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+            LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+            val errorMsg =
+              "Delete data operation is failed due to failure in creating delete delta file for " +
+                "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
+                resultOfBlock._2._1.getBlockName
+            executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+            executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+            if (executorErrors.failureCauses == FailureCauses.NONE) {
+              executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+              executorErrors.errorMsg = errorMsg
+            }
+            LOGGER.error(errorMsg)
+            return
+          }
+        }
+      )
+      )
+
+      val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+        .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+
+
+      // this is the delete flow, so there is no need to put the timestamp in the status file.
+      if (CarbonUpdateUtil
+        .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
+        CarbonUpdateUtil
+          .updateTableMetadataStatus(segmentDetails,
+            carbonTable,
+            timestamp,
+            !isUpdateOperation,
+            listOfSegmentToBeMarkedDeleted)
+      ) {
+        LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
+        LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
+      }
+      else {
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        val errorMessage = "Delete data operation is failed due to failure " +
+          "in table status updation."
+        LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+        LOGGER.error("Delete data operation is failed due to failure in table status updation.")
+        executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
+        executorErrors.errorMsg = errorMessage
+        // throw new Exception(errorMessage)
+      }
+    }
+
+    def deleteDeltaFunc(index: Int,
+                        key: String,
+                        iter: Iterator[Row],
+                        timestamp: String,
+                        rowCountDetailsVO: RowCountDetailsVO):
+    Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
+
+      val result = new DeleteDelataResultImpl()
+      var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+      // here key = segment/blockName
+      val blockName = CarbonUpdateUtil
+        .getBlockName(
+          CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
+      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
+      var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
+      val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] {
+        val segmentUpdateDetails = new SegmentUpdateDetails()
+        var TID = ""
+        var countOfRows = 0
+        try {
+          while (iter.hasNext) {
+            val oneRow = iter.next
+            TID = oneRow
+              .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
+            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
+            val blockletId = CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
+            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            // stop delete operation
+            if(!IsValidOffset) {
+              executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
+              executorErrors.errorMsg = "Multiple input rows matched for same row."
+              throw new MultipleMatchingException("Multiple input rows matched for same row.")
+            }
+            countOfRows = countOfRows + 1
+          }
+
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
+          val completeBlockName = CarbonTablePath
+            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
+              CarbonCommonConstants.FACT_FILE_EXT)
+          val deleteDeletaPath = CarbonUpdateUtil
+            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
+          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath,
+            FileFactory.getFileType(deleteDeletaPath))
+
+
+
+          segmentUpdateDetails.setBlockName(blockName)
+          segmentUpdateDetails.setActualBlockName(completeBlockName)
+          segmentUpdateDetails.setSegmentName(segmentId)
+          segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
+          segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
+
+          val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
+          val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
+          segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
+          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
+            segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+          }
+          else {
+            // write the delta file
+            carbonDeleteWriter.write(deleteDeltaBlockDetails)
+          }
+
+          deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        } catch {
+          case e : MultipleMatchingException =>
+            LOGGER.audit(e.getMessage)
+            LOGGER.error(e.getMessage)
+          // don't throw the exception here.
+          case e: Exception =>
+            val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
+            LOGGER.audit(errorMsg)
+            LOGGER.error(errorMsg + e.getMessage)
+            throw e
+        }
+
+
+        var finished = false
+
+        override def hasNext: Boolean = {
+          if (!finished) {
+            finished = true
+            finished
+          }
+          else {
+            !finished
+          }
+        }
+
+        override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = {
+          finished = true
+          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
+        }
+      }
+      resultIter
+    }
+    true
+  }
+}
+
+
+
+object UpdateExecution {
+
+  def performUpdate(dataFrame: DataFrame, tableIdentifier: Seq[String], plan: LogicalPlan,
+                    sqlContext: SQLContext, currentTime: Long, executorErrors: ExecutionErrors): Unit = {
+
+    def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = {
+
+      // Raghu Huawei IUD
+      val tableName = ""// relation.getTableName()
+      val dbName = ""// relation.getDatabaseName()
+      (tableIdentifier.size > 1 &&
+        tableIdentifier(0) == dbName &&
+        tableIdentifier(1) == tableName) ||
+        (tableIdentifier(0) == tableName)
+    }
+    def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = {
+      var header = ""
+      var found = false
+
+      plan match {
+        case Project(pList, _) if (!found) =>
+          found = true
+          header = pList
+            .filter(field => !field.name
+              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+            .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
+              col.name
+                .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+            }
+            else {
+              col.name
+            }).mkString(",")
+      }
+      header
+    }
+    val ex = dataFrame.queryExecution.analyzed
+    val res = ex find {
+      case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] &&
+        isDestinationRelation(relation.relation
+          .asInstanceOf[CarbonDatasourceRelation])) =>
+        true
+      case _ => false
+    }
+    val carbonRelation: CarbonDatasourceRelation = res match {
+      case Some(relation: LogicalRelation) =>
+        relation.relation.asInstanceOf[CarbonDatasourceRelation]
+      case _ => sys.error("")
+    }
+
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+
+    val header = getHeader(carbonRelation, plan)
+
+    // Raghu Huawei IUD
+//    LoadTable(
+//      Some(carbonRelation.getDatabaseName()),
+//      carbonRelation.getTableName(),
+//      null,
+//      Seq(),
+//      Map(("fileheader" -> header)),
+//      false,
+//      null,
+//      Some(dataFrame),
+//      Some(updateTableModel)).run(sqlContext)
+    // Raghu Huawei IUD end
+
+    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
+    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
+
     Seq.empty
+
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 924d91a..3098b43 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -24,8 +24,9 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -113,22 +114,26 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table table2")
 
     // check for table 1.
-    val identifier1 = new AbsoluteTableIdentifier(
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
+          new CarbonTableIdentifier("default", "table1", "rrr")
+        )
+    )
     // merged segment should not be there
-    val segments = SegmentStatusManager.getSegmentStatus(identifier1)
-        .getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     // check for table 2.
-    val identifier2 = new AbsoluteTableIdentifier(
+    val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
+          new CarbonTableIdentifier("default", "table2", "rrr1")
+        )
+    )
     // merged segment should not be there
-    val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
-        .getValidSegments.asScala.toList
+    val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments2.contains("0.1"))
     assert(!segments2.contains("0"))
     assert(!segments2.contains("1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java
deleted file mode 100644
index 92dd825..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.lcm.fileoperations;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public interface AtomicFileOperations {
-
-  DataInputStream openForRead() throws IOException;
-
-  void close() throws IOException;
-
-  DataOutputStream openForWrite(FileWriteOperation operation) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java
deleted file mode 100644
index 6c56515..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.lcm.fileoperations;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-
-public class AtomicFileOperationsImpl implements AtomicFileOperations {
-
-  private String filePath;
-
-  private FileType fileType;
-
-  private String tempWriteFilePath;
-
-  private DataOutputStream dataOutStream;
-
-  public AtomicFileOperationsImpl(String filePath, FileType fileType) {
-    this.filePath = filePath;
-
-    this.fileType = fileType;
-  }
-
-  @Override public DataInputStream openForRead() throws IOException {
-    return FileFactory.getDataInputStream(filePath, fileType);
-  }
-
-  @Override public DataOutputStream openForWrite(FileWriteOperation operation) throws IOException {
-
-    filePath = filePath.replace("\\", "/");
-
-    tempWriteFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
-
-    if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
-      FileFactory.getCarbonFile(tempWriteFilePath, fileType).delete();
-    }
-
-    FileFactory.createNewFile(tempWriteFilePath, fileType);
-
-    dataOutStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
-
-    return dataOutStream;
-
-  }
-
-  /* (non-Javadoc)
-   * @see com.huawei.unibi.carbon.datastorage.store.fileperations.AtomicFileOperations#close()
-   */
-  @Override public void close() throws IOException {
-
-    if (null != dataOutStream) {
-      dataOutStream.close();
-
-      CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-
-      if (!tempFile.renameForce(filePath)) {
-        throw new IOException("temporary file renaming failed, src="
-            + tempFile.getPath() + ", dest=" + filePath);
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java
deleted file mode 100644
index 8c228b9..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.lcm.fileoperations;
-
-public enum FileWriteOperation {
-
-  APPEND, OVERWRITE
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java
deleted file mode 100644
index 8054c41..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.lcm.locks;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-/**
- * This is the abstract class of the lock implementations.This handles the
- * retrying part of the locking.
- */
-public abstract class AbstractCarbonLock implements ICarbonLock {
-  private int retryCount;
-
-  private int retryTimeout;
-
-  public abstract boolean lock();
-
-  /**
-   * API for enabling the locking of file with retries.
-   */
-  public boolean lockWithRetries() {
-    try {
-      for (int i = 0; i < retryCount; i++) {
-        if (lock()) {
-          return true;
-        } else {
-          Thread.sleep(retryTimeout * 1000L);
-        }
-      }
-    } catch (InterruptedException e) {
-      return false;
-    }
-    return false;
-  }
-
-  /**
-   * Initializes the retry count and retry timeout.
-   * This will determine how many times to retry to acquire lock and the retry timeout.
-   */
-  protected void initRetry() {
-    String retries = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK);
-    try {
-      retryCount = Integer.parseInt(retries);
-    } catch (NumberFormatException e) {
-      retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT;
-    }
-
-    String maxTimeout = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK);
-    try {
-      retryTimeout = Integer.parseInt(maxTimeout);
-    } catch (NumberFormatException e) {
-      retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java
deleted file mode 100644
index fe98c48..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-/**
- * This class is a Lock factory class which is used to provide lock objects.
- * Using this lock object client can request the lock and unlock.
- */
-public class CarbonLockFactory {
-
-  /**
-   * lockTypeConfigured to check if zookeeper feature is enabled or not for carbon.
-   */
-  private static String lockTypeConfigured;
-
-  static {
-    CarbonLockFactory.getLockTypeConfigured();
-  }
-
-  /**
-   * This method will determine the lock type.
-   *
-   * @param tableIdentifier
-   * @param lockFile
-   * @return
-   */
-  public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier,
-      String lockFile) {
-    switch (lockTypeConfigured.toUpperCase()) {
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
-        return new LocalFileLock(tableIdentifier, lockFile);
-
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
-        return new ZooKeeperLocking(tableIdentifier, lockFile);
-
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
-        return new HdfsFileLock(tableIdentifier, lockFile);
-
-      default:
-        throw new UnsupportedOperationException("Not supported the lock type");
-    }
-  }
-
-  /**
-   *
-   * @param locFileLocation
-   * @param lockFile
-   * @return carbon lock
-   */
-  public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) {
-    switch (lockTypeConfigured.toUpperCase()) {
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
-        return new LocalFileLock(locFileLocation, lockFile);
-
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
-        return new ZooKeeperLocking(locFileLocation, lockFile);
-
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
-        return new HdfsFileLock(locFileLocation, lockFile);
-
-      default:
-        throw new UnsupportedOperationException("Not supported the lock type");
-    }
-  }
-
-  /**
-   * This method will set the zookeeper status whether zookeeper to be used for locking or not.
-   */
-  private static void getLockTypeConfigured() {
-    lockTypeConfigured = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
deleted file mode 100644
index 31ac8e5..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-/**
- * This class contains all carbon lock utilities
- */
-public class CarbonLockUtil {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonLockUtil.class.getName());
-
-  /**
-   * unlocks given file
-   *
-   * @param carbonLock
-   */
-  public static void fileUnlock(ICarbonLock carbonLock, String locktype) {
-    if (carbonLock.unlock()) {
-      if (locktype.equals(LockUsage.METADATA_LOCK)) {
-        LOGGER.info("Metadata lock has been successfully released");
-      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
-        LOGGER.info("Table status lock has been successfully released");
-      }
-      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
-        LOGGER.info("Clean files lock has been successfully released");
-      }
-      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
-        LOGGER.info("Delete segments lock has been successfully released");
-      }
-    } else {
-      if (locktype.equals(LockUsage.METADATA_LOCK)) {
-        LOGGER.error("Not able to release the metadata lock");
-      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
-        LOGGER.error("Not able to release the table status lock");
-      }
-      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
-        LOGGER.info("Not able to release the clean files lock");
-      }
-      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
-        LOGGER.info("Not able to release the delete segments lock");
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java
deleted file mode 100644
index 9770d7e..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * This class is used to handle the HDFS File locking.
- * This is acheived using the concept of acquiring the data out stream using Append option.
- */
-public class HdfsFileLock extends AbstractCarbonLock {
-
-  private static final LogService LOGGER =
-             LogServiceFactory.getLogService(HdfsFileLock.class.getName());
-  /**
-   * location hdfs file location
-   */
-  private String location;
-
-  private DataOutputStream dataOutputStream;
-
-  public static String tmpPath;
-
-  static {
-    Configuration conf = new Configuration(true);
-    String hdfsPath = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
-    // By default, we put the hdfs lock meta file for one table inside this table's store folder.
-    // If can not get the STORE_LOCATION, then use hadoop.tmp.dir .
-    tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION,
-               System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
-    if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
-      tmpPath = hdfsPath + tmpPath;
-    }
-  }
-
-  /**
-   * @param lockFileLocation
-   * @param lockFile
-   */
-  public HdfsFileLock(String lockFileLocation, String lockFile) {
-    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
-        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
-    LOGGER.info("HDFS lock path:"+this.location);
-    initRetry();
-  }
-
-  /**
-   * @param tableIdentifier
-   * @param lockFile
-   */
-  public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
-        .getTableName(), lockFile);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.carbondata.core.locks.ICarbonLock#lock()
-   */
-  @Override public boolean lock() {
-    try {
-      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
-        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
-      }
-      dataOutputStream =
-          FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
-
-      return true;
-
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.carbondata.core.locks.ICarbonLock#unlock()
-   */
-  @Override public boolean unlock() {
-    if (null != dataOutputStream) {
-      try {
-        dataOutputStream.close();
-      } catch (IOException e) {
-        return false;
-      } finally {
-        if (FileFactory.getCarbonFile(location, FileFactory.getFileType(location)).delete()) {
-          LOGGER.info("Deleted the lock file " + location);
-        } else {
-          LOGGER.error("Not able to delete the lock file " + location);
-        }
-      }
-    }
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java
deleted file mode 100644
index a9a7f77..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-/**
- * Carbon Lock Interface which handles the locking and unlocking.
- */
-public interface ICarbonLock {
-
-  /**
-   * Does the unlocking of the acquired lock.
-   *
-   * @return
-   */
-  boolean unlock();
-
-  /**
-   * This will acquire the lock and if it doesnt get then it will retry after the confiured time.
-   *
-   * @return
-   */
-  boolean lockWithRetries();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java
deleted file mode 100644
index f1103b1..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-
-/**
- * This class handles the file locking in the local file system.
- * This will be handled using the file channel lock API.
- */
-public class LocalFileLock extends AbstractCarbonLock {
-  /**
-   * location is the location of the lock file.
-   */
-  private String location;
-
-  /**
-   * fileOutputStream of the local lock file
-   */
-  private FileOutputStream fileOutputStream;
-
-  /**
-   * channel is the FileChannel of the lock file.
-   */
-  private FileChannel channel;
-
-  /**
-   * fileLock NIO FileLock Object
-   */
-  private FileLock fileLock;
-
-  /**
-   * lock file
-   */
-  private String lockFile;
-
-  public static final String tmpPath;
-
-  private  String lockFilePath;
-
-  /**
-   * LOGGER for  logging the messages.
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LocalFileLock.class.getName());
-
-  static {
-    tmpPath = System.getProperty("java.io.tmpdir");
-  }
-
-  /**
-   * @param lockFileLocation
-   * @param lockFile
-   */
-  public LocalFileLock(String lockFileLocation, String lockFile) {
-    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation;
-    this.lockFile = lockFile;
-    initRetry();
-  }
-
-  /**
-   * @param tableIdentifier
-   * @param lockFile
-   */
-  public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
-        .getTableName(), lockFile);
-    initRetry();
-  }
-
-  /**
-   * Lock API for locking of the file channel of the lock file.
-   *
-   * @return
-   */
-  @Override public boolean lock() {
-    try {
-      if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
-        FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
-      }
-      lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
-          lockFile;
-      if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
-        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
-      }
-
-      fileOutputStream = new FileOutputStream(lockFilePath);
-      channel = fileOutputStream.getChannel();
-      try {
-        fileLock = channel.tryLock();
-      } catch (OverlappingFileLockException e) {
-        return false;
-      }
-      if (null != fileLock) {
-        return true;
-      } else {
-        return false;
-      }
-    } catch (IOException e) {
-      return false;
-    }
-
-  }
-
-  /**
-   * Unlock API for unlocking of the acquired lock.
-   *
-   * @return
-   */
-  @Override public boolean unlock() {
-    boolean status;
-    try {
-      if (null != fileLock) {
-        fileLock.release();
-      }
-      status = true;
-    } catch (IOException e) {
-      status = false;
-    } finally {
-      if (null != fileOutputStream) {
-        try {
-          fileOutputStream.close();
-          // deleting the lock file after releasing the lock.
-          if (FileFactory.getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath))
-              .delete()) {
-            LOGGER.info("Successfully deleted the lock file " + lockFilePath);
-          } else {
-            LOGGER.error("Not able to delete the lock file " + lockFilePath);
-          }
-        } catch (IOException e) {
-          LOGGER.error(e.getMessage());
-        }
-      }
-    }
-    return status;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
deleted file mode 100644
index 9b84042..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-/**
- * This enum is used to define the usecase of the lock.
- * Each enum value is one specific lock case.
- */
-public class LockUsage {
-  public static final String LOCK = ".lock";
-  public static final String METADATA_LOCK = "meta.lock";
-  public static final String COMPACTION_LOCK = "compaction.lock";
-  public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
-  public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
-  public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
-  public static final String CLEAN_FILES_LOCK = "clean_files.lock";
-  public static final String DROP_TABLE_LOCK = "droptable.lock";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java
deleted file mode 100644
index c45fbb0..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-/**
- * For Handling the zookeeper locking implementation
- */
-public class ZooKeeperLocking extends AbstractCarbonLock {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
-
-  /**
-   * zk is the zookeeper client instance
-   */
-  private static ZooKeeper zk;
-
-  /**
-   * zooKeeperLocation is the location in the zoo keeper file system where the locks will be
-   * maintained.
-   */
-  private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION;
-
-  /**
-   * Unique folder for each table with DatabaseName_TableName
-   */
-  private final String tableIdFolder;
-
-  /**
-   * lockName is the name of the lock to use. This name should be same for every process that want
-   * to share the same lock
-   */
-  private String lockName;
-
-  /**
-   * lockPath is the unique path created for the each instance of the carbon lock.
-   */
-  private String lockPath;
-
-  private String lockTypeFolder;
-
-  public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
-        lockFile);
-  }
-
-  /**
-   * @param lockLocation
-   * @param lockFile
-   */
-  public ZooKeeperLocking(String lockLocation, String lockFile) {
-    this.lockName = lockFile;
-    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
-
-    String zooKeeperUrl =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
-    zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
-
-    this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
-        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
-    try {
-      createBaseNode();
-      // if exists returns null then path doesnt exist. so creating.
-      if (null == zk.exists(this.tableIdFolder, true)) {
-        createRecursivly(this.tableIdFolder);
-      }
-      // if exists returns null then path doesnt exist. so creating.
-      if (null == zk.exists(this.lockTypeFolder, true)) {
-        zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
-    } catch (KeeperException | InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    initRetry();
-  }
-
-  /**
-   * Creating a znode in which all the znodes (lock files )are maintained.
-   */
-  private void createBaseNode() throws KeeperException, InterruptedException {
-    if (null == zk.exists(zooKeeperLocation, true)) {
-      // creating a znode in which all the znodes (lock files )are maintained.
-      zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-  }
-
-  /**
-   * Create zookeepr node if not exist
-   * @param path
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  private void createRecursivly(String path) throws KeeperException, InterruptedException {
-    try {
-      if (zk.exists(path, true) == null && path.length() > 0) {
-        String temp = path.substring(0, path.lastIndexOf(File.separator));
-        createRecursivly(temp);
-        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      } else {
-        return;
-      }
-    } catch (KeeperException e) {
-      throw e;
-    } catch (InterruptedException e) {
-      throw e;
-    }
-
-  }
-  /**
-   * Handling of the locking mechanism using zoo keeper.
-   */
-  @Override public boolean lock() {
-    try {
-      // create the lock file with lockName.
-      lockPath =
-          zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName, null,
-              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
-      // get the children present in zooKeeperLocation.
-      List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
-
-      // sort the childrens
-      Collections.sort(nodes);
-
-      // here the logic is , for each lock request zookeeper will create a file ending with
-      // incremental digits.
-      // so first request will be 00001 next is 00002 and so on.
-      // if the current request is 00002 and already one previous request(00001) is present then get
-      // children will give both nodes.
-      // after the sort we are checking if the lock path is first or not .if it is first then lock
-      // has been acquired.
-
-      if (lockPath.endsWith(nodes.get(0))) {
-        return true;
-      } else {
-        // if locking failed then deleting the created lock as next time again new lock file will be
-        // created.
-        zk.delete(lockPath, -1);
-        return false;
-      }
-    } catch (KeeperException | InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-      return false;
-    }
-  }
-
-  /**
-   * @return status where lock file is unlocked or not.
-   */
-  @Override public boolean unlock() {
-    try {
-      // exists will return null if the path doesn't exists.
-      if (null != zk.exists(lockPath, true)) {
-        zk.delete(lockPath, -1);
-        lockPath = null;
-      }
-    } catch (KeeperException | InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-      return false;
-    }
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java
deleted file mode 100644
index 03827b6..0000000
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.lcm.locks;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-/**
- * This is a singleton class for initialization of zookeeper client.
- */
-public class ZookeeperInit {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ZookeeperInit.class.getName());
-
-  private static ZookeeperInit zooKeeperInit;
-  /**
-   * zk is the zookeeper client instance
-   */
-  private ZooKeeper zk;
-
-  private ZookeeperInit(String zooKeeperUrl) {
-
-    int sessionTimeOut = 100000;
-    try {
-      zk = new ZooKeeper(zooKeeperUrl, sessionTimeOut, new DummyWatcher());
-
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage());
-    }
-
-  }
-
-  public static ZookeeperInit getInstance(String zooKeeperUrl) {
-
-    if (null == zooKeeperInit) {
-      synchronized (ZookeeperInit.class) {
-        if (null == zooKeeperInit) {
-          LOGGER.info("Initiating Zookeeper client.");
-          zooKeeperInit = new ZookeeperInit(zooKeeperUrl);
-        }
-      }
-    }
-    return zooKeeperInit;
-
-  }
-
-  public static ZookeeperInit getInstance() {
-    return zooKeeperInit;
-  }
-
-  public ZooKeeper getZookeeper() {
-    return zk;
-  }
-
-  private static class DummyWatcher implements Watcher {
-    public void process(WatchedEvent event) {
-    }
-  }
-}


Mime
View raw message