carbondata-commits mailing list archives

From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: 1. Implemented the system level compaction lock feature. 2. Made the DDL call to compaction a blocking call. 3. Handled requests received while one compaction is in progress; such requests are taken up later. 4. Supported the property "carbon.concurrent.compaction", defaulting to false
Date Thu, 01 Sep 2016 09:39:45 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ff7a8382e -> 4d490793a


1. Implemented the system level compaction lock feature.
2. Made the DDL call to compaction a blocking call.
3. Handled requests received while one compaction is in progress; such requests are taken up later.
4. Supported the property "carbon.concurrent.compaction", defaulting to false (a configuration sketch follows below).
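
For reference, a minimal sketch of toggling the new behaviour, assuming CarbonData's CarbonProperties API as used elsewhere in this patch (the property defaults to "false", i.e. the system level lock is active and only one compaction runs at a time):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Opt back into concurrent, per-table compaction; leaving the property unset
    // (or "false") keeps the new single-compaction, system level lock behaviour.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, "true")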


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ffbb3596
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ffbb3596
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ffbb3596

Branch: refs/heads/master
Commit: ffbb35964dd5246c59ffc5b26d5239d9791448cf
Parents: ff7a838
Author: ravikiran <ravikiran.sn042@gmail.com>
Authored: Wed Aug 31 22:10:44 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Sep 1 15:08:14 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  27 +
 .../spark/merger/CarbonCompactionUtil.java      | 138 ++++++
 .../spark/merger/CarbonDataMergerUtil.java      |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 491 ++++++++++++++-----
 .../execution/command/carbonTableSchema.scala   |   3 +-
 .../CompactionSystemLockFeatureTest.scala       | 147 ++++++
 .../apache/carbondata/lcm/locks/LockUsage.java  |   7 +-
 7 files changed, 690 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c4b412e..6050719 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -904,6 +904,33 @@ public final class CarbonCommonConstants {
    */
   public static final int DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE = 2;
 
+  /**
+   * File created in case of minor compaction request
+   */
+  public static String minorCompactionRequiredFile = "compactionRequired_minor";
+
+  /**
+   * File created in case of major compaction request
+   */
+  public static String majorCompactionRequiredFile = "compactionRequired_major";
+
+  /**
+   * Property for enabling the system level compaction lock. Only one compaction can run at a time.
+   */
+  public static String ENABLE_CONCURRENT_COMPACTION =
+      "carbon.concurrent.compaction";
+
+  /**
+   * Default value of the property for enabling the system level compaction lock. Only one
+   * compaction can run at a time.
+   */
+  public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "false";
+
+  /**
+   * Compaction system level lock folder.
+   */
+  public static String SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER = "SystemCompactionLock";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
index 8c4836f..b7cc620 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
@@ -18,25 +18,35 @@
  */
 package org.apache.carbondata.integration.spark.merger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 
+import org.apache.spark.sql.hive.TableMeta;
+
 /**
  * Utility Class for the Compaction Flow.
  */
 public class CarbonCompactionUtil {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
+
   /**
    * To create a mapping of Segment Id and TableBlockInfo.
    *
@@ -129,4 +139,132 @@ public class CarbonCompactionUtil {
 
   }
 
+  /**
+   * Check whether the file to indicate the compaction is present or not.
+   * @param metaFolderPath
+   * @return
+   */
+  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
+          .isFileExist(majorCompactionStatusFile,
+              FileFactory.getFileType(majorCompactionStatusFile))) {
+        return true;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage() );
+    }
+    return false;
+  }
+
+  /**
+   * Determine the type of the compaction received.
+   * @param metaFolderPath
+   * @return
+   */
+  public static CompactionType determineCompactionType(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile))) {
+        return CompactionType.MINOR_COMPACTION;
+      }
+      if (FileFactory.isFileExist(majorCompactionStatusFile,
+          FileFactory.getFileType(majorCompactionStatusFile))) {
+        return CompactionType.MAJOR_COMPACTION;
+      }
+
+    } catch (IOException e) {
+      LOGGER.error("Exception in determining the compaction request file " + e.getMessage()
);
+    }
+    return CompactionType.MINOR_COMPACTION;
+  }
+
+  /**
+   * Delete the compaction request file once the compaction is done.
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String statusFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+        if (FileFactory.getCarbonFile(statusFile, FileFactory.getFileType(statusFile)).delete()) {
+          LOGGER.info("Deleted the compaction request file " + statusFile);
+          return true;
+        } else {
+          LOGGER.error("Unable to delete the compaction request file " + statusFile);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in deleting the compaction request file " + e.getMessage()
);
+    }
+    return false;
+  }
+
+  /**
+   * Creates the compaction request file if some other compaction is in progress.
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean createCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String statusFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+        if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+          LOGGER.info("successfully created a compaction required file - " + statusFile);
+          return true;
+        } else {
+          LOGGER.error("Not able to create a compaction required file - " + statusFile);
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in creating the compaction request file " + e.getMessage()
);
+    }
+    return false;
+  }
+
+  /**
+   * This will check if any compaction request has been received for any table.
+   * @param tableMetas
+   * @return
+   */
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas) {
+    for (TableMeta table : tableMetas) {
+      CarbonTable ctable = table.carbonTable();
+      String metadataPath = ctable.getMetaDataFilepath();
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath)) {
+        return table;
+      }
+    }
+    return null;
+  }
 }
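
The helper methods above can be exercised roughly as follows. This is only an illustrative sketch: the metadata path is a hypothetical local store location, and in the real flow it comes from CarbonTable.getMetaDataFilepath as in getNextTableToCompact above.

    import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionType}

    // Hypothetical metadata directory of a table in a local carbon store.
    val metaFolderPath = "/tmp/carbonstore/default/table1/Metadata"

    // Another compaction holds the lock, so queue a minor compaction request.
    CarbonCompactionUtil.createCompactionRequiredFile(metaFolderPath, CompactionType.MINOR_COMPACTION)

    // The compaction that is currently running later picks the request up:
    if (CarbonCompactionUtil.isCompactionRequiredForTable(metaFolderPath)) {
      val pendingType = CarbonCompactionUtil.determineCompactionType(metaFolderPath)
      // ... run the compaction for pendingType, then clear the request ...
      CarbonCompactionUtil.deleteCompactionRequiredFile(metaFolderPath, pendingType)
    }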

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 6e77e9e..32a91de 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -707,7 +707,7 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
-      List<LoadMetadataDetails> segments, List<LoadMetadataDetails> loadsToMerge,
+      List<LoadMetadataDetails> segments,
       LoadMetadataDetails lastSeg) {
 
     // take complete list of segments.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f1fc6d4..bf4fa9f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,7 +29,7 @@ import scala.util.control.Breaks._
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
@@ -42,8 +42,8 @@ import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.integration.spark.merger.{CompactionCallable, CompactionType}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -54,7 +54,6 @@ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
 
-
 /**
  * This is the factory class which can create different RDD depends on user needs.
  *
@@ -269,45 +268,258 @@ object CarbonDataRDDFactory extends Logging {
     val loadStartTime = CarbonLoaderUtil.readCurrentTime()
     carbonLoadModel.setFactTimeStamp(loadStartTime)
 
+    val isCompactionTriggerByDDl = true
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
-      tableCreationTime
+      tableCreationTime,
+      isCompactionTriggerByDDl
     )
 
-    val lock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.COMPACTION_LOCK
+    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
       )
+      .equalsIgnoreCase("true")
 
-    if (lock.lockWithRetries()) {
+    // if system level compaction is enabled then only one compaction can run in the system
+    // if any other request comes at this time then it will create a compaction request file.
+    // so that this will be taken up by the compaction process which is executing.
+    if (!isConcurrentCompactionAllowed) {
       logger
-        .info("Acquired the compaction lock for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
+        .info("System level compaction lock is enabled."
         )
-      startCompactionThreads(sqlContext,
+      handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
         partitioner,
         hdfsStoreLocation,
         kettleHomePath,
         storeLocation,
-        compactionModel,
-        lock
+        compactionType,
+        carbonTable,
+        compactionModel
       )
     }
     else {
+      // normal flow of compaction
+      val lock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.COMPACTION_LOCK
+        )
+
+      if (lock.lockWithRetries()) {
+        logger
+          .info("Acquired the compaction lock for table " + carbonLoadModel
+            .getDatabaseName + "." + carbonLoadModel.getTableName
+          )
+        try {
+          startCompactionThreads(sqlContext,
+            carbonLoadModel,
+            partitioner,
+            hdfsStoreLocation,
+            kettleHomePath,
+            storeLocation,
+            compactionModel,
+            lock
+          )
+        }
+        catch {
+          case e : Exception =>
+            logger.error("Exception in start compaction thread. " + e.getMessage)
+            lock.unlock()
+        }
+      }
+      else {
+        logger
+          .audit("Not able to acquire the compaction lock for table " +
+            s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
+          )
+        logger
+          .error("Not able to acquire the compaction lock for table " + carbonLoadModel
+            .getDatabaseName + "." + carbonLoadModel.getTableName
+          )
+        sys.error("Table is already locked for compaction. Please try after some time.")
+      }
+    }
+  }
+
+  def handleCompactionForSystemLocking(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    partitioner: Partitioner,
+    hdfsStoreLocation: String,
+    kettleHomePath: String,
+    storeLocation: String,
+    compactionType: CompactionType,
+    carbonTable: CarbonTable,
+    compactionModel: CompactionModel): Unit = {
+    val lock = CarbonLockFactory
+      .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+      )
+    if (lock.lockWithRetries()) {
       logger
-        .audit("Not able to acquire the compaction lock for table " +
+        .info("Acquired the compaction lock for table " + carbonLoadModel
+          .getDatabaseName + "." + carbonLoadModel.getTableName
+        )
+      try {
+        startCompactionThreads(sqlContext,
+          carbonLoadModel,
+          partitioner,
+          hdfsStoreLocation,
+          kettleHomePath,
+          storeLocation,
+          compactionModel,
+          lock
+        )
+      }
+      catch {
+        case e : Exception =>
+          logger.error("Exception in start compaction thread. " + e.getMessage)
+          lock.unlock()
+      }
+    }
+    else {
+      logger
+        .audit("Not able to acquire the system level compaction lock for table " +
           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
         )
       logger
         .error("Not able to acquire the compaction lock for table " + carbonLoadModel
           .getDatabaseName + "." + carbonLoadModel.getTableName
         )
-      sys.error("Table is already locked for compaction. Please try after some time.")
+      CarbonCompactionUtil
+        .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+      // do sys error only in case of DDL trigger.
+      if(compactionModel.isDDLTrigger) {
+        sys.error("Compaction is in progress, compaction request for table " + carbonLoadModel
+          .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue.")
+      }
+      else {
+        logger
+          .error("Compaction is in progress, compaction request for table " + carbonLoadModel
+            .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue."
+          )
+      }
     }
   }
 
+  def executeCompaction(carbonLoadModel: CarbonLoadModel,
+    hdfsStoreLocation: String,
+    compactionModel: CompactionModel,
+    partitioner: Partitioner,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    kettleHomePath: String,
+    storeLocation: String): Unit = {
+    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+      carbonLoadModel.getLoadMetadataDetails
+    )
+    CarbonDataMergerUtil.sortSegments(sortedSegments)
+    val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+    var segList = carbonLoadModel.getLoadMetadataDetails
+    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+      hdfsStoreLocation,
+      carbonLoadModel,
+      partitioner.partitionCount,
+      compactionModel.compactionSize,
+      segList,
+      compactionModel.compactionType
+    )
+    while (loadsToMerge.size() > 1) {
+      deletePartialLoadsInCompaction(carbonLoadModel)
+      val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+        CarbonCommonConstants
+          .DEFAULT_COLLECTION_SIZE
+      )
+
+      scanSegmentsAndSubmitJob(futureList,
+        loadsToMerge,
+        executor,
+        hdfsStoreLocation,
+        sqlContext,
+        compactionModel,
+        kettleHomePath,
+        carbonLoadModel,
+        partitioner,
+        storeLocation
+      )
+
+      try {
+
+        futureList.asScala.foreach(future => {
+          future.get
+        }
+        )
+      }
+      catch {
+        case e: Exception =>
+          logger.error("Exception in compaction thread " + e.getMessage)
+          throw e
+      }
+
+
+      // scan again and determine if anything is there to merge again.
+      readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
+      segList = carbonLoadModel.getLoadMetadataDetails
+      // in case of major compaction we will scan only once and come out as it will keep
+      // on doing major for the new loads also.
+      // excluding the newly added segments.
+      if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+
+        segList = CarbonDataMergerUtil
+          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+      }
+      loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+        hdfsStoreLocation,
+        carbonLoadModel,
+        partitioner.partitionCount,
+        compactionModel.compactionSize,
+        segList,
+        compactionModel.compactionType
+      )
+    }
+  }
+  /**
+   * This will submit the loads to be merged into the executor.
+   *
+   * @param futureList
+   */
+  def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
+    loadsToMerge: util
+    .List[LoadMetadataDetails],
+    executor: ExecutorService,
+    hdfsStoreLocation: String,
+    sqlContext: SQLContext,
+    compactionModel: CompactionModel,
+    kettleHomePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    partitioner: Partitioner,
+    storeLocation: String): Unit = {
+
+    loadsToMerge.asScala.foreach(seg => {
+      logger.info("loads identified for merge is " + seg.getLoadName)
+    }
+    )
+
+    val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
+      carbonLoadModel,
+      partitioner,
+      storeLocation,
+      compactionModel.carbonTable,
+      kettleHomePath,
+      compactionModel.tableCreationTime,
+      loadsToMerge,
+      sqlContext,
+      compactionModel.compactionType
+    )
+
+    val future: Future[Void] = executor
+      .submit(new CompactionCallable(compactionCallableModel
+      )
+      )
+    futureList.add(future)
+  }
+
   def startCompactionThreads(sqlContext: SQLContext,
     carbonLoadModel: CarbonLoadModel,
     partitioner: Partitioner,
@@ -333,60 +545,82 @@ object CarbonDataRDDFactory extends Logging {
           )
     }
 
-    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-      hdfsStoreLocation,
-      carbonLoadModel,
-      partitioner.partitionCount,
-      compactionModel.compactionSize,
-      segList,
-      compactionModel.compactionType
-    )
-
-    if (loadsToMerge.size() > 1) {
-
-      val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
-        segList
-      )
-      CarbonDataMergerUtil.sortSegments(sortedSegments)
-      val lastSegment = sortedSegments.get(sortedSegments.size()-1)
-
-      new Thread {
+      val compactionThread = new Thread {
         override def run(): Unit = {
 
           try {
-            while (loadsToMerge.size() > 1) {
-              deletePartialLoadsInCompaction(carbonLoadModel)
-              val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-                CarbonCommonConstants
-                  .DEFAULT_COLLECTION_SIZE
-              )
-
-              scanSegmentsAndSubmitJob(futureList, loadsToMerge)
-
-              futureList.asScala.foreach(future => {
-                future.get
-              }
-              )
-
-              // scan again and determine if anything is there to merge again.
-              readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
-              segList = carbonLoadModel.getLoadMetadataDetails
-              // in case of major compaction we will scan only once and come out as it will keep
-              // on doing major for the new loads also.
-              // excluding the newly added segments.
-              if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
-
-                segList = CarbonDataMergerUtil
-                  .filterOutNewlyAddedSegments(segList, loadsToMerge, lastSegment)
+            executeCompaction(carbonLoadModel: CarbonLoadModel,
+              hdfsStoreLocation: String,
+              compactionModel: CompactionModel,
+              partitioner: Partitioner,
+              executor, sqlContext, kettleHomePath, storeLocation
+            )
+            // check for all the tables.
+            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+              ).equalsIgnoreCase("true")
+
+            if (!isConcurrentCompactionAllowed) {
+              logger.info("System level compaction lock is enabled.")
+              var tableForCompaction = CarbonCompactionUtil
+                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                  .tablesMeta.toArray
+                )
+              while(null != tableForCompaction) {
+                logger
+                  .info("Compaction request has been identified for table " + tableForCompaction
+                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
+                    .getTableName
+                  )
+                val table: CarbonTable = tableForCompaction.carbonTable
+                val metadataPath = table.getMetaDataFilepath
+                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+                val newCarbonLoadModel = new CarbonLoadModel()
+                prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
+                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                    newCarbonLoadModel.getTableName
+                  )
+
+                val compactionSize = CarbonDataMergerUtil
+                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+                val newcompactionModel = CompactionModel(compactionSize,
+                  compactionType,
+                  table,
+                  tableCreationTime,
+                  compactionModel.isDDLTrigger
+                )
+                // proceed for compaction
+                try {
+                  executeCompaction(newCarbonLoadModel,
+                    newCarbonLoadModel.getStorePath,
+                    newcompactionModel,
+                    partitioner,
+                    executor, sqlContext, kettleHomePath, storeLocation
+                  )
+                }
+                finally {
+                  // delete the compaction required file
+                  if (!CarbonCompactionUtil
+                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    logger
+                      .error("Compaction request file can not be deleted for table " +
+                        tableForCompaction
+                        .carbonTable.getDatabaseName + "." + tableForCompaction
+                        .carbonTableIdentifier
+                        .getTableName
+                      )
+                  }
+                }
+                // ********* check again for all the tables.
+                tableForCompaction = CarbonCompactionUtil
+                  .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                    .tablesMeta.toArray
+                  )
               }
-              loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-                hdfsStoreLocation,
-                carbonLoadModel,
-                partitioner.partitionCount,
-                compactionModel.compactionSize,
-                segList,
-                compactionModel.compactionType
-              )
             }
           }
           catch {
@@ -400,43 +634,31 @@ object CarbonDataRDDFactory extends Logging {
             compactionLock.unlock()
           }
         }
-      }.start
-    }
-    else {
-      compactionLock.unlock()
-    }
-
-    /**
-     * This will submit the loads to be merged into the executor.
-      *
-      * @param futureList
-     */
-    def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util
-    .List[LoadMetadataDetails]): Unit = {
-
-      loadsToMerge.asScala.foreach(seg => {
-        logger.info("loads identified for merge is " + seg.getLoadName)
       }
-      )
-
-      val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
-        carbonLoadModel,
-        partitioner,
-        storeLocation,
-        compactionModel.carbonTable,
-        kettleHomePath,
-        compactionModel.tableCreationTime,
-        loadsToMerge,
-        sqlContext,
-        compactionModel.compactionType
-      )
+      if(compactionModel.isDDLTrigger) {
+        // making this a blocking call for DDL
+        compactionThread.run()
+      }
+      else {
+        // non blocking call in case of auto compaction.
+        compactionThread.start()
+      }
+  }
 
-      val future: Future[Void] = executor
-        .submit(new CompactionCallable(compactionCallableModel
-        )
-        )
-      futureList.add(future)
-    }
+  def prepareCarbonLoadModel(hdfsStoreLocation: String,
+    table: CarbonTable,
+    newCarbonLoadModel: CarbonLoadModel): Unit = {
+    newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+    newCarbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
+    newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
+    newCarbonLoadModel.setStorePath(table.getStorePath)
+    readLoadMetadataDetails(newCarbonLoadModel, hdfsStoreLocation)
+    val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+    newCarbonLoadModel.setFactTimeStamp(loadStartTime)
   }
 
   def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
@@ -478,17 +700,13 @@ object CarbonDataRDDFactory extends Logging {
             .getDatabaseName + "." + carbonLoadModel.getTableName
           )
         val compactionSize = 0
-
+        val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
           CompactionType.MINOR_COMPACTION,
           carbonTable,
-          tableCreationTime
+          tableCreationTime,
+          isCompactionTriggerByDDl
         )
-        val lock = CarbonLockFactory
-          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.COMPACTION_LOCK
-          )
-
         var storeLocation = ""
         val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
         if (null != configuredStore && configuredStore.length > 0) {
@@ -499,27 +717,60 @@ object CarbonDataRDDFactory extends Logging {
         }
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        if (lock.lockWithRetries()) {
-          logger.info("Acquired the compaction lock.")
-          startCompactionThreads(sqlContext,
+        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+          )
+          .equalsIgnoreCase("true")
+
+        if (!isConcurrentCompactionAllowed) {
+
+          handleCompactionForSystemLocking(sqlContext,
             carbonLoadModel,
             partitioner,
             hdfsStoreLocation,
             kettleHomePath,
             storeLocation,
-            compactionModel,
-            lock
+            CompactionType.MINOR_COMPACTION,
+            carbonTable,
+            compactionModel
           )
         }
         else {
-          logger
-            .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
-              .getDatabaseName + "." + carbonLoadModel.getTableName
-            )
-          logger
-            .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-              .getDatabaseName + "." + carbonLoadModel.getTableName
+          val lock = CarbonLockFactory
+            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+              LockUsage.COMPACTION_LOCK
             )
+
+          if (lock.lockWithRetries()) {
+            logger.info("Acquired the compaction lock.")
+            try {
+              startCompactionThreads(sqlContext,
+                carbonLoadModel,
+                partitioner,
+                hdfsStoreLocation,
+                kettleHomePath,
+                storeLocation,
+                compactionModel,
+                lock
+              )
+            }
+            catch {
+              case e : Exception =>
+                logger.error("Exception in start compaction thread. " + e.getMessage)
+                lock.unlock()
+            }
+          }
+          else {
+            logger
+              .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
+                .getDatabaseName + "." + carbonLoadModel.getTableName
+              )
+            logger
+              .error("Not able to acquire the compaction lock for table " + carbonLoadModel
+                .getDatabaseName + "." + carbonLoadModel.getTableName
+              )
+          }
         }
       }
     }
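
Two details of the flow above are worth calling out. First, one compaction per system: the SYSTEMLEVEL_COMPACTION_LOCK is taken before the threads start, and a losing caller only drops a request file (and, for DDL, fails fast). Second, the DDL path is made blocking by running the compaction thread body on the calling thread instead of starting it in the background. A self-contained sketch of that second point, with the compaction body reduced to a placeholder:

    // Blocking versus background execution of the same thread body, as chosen
    // above via compactionModel.isDDLTrigger.
    object BlockingChoiceSketch extends App {
      val isDDLTrigger = true // true when ALTER TABLE ... COMPACT triggered the compaction

      val compactionThread = new Thread {
        override def run(): Unit = {
          // placeholder for executeCompaction(...) and the pending-request loop
          println("compaction body runs here")
        }
      }

      if (isDDLTrigger) {
        compactionThread.run()    // blocking: the DDL caller waits for completion
      } else {
        compactionThread.start()  // non-blocking: auto compaction runs in the background
      }
    }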

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a0a9c99..1e06165 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -158,7 +158,8 @@ case class AlterTableModel(dbName: Option[String], tableName: String,
 case class CompactionModel(compactionSize: Long,
   compactionType: CompactionType,
   carbonTable: CarbonTable,
-  tableCreationTime: Long)
+  tableCreationTime: Long,
+  isDDLTrigger: Boolean)
 
 case class CompactionCallableModel(hdfsStoreLocation: String, carbonLoadModel: CarbonLoadModel,
   partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
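
A hedged example of building the extended model; the helper and its arguments are hypothetical, but the field list matches the case class above (the real callers in CarbonDataRDDFactory pass the table's creation time and the DDL flag):

    import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
    import org.apache.carbondata.integration.spark.merger.CompactionType
    import org.apache.spark.sql.execution.command.CompactionModel

    // Hypothetical helper: compactionSize of 0 mirrors the minor-compaction call site above.
    def buildCompactionModel(table: CarbonTable, tableCreationTime: Long,
        fromDDL: Boolean): CompactionModel =
      CompactionModel(
        compactionSize = 0L,
        compactionType = CompactionType.MINOR_COMPACTION,
        carbonTable = table,
        tableCreationTime = tableCreationTime,
        isDDLTrigger = fromDDL // blocking DDL call when true, background auto compaction when false
      )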

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
new file mode 100644
index 0000000..ae29650
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+  * FT for the system level compaction lock feature: a compaction request queued for table2
+  * should be processed after table1's compaction completes.
+  */
+class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    sql("drop table if exists  table1")
+    sql("drop table if exists  table2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS table1 (country String, ID Int, date Timestamp, name "
+
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata"
+
+        ".format'"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS table2 (country String, ID Int, date Timestamp, name "
+
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata"
+
+        ".format'"
+    )
+
+
+    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+      .getCanonicalPath
+    val csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+    val csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+    val csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+    // load table1
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE table1 OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE table1  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+
+    // load table2
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE table2 OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE table2  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+
+    // create a compaction request file for table2 so that it will also be compacted.
+    val absoluteTableIdentifier = new
+        AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier("default", "table2", "rrr")
+        )
+    val carbonTablePath: CarbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier.getStorePath,
+        absoluteTableIdentifier.getCarbonTableIdentifier
+      )
+
+    val file = carbonTablePath.getMetadataDirectoryPath + CarbonCommonConstants
+      .FILE_SEPARATOR + CarbonCommonConstants.majorCompactionRequiredFile
+
+    FileFactory.createNewFile(file, FileFactory.getFileType(file))
+
+    // compaction will happen here.
+    sql("alter table table1 compact 'major'"
+    )
+
+  }
+
+  /**
+    * Test whether major compaction is done for both.
+    */
+  test("check for compaction in both tables") {
+    // delete merged segments
+    sql("clean files for table table1")
+    sql("clean files for table table2")
+
+    // check for table 1.
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier("default", "table1", "rrr")
+        )
+    )
+    // the original segments (0 and 1) that were merged should be gone; merged segment 0.1 should remain
+    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    assert(segments.contains("0.1"))
+    assert(!segments.contains("0"))
+    assert(!segments.contains("1"))
+    // check for table 2.
+    val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier("default", "table2", "rrr1")
+        )
+    )
+    // the original segments (0 and 1) that were merged should be gone; merged segment 0.1 should remain
+    val segments2 = segmentStatusManager2.getValidSegments.listOfValidSegments.asScala.toList
+    assert(segments2.contains("0.1"))
+    assert(!segments2.contains("0"))
+    assert(!segments2.contains("1"))
+
+  }
+
+  override def afterAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql("drop table if exists  table1")
+    sql("drop table if exists  table2")
+  }
+
+}
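
Because the DDL path is now blocking, the compaction statement in beforeAll returns only after table1 has been merged and the queued request file for table2 has been consumed, so the assertions can run immediately. The trigger is the plain DDL used above, e.g.:

    sql("alter table table1 compact 'major'")
    // or, for a minor compaction of the same table:
    sql("alter table table1 compact 'minor'")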

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ffbb3596/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
index a03652d..4c604fd 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
@@ -24,8 +24,9 @@ package org.apache.carbondata.lcm.locks;
  */
 public class LockUsage {
   public static String LOCK = ".lock";
-  public static String METADATA_LOCK="meta.lock";
-  public static String COMPACTION_LOCK="compaction.lock";
-  public static String TABLE_STATUS_LOCK="tablestatus.lock";
+  public static String METADATA_LOCK = "meta.lock";
+  public static String COMPACTION_LOCK = "compaction.lock";
+  public static String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
+  public static String TABLE_STATUS_LOCK = "tablestatus.lock";
 
 }
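
The new lock name is taken through the same CarbonLockFactory path as the per-table compaction lock; a minimal sketch, assuming a configured carbon store (it mirrors the call in handleCompactionForSystemLocking above):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}

    val systemLock = CarbonLockFactory.getCarbonLockObj(
      CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
      LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)

    if (systemLock.lockWithRetries()) {
      try {
        // only one compaction in the whole system reaches this point
      } finally {
        systemLock.unlock()
      }
    } else {
      // another compaction is running; queue a request file instead (see CarbonCompactionUtil)
    }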


