carbondata-commits mailing list archives

From chenliang...@apache.org
Subject carbondata git commit: [CARBONDATA-1698] Adding support for table level compaction configuration
Date Thu, 21 Dec 2017 09:08:05 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 0c0f90ca7 -> f1c6dddec


[CARBONDATA-1698] Adding support for table level compaction configuration

Adding support for table level compaction configuration

This closes #1575
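
For reference, a minimal sketch of the usage this change enables, assuming an
existing CarbonSession named `spark`; the table and column names are
illustrative, while the five property names come from the docs and constants
in the diff below:

```scala
// Each table-level property overrides its carbon.properties counterpart:
//   MAJOR_COMPACTION_SIZE        -> carbon.major.compaction.size
//   AUTO_LOAD_MERGE              -> carbon.enable.auto.load.merge
//   COMPACTION_LEVEL_THRESHOLD   -> carbon.compaction.level.threshold
//   COMPACTION_PRESERVE_SEGMENTS -> carbon.numberof.preserve.segments
//   ALLOWED_COMPACTION_DAYS      -> carbon.allowed.compaction.days
spark.sql(
  s"""
     | CREATE TABLE IF NOT EXISTS sales (id INT, city STRING)
     | STORED BY 'carbondata'
     | TBLPROPERTIES ('MAJOR_COMPACTION_SIZE'='2048',
     |                'AUTO_LOAD_MERGE'='true',
     |                'COMPACTION_LEVEL_THRESHOLD'='5,6',
     |                'COMPACTION_PRESERVE_SEGMENTS'='10',
     |                'ALLOWED_COMPACTION_DAYS'='5')
   """.stripMargin)

// Loads and compactions on this table now use the values above instead of
// the system-level settings in carbon.properties:
spark.sql("ALTER TABLE sales COMPACT 'MAJOR'")
```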


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1c6ddde
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1c6ddde
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1c6ddde

Branch: refs/heads/master
Commit: f1c6dddec8d7057dfc1a1f70555faec4c6184a52
Parents: 0c0f90c
Author: Jin Zhou <xaprice@yeah.net>
Authored: Fri Dec 15 19:53:41 2017 +0800
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Thu Dec 21 17:07:44 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  14 +-
 .../carbondata/core/util/CarbonProperties.java  |  10 +-
 docs/data-management-on-carbondata.md           |  27 +-
 .../TableLevelCompactionOptionExample.scala     | 106 +++++++
 .../TestCreateTableWithCompactionOptions.scala  | 198 ++++++++++++++
 ...CompactionSupportGlobalSortBigFileTest.scala |   4 +-
 .../TableLevelCompactionOptionTest.scala        | 274 +++++++++++++++++++
 .../carbondata/spark/util/CommonUtil.scala      | 119 ++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 .../table/CarbonDescribeFormattedCommand.scala  |  32 +++
 .../processing/merger/CarbonDataMergerUtil.java | 107 +++++---
 13 files changed, 854 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 9534099..71ab668 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -789,12 +789,12 @@ public final class CarbonCommonConstants {
    * Size of Major Compaction in MBs
    */
   @CarbonProperty
-  public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
+  public static final String CARBON_MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
 
   /**
    * By default size of major compaction in MBs.
    */
-  public static final String DEFAULT_MAJOR_COMPACTION_SIZE = "1024";
+  public static final String DEFAULT_CARBON_MAJOR_COMPACTION_SIZE = "1024";
 
   /**
    * This property is used to tell how many segments to be preserved from merging.
@@ -873,6 +873,16 @@ public final class CarbonCommonConstants {
   public static final String TABLE_BLOCKSIZE = "table_blocksize";
   // set in column level to disable inverted index
   public static final String NO_INVERTED_INDEX = "no_inverted_index";
+  // table property name of major compaction size
+  public static final String TABLE_MAJOR_COMPACTION_SIZE = "major_compaction_size";
+  // table property name of auto load merge
+  public static final String TABLE_AUTO_LOAD_MERGE = "auto_load_merge";
+  // table property name of compaction level threshold
+  public static final String TABLE_COMPACTION_LEVEL_THRESHOLD = "compaction_level_threshold";
+  // table property name of the number of segments to preserve from compaction
+  public static final String TABLE_COMPACTION_PRESERVE_SEGMENTS = "compaction_preserve_segments";
+  // table property name of the number of days within which a segment is allowed to be compacted
+  public static final String TABLE_ALLOWED_COMPACTION_DAYS = "allowed_compaction_days";
 
   /**
    * 16 mb size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 11aea99..7b80a8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -620,10 +620,12 @@ public final class CarbonProperties {
   public long getMajorCompactionSize() {
     long compactionSize;
     try {
-      compactionSize = Long.parseLong(getProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE,
-          CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE));
+      compactionSize = Long.parseLong(getProperty(
+              CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE,
+              CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE));
     } catch (NumberFormatException e) {
-      compactionSize = Long.parseLong(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE);
+      compactionSize = Long.parseLong(
+              CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE);
     }
     return compactionSize;
   }
@@ -684,7 +686,7 @@ public final class CarbonProperties {
    * @param commaSeparatedLevels the string format value before separating
    * @return the int array format value after separating by comma
    */
-  private int[] getIntArray(String commaSeparatedLevels) {
+  public int[] getIntArray(String commaSeparatedLevels) {
     String[] levels = commaSeparatedLevels.split(",");
     int[] compactionSize = new int[levels.length];
     int i = 0;
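
The visibility change above (getIntArray goes from private to public) exists so
that CarbonDataMergerUtil, later in this diff, can parse a table-level
COMPACTION_LEVEL_THRESHOLD string. A hedged sketch of the intended call
pattern; the empty-array fallback on invalid input is inferred from the
length check in identifySegmentsToBeMergedBasedOnSegCount below:

```scala
import org.apache.carbondata.core.util.CarbonProperties

// "5,6" is an illustrative threshold string taken from the docs example.
val props = CarbonProperties.getInstance()
val fromTable = props.getIntArray("5,6") // -> Array(5, 6)
val levels =
  if (fromTable.length == 0) {
    // unparsable table-level value: fall back to the system-level setting
    props.getCompactionSegmentLevelCount()
  } else {
    fromTable
  }
```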

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index 7f6e3b9..94820ad 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -92,6 +92,26 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
      ```
      NOTE: 512 or 512M both are accepted.
 
+   - **Table Compaction Configuration**
+   
+     These properties are table-level compaction configurations. If a property is not specified, the corresponding system-level configuration in carbon.properties is used.
+     The following 5 configurations are supported:
+     
+     * MAJOR_COMPACTION_SIZE: same meaning as carbon.major.compaction.size, size in MB.
+     * AUTO_LOAD_MERGE: same meaning as carbon.enable.auto.load.merge.
+     * COMPACTION_LEVEL_THRESHOLD: same meaning as carbon.compaction.level.threshold.
+     * COMPACTION_PRESERVE_SEGMENTS: same meaning as carbon.numberof.preserve.segments.
+     * ALLOWED_COMPACTION_DAYS: same meaning as carbon.allowed.compaction.days.
+
+     ```
+     TBLPROPERTIES ('MAJOR_COMPACTION_SIZE'='2048',
+                    'AUTO_LOAD_MERGE'='true',
+                    'COMPACTION_LEVEL_THRESHOLD'='5,6',
+                    'COMPACTION_PRESERVE_SEGMENTS'='10',
+                    'ALLOWED_COMPACTION_DAYS'='5')
+     ```
+
+
 ### Example:
 
    ```
@@ -109,7 +129,12 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
                    'NO_INVERTED_INDEX'='productBatch',
                    'SORT_COLUMNS'='productName,storeCity',
                    'SORT_SCOPE'='NO_SORT',
-                   'TABLE_BLOCKSIZE'='512')
+                   'TABLE_BLOCKSIZE'='512',
+                   'MAJOR_COMPACTION_SIZE'='2048',
+                   'AUTO_LOAD_MERGE'='true',
+                   'COMPACTION_LEVEL_THRESHOLD'='5,6',
+                   'COMPACTION_PRESERVE_SEGMENTS'='10',
+                   'ALLOWED_COMPACTION_DAYS'='5')
    ```
         
 ## TABLE MANAGEMENT  

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala
new file mode 100644
index 0000000..8b34a8b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+object TableLevelCompactionOptionExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("TableLevelCompactionOptionExample")
+    spark.sparkContext.setLogLevel("WARN")
+
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+
+    // Create table with table level compaction options
+    // while loading and compacting, table level compaction options will be used instead of
+    // options specified in carbon.properties
+    spark.sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS carbon_table(
+         | ID Int,
+         | date Date,
+         | country String,
+         | name String,
+         | phonetype String,
+         | serialname String,
+         | salary Int,
+         | floatField float
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES (
+         | 'MAJOR_COMPACTION_SIZE'='1024',
+         | 'AUTO_LOAD_MERGE'='true',
+         | 'COMPACTION_LEVEL_THRESHOLD'='3,2',
+         | 'COMPACTION_PRESERVE_SEGMENTS'='2',
+         | 'ALLOWED_COMPACTION_DAYS'='1')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val path = s"$rootPath/examples/spark2/src/main/resources/dataSample.csv"
+
+    // load 6 segments
+    // scalastyle:off
+    (1 to 6).foreach(_ => spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin))
+    // scalastyle:on
+
+    // show all segments, existing segments are 0.1,3,4,5, compacted segments are 0,1,2
+    // because 2 segments are preserved, only one level-1 minor compaction is triggered
+    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+
+    // load another 2 segments
+    // scalastyle:off
+    (1 to 2).foreach(_ => spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin))
+    // scalastyle:on
+
+    // show all segments, existing segments will be 0.2,6,7,
+    // compacted segments are 0,1,2,3,4,5,0.1,3.1
+    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+
+    // load another 2 segments
+    // scalastyle:off
+    (1 to 2).foreach(_ => spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin))
+    // scalastyle:on
+
+    // do major compaction, there will be 3 segments left (2 preserved segments)
+    spark.sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
+    spark.sql("CLEAN FILES FOR TABLE carbon_table")
+    spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show()
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+
+    spark.stop()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala
new file mode 100644
index 0000000..d7c05ce
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the"License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestCreateTableWithCompactionOptions extends QueryTest with BeforeAndAfterAll {
+
+  val tableWithCompactionOptions = "tableWithCompactionOptions"
+  val tableWithInvalidMajorCompactionSize = "tableWithInvalidMajorCompactionSize"
+  val tableWithInvalidAutoLoadMerge = "tableWithInvalidAutoLoadMerge"
+  val tableWithInvalidLevelThreshold = "tableWithInvalidLevelThreshold"
+  val tableWithInvalidPreserveSegments = "tableWithInvalidPreserveSegments"
+  val tableWithInvalidAllowedDays = "tableWithInvalidAllowedDays"
+  val tableWithoutCompactionOptions = "tableWithoutCompactionOptions"
+
+  override def beforeAll: Unit = {
+    cleanTables()
+  }
+
+  override def afterAll: Unit = {
+    cleanTables()
+  }
+
+  private def cleanTables(): Unit = {
+    sql("use default")
+    sql(s"DROP TABLE IF EXISTS $tableWithCompactionOptions")
+    sql(s"DROP TABLE IF EXISTS $tableWithInvalidMajorCompactionSize")
+    sql(s"DROP TABLE IF EXISTS $tableWithInvalidAutoLoadMerge")
+    sql(s"DROP TABLE IF EXISTS $tableWithInvalidLevelThreshold")
+    sql(s"DROP TABLE IF EXISTS $tableWithInvalidPreserveSegments")
+    sql(s"DROP TABLE IF EXISTS $tableWithInvalidAllowedDays")
+    sql(s"DROP TABLE IF EXISTS $tableWithoutCompactionOptions")
+  }
+
+  test("test create table with compaction options") {
+    sql(
+      s"""
+         | CREATE TABLE $tableWithCompactionOptions(
+         | intField INT,
+         | stringField STRING
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('MAJOR_COMPACTION_SIZE'='10240',
+         | 'AUTO_LOAD_MERGE'='true',
+         | 'COMPACTION_LEVEL_THRESHOLD'='5,6',
+         | 'COMPACTION_PRESERVE_SEGMENTS'='10',
+         | 'ALLOWED_COMPACTION_DAYS'='5')
+       """.stripMargin)
+
+    val tableOptions = sql(s"DESCRIBE FORMATTED $tableWithCompactionOptions")
+      .collect().map(r => (r.getString(0).trim, r.getString(1).trim)).toMap
+
+    assert(tableOptions.contains("MAJOR_COMPACTION_SIZE"))
+    assert(tableOptions.getOrElse("MAJOR_COMPACTION_SIZE","").equals("10240"))
+    assert(tableOptions.contains("AUTO_LOAD_MERGE"))
+    assert(tableOptions.getOrElse("AUTO_LOAD_MERGE","").equals("true"))
+    assert(tableOptions.contains("COMPACTION_LEVEL_THRESHOLD"))
+    assert(tableOptions.getOrElse("COMPACTION_LEVEL_THRESHOLD","").equals("5,6"))
+    assert(tableOptions.contains("COMPACTION_PRESERVE_SEGMENTS"))
+    assert(tableOptions.getOrElse("COMPACTION_PRESERVE_SEGMENTS","").equals("10"))
+    assert(tableOptions.contains("ALLOWED_COMPACTION_DAYS"))
+    assert(tableOptions.getOrElse("ALLOWED_COMPACTION_DAYS","").equals("5"))
+  }
+
+  test("test create table with invalid major compaction size") {
+    val exception: Exception = intercept[Exception] {
+      sql(
+        s"""
+           |CREATE TABLE $tableWithInvalidMajorCompactionSize
+           |(
+           |intField INT,
+           |stringField STRING
+           |)
+           |STORED BY 'carbondata'
+           |TBLPROPERTIES('MAJOR_COMPACTION_SIZE'='abc')
+       """.stripMargin)
+    }
+    assert(exception.getMessage.contains(
+      "Invalid major_compaction_size value found: abc, " +
+        "only int value greater than 0 is supported."))
+  }
+
+  test("test create table with invalid auto load merge") {
+    val exception: Exception = intercept[Exception] {
+      sql(
+        s"""
+           |CREATE TABLE $tableWithInvalidAutoLoadMerge
+           |(
+           |intField INT,
+           |stringField STRING
+           |)
+           |STORED BY 'carbondata'
+           |TBLPROPERTIES('AUTO_LOAD_MERGE'='123')
+       """.stripMargin)
+    }
+    assert(exception.getMessage.contains(
+      "Invalid auto_load_merge value found: 123, only true|false is supported."))
+  }
+
+  test("test create table with invalid level threshold") {
+    val exception: Exception = intercept[Exception] {
+      sql(
+        s"""
+           |CREATE TABLE $tableWithInvalidLevelThreshold
+           |(
+           |intField INT,
+           |stringField STRING
+           |)
+           |STORED BY 'carbondata'
+           |TBLPROPERTIES(
+           |'AUTO_LOAD_MERGE'='true',
+           |'COMPACTION_LEVEL_THRESHOLD'='x,6')
+       """.stripMargin)
+    }
+    assert(exception.getMessage.contains(
+      "Invalid compaction_level_threshold value found: x,6, " +
+        "only int values separated by comma and between 0 and 100 are supported."))
+  }
+
+  test("test create table with invalid preserve segments number") {
+    val exception: Exception = intercept[Exception] {
+      sql(
+        s"""
+           |CREATE TABLE $tableWithInvalidPreserveSegments
+           |(
+           |intField INT,
+           |stringField STRING
+           |)
+           |STORED BY 'carbondata'
+           |TBLPROPERTIES(
+           |'AUTO_LOAD_MERGE'='true',
+           |'COMPACTION_LEVEL_THRESHOLD'='4,6',
+           |'COMPACTION_PRESERVE_SEGMENTS'='abc')
+       """.stripMargin)
+    }
+    assert(exception.getMessage.contains(
+      "Invalid compaction_preserve_segments value found: abc, " +
+        "only int value between 0 and 100 is supported."))
+  }
+
+  test("test create table with invalid allowed days") {
+    val exception: Exception = intercept[Exception] {
+      sql(
+        s"""
+           |CREATE TABLE $tableWithInvalidAllowedDays
+           |(
+           |intField INT,
+           |stringField STRING
+           |)
+           |STORED BY 'carbondata'
+           |TBLPROPERTIES(
+           |'AUTO_LOAD_MERGE'='true',
+           |'ALLOWED_COMPACTION_DAYS'='abc')
+       """.stripMargin)
+    }
+    assert(exception.getMessage.contains(
+      "Invalid allowed_compaction_days value found: abc, " +
+        "only int value between 0 and 100 is supported."))
+  }
+
+  test("test create table without compaction options") {
+    sql(
+      s"""
+         | CREATE TABLE $tableWithoutCompactionOptions(
+         | intField INT,
+         | stringField STRING
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    val tableOptions = sql(s"DESCRIBE FORMATTED $tableWithoutCompactionOptions")
+      .collect().map(r => (r.getString(0).trim, r.getString(1).trim)).toMap
+
+    assert(!tableOptions.contains("MAJOR_COMPACTION_SIZE"))
+    assert(!tableOptions.contains("AUTO_LOAD_MERGE"))
+    assert(!tableOptions.contains("COMPACTION_LEVEL_THRESHOLD"))
+    assert(!tableOptions.contains("COMPACTION_PRESERVE_SEGMENTS"))
+    assert(!tableOptions.contains("ALLOWED_COMPACTION_DAYS"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index 6d79f6c..c522c1e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -51,7 +51,7 @@ class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAft
     CompactionSupportGlobalSortBigFileTest.deleteFile(file3)
     CompactionSupportGlobalSortBigFileTest.deleteFile(file4)
     CompactionSupportGlobalSortBigFileTest.deleteFile(file5)
-    resetConf(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE)
+    resetConf(CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)
   }
 
   override def beforeEach {
@@ -104,7 +104,7 @@ class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAft
 
   private def resetConf(size:String) {
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, size)
+      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, size)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
new file mode 100644
index 0000000..5445779
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the"License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, PrintWriter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class TableLevelCompactionOptionTest extends QueryTest
+  with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  val tempFilePath: String = s"$resourcesPath/temp/tableLevelCompactionParaTest.csv"
+  val sampleFilePath: String = resourcesPath + "/sample.csv"
+
+  override def beforeEach {
+    cleanTable()
+  }
+
+  override def afterEach {
+    resetConf()
+    cleanTable()
+  }
+
+  private def resetConf() = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE,
+      CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+        CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+        CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
+  }
+
+  private def cleanTable() = {
+    deleteTempFile()
+    sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+  private def generateTempFile() = {
+    val writer = new PrintWriter(new File(tempFilePath))
+    try {
+      writer.println("id,name,city,age")
+      val lines =
+        s"""|1,david,shenzhen,31
+            |2,eason,shenzhen,27
+            |3,jarry,wuhan,35
+            |3,jarry,Bangalore,35
+            |4,kunal,Delhi,26
+            |4,vishal,Bangalore,29""".stripMargin
+      for (i <- 0 until 250000) {
+        writer.println(lines)
+      }
+      writer.flush()
+    } finally {
+      if (writer != null) writer.close()
+    }
+  }
+
+  private def deleteTempFile() = {
+    val file = new File(tempFilePath)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+
+  test("MAJOR_COMPACTION_SIZE, use system level configuration"){
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10")
+
+    // generate a temp file which is larger than 1M but smaller than 5M
+    generateTempFile()
+
+    sql(
+      """
+        |CREATE TABLE carbon_table
+        |(id INT, name STRING, city STRING, age INT)
+        |STORED BY 'org.apache.carbondata.format'
+        |TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table")
+    }
+
+    sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
+    sql("CLEAN FILES FOR TABLE carbon_table")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq(0) }
+    assert(!segmentSequenceIds.contains("0"))
+    assert(!segmentSequenceIds.contains("1"))
+    assert(segmentSequenceIds.contains("0.1"))
+
+  }
+
+  test("MAJOR_COMPACTION_SIZE, use table level configuration") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10")
+
+    // generate a temp file which is larger than 1M but smaller than 5M
+    generateTempFile()
+
+    sql(
+      """
+        |CREATE TABLE carbon_table
+        |(id INT, name STRING, city STRING, age INT)
+        |STORED BY 'org.apache.carbondata.format'
+        |TBLPROPERTIES('SORT_COLUMNS'='city,name',
+        |'MAJOR_COMPACTION_SIZE'='1')
+      """.stripMargin)
+
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table")
+    }
+
+    // each segment is larger than 1M, so no segments will be compacted
+    sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
+    sql("CLEAN FILES FOR TABLE carbon_table")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq(0) }
+    assert(segmentSequenceIds.contains("0"))
+    assert(segmentSequenceIds.contains("1"))
+    assert(!segmentSequenceIds.contains("0.1"))
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE,
+      CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, use system level configuration"){
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0")
+
+    sql(
+      """
+        | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+
+    for (i <- 0 until 8) {
+      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
+    }
+    sql("CLEAN FILES FOR TABLE carbon_table")
+    var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(segmentSequenceIds.size == 1)
+    assert(segmentSequenceIds.contains("0.2"))
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, use table level configuration"){
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0")
+
+    sql(
+      """
+        | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name',
+        | 'AUTO_LOAD_MERGE'='false')
+      """.stripMargin)
+
+    for (i <- 0 until 8) {
+      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
+    }
+    // table level configuration: 'AUTO_LOAD_MERGE'='false', so no segments will be compacted
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE carbon_table"), false, "Compacted")
+    var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(segmentSequenceIds.size == 8)
+    assert(!segmentSequenceIds.contains("0.1"))
+    assert(!segmentSequenceIds.contains("4.1"))
+    assert(!segmentSequenceIds.contains("0.2"))
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, use table level configuration") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0")
+
+    sql(
+      """
+         | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
+         | STORED BY 'org.apache.carbondata.format'
+         | TBLPROPERTIES('SORT_COLUMNS'='city,name',
+         | 'AUTO_LOAD_MERGE'='true',
+         | 'COMPACTION_LEVEL_THRESHOLD'='3,2',
+         | 'COMPACTION_PRESERVE_SEGMENTS'='2',
+         | 'ALLOWED_COMPACTION_DAYS'='1')
+      """.stripMargin)
+
+    // load 6 segments, the latest 2 segments are preserved
+    // only one level-1 minor compaction is triggered which compacts segment 0,1,2 to segment 0.1
+    // seg0 \
+    // seg1  -- compacted to seg0.1
+    // seg2 /
+    // seg3
+    // seg4 (preserved)
+    // seg5 (preserved)
+    for (i <- 0 until 6) {
+      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
+    }
+    sql("CLEAN FILES FOR TABLE carbon_table")
+    var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(segmentSequenceIds.contains("0.1"))
+    assert(!segmentSequenceIds.contains("3.1"))
+    assert(!segmentSequenceIds.contains("0.2"))
+
+    // load another two segments, the latest 2 segments are preserved
+    // level-2 minor compaction is triggered which compacts segment 0,1,2,3,4,5 -> 0.2
+    // seg0 \
+    // seg1  -- compacted to seg0.1 \
+    // seg2 /                         -- compacted to seg0.2
+    // seg3 \                       /
+    // seg4  -- compacted to seg3.1
+    // seg5 /
+    // seg6 (preserved)
+    // seg7 (preserved)
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
+    }
+    sql("CLEAN FILES FOR TABLE carbon_table")
+    segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
+    segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(segmentSequenceIds.contains("0.2"))
+    assert(!segmentSequenceIds.contains("0.1"))
+    assert(!segmentSequenceIds.contains("3.1"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index c902537..ae30300 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -471,6 +471,125 @@ object CommonUtil {
   }
 
   /**
+   * validate table level properties for compaction
+   *
+   * @param tableProperties
+   */
+  def validateTableLevelCompactionProperties(tableProperties: Map[String, String]): Unit = {
+    validateMajorCompactionSize(tableProperties)
+    validateAutoLoadMerge(tableProperties)
+    validateCompactionLevelThreshold(tableProperties)
+    validateCompactionPreserveSegmentsOrAllowedDays(tableProperties,
+      CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)
+    validateCompactionPreserveSegmentsOrAllowedDays(tableProperties,
+      CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)
+  }
+
+  /**
+   * This method will validate the major compaction size specified by the user
+   * the property is used while doing major compaction
+   *
+   * @param tableProperties
+   */
+  def validateMajorCompactionSize(tableProperties: Map[String, String]): Unit = {
+    var majorCompactionSize: Integer = 0
+    val tblPropName = CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE
+    if (tableProperties.get(tblPropName).isDefined) {
+      val majorCompactionSizeStr: String =
+        parsePropertyValueStringInMB(tableProperties(tblPropName))
+      try {
+        majorCompactionSize = Integer.parseInt(majorCompactionSizeStr)
+      } catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+            s"$majorCompactionSizeStr, only int value greater than 0 is supported.")
+      }
+      if (majorCompactionSize < 0) {
+        throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+          s"$majorCompactionSizeStr, only int value greater than 0 is supported.")
+      }
+      tableProperties.put(tblPropName, majorCompactionSizeStr)
+    }
+  }
+
+  /**
+   * This method will validate the auto merge load property specified by the user
+   * the property is used while doing minor compaction
+   *
+   * @param tableProperties
+   */
+  def validateAutoLoadMerge(tableProperties: Map[String, String]): Unit = {
+    val tblPropName = CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE
+    if (tableProperties.get(tblPropName).isDefined) {
+      val trimStr = tableProperties(tblPropName).trim
+      if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) {
+        throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+          s"$trimStr, only true|false is supported.")
+      }
+      tableProperties.put(tblPropName, trimStr)
+    }
+  }
+
+  /**
+   * This method will validate the compaction level threshold property specified by the user
+   * the property is used while doing minor compaction
+   *
+   * @param tableProperties
+   */
+  def validateCompactionLevelThreshold(tableProperties: Map[String, String]): Unit = {
+    val tblPropName = CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD
+    if (tableProperties.get(tblPropName).isDefined) {
+      val regularedStr = tableProperties(tblPropName).replace(" ", "")
+      try {
+        val thresholds = regularedStr.split(",").map(levelThresholdStr => levelThresholdStr.toInt)
+        if (!thresholds.forall(t => t < 100 && t > 0)) {
+          throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+            s"$regularedStr, only int values separated by comma and between 0 " +
+            s"and 100 are supported.")
+        }
+      }
+      catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+            s"$regularedStr, only int values separated by comma and between 0 " +
+            s"and 100 are supported.")
+      }
+      tableProperties.put(tblPropName, regularedStr)
+    }
+  }
+
+  /**
+   * This method will validate the compaction preserve segments property
+   * or compaction allowed days property
+   *
+   * @param tableProperties
+   * @param tblPropName
+   */
+  def validateCompactionPreserveSegmentsOrAllowedDays(tableProperties: Map[String, String],
+                                                      tblPropName: String): Unit = {
+    if (tblPropName.equals(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS) ||
+      tblPropName.equals(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) {
+      var propValue: Integer = 0
+      if (tableProperties.get(tblPropName).isDefined) {
+        val propValueStr = tableProperties(tblPropName).trim
+        try {
+          propValue = Integer.parseInt(propValueStr)
+        } catch {
+          case e: NumberFormatException =>
+            throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+              s"$propValueStr, only int value between 0 and 100 is supported.")
+        }
+        if (propValue < 0 || propValue > 100) {
+          throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+            s"$propValueStr, only int value between 0 and 100 is supported.")
+        }
+        tableProperties.put(tblPropName, propValueStr)
+      }
+    }
+  }
+
+  /**
    * This method will validate the table block size specified by the user
    *
    * @param tableProperties
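
A hedged sketch of how these validators are expected to behave, based on the
code above: each validator throws a MalformedCarbonCommandException on a
malformed value and writes the normalized value back into the property map.
This assumes the Map in scope is scala.collection.mutable.Map, as the put
calls imply; the keys are the lowercase table property names and the values
are illustrative:

```scala
import scala.collection.mutable

import org.apache.carbondata.spark.util.CommonUtil

// Valid values pass and are normalized in place.
val props = mutable.Map(
  "major_compaction_size" -> "2048",
  "compaction_level_threshold" -> "5, 6")
CommonUtil.validateTableLevelCompactionProperties(props)
// props("compaction_level_threshold") is now "5,6" (spaces stripped)

// Invalid values throw; the tests in this commit intercept the exception.
val bad = mutable.Map("auto_load_merge" -> "123")
try {
  CommonUtil.validateTableLevelCompactionProperties(bad)
} catch {
  case e: Exception =>
    // "Invalid auto_load_merge value found: 123, only true|false is supported."
    println(e.getMessage)
}
```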

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index a39465f..6e9b36c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -275,6 +275,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
+    // validate table level properties for compaction
+    CommonUtil.validateTableLevelCompactionProperties(tableProperties)
 
     TableModel(
       ifNotExistPresent,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8c9700c..d288122 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -208,7 +208,8 @@ object CarbonDataRDDFactory {
 
               val newCarbonLoadModel = prepareCarbonLoadModel(table)
 
-              val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR)
+              val compactionSize = CarbonDataMergerUtil
+                .getCompactionSize(CompactionType.MAJOR, carbonLoadModel)
 
               val newcompactionModel = CompactionModel(
                 compactionSize,
@@ -720,8 +721,8 @@ object CarbonDataRDDFactory {
       operationContext: OperationContext
   ): Unit = {
     LOGGER.info(s"compaction need status is" +
-                s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
-    if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
+                s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
+    if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
       LOGGER.audit(s"Compaction request received for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index cb0ff2d..517fbda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -121,7 +121,8 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
-    val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType)
+    val compactionSize: Long = CarbonDataMergerUtil
+      .getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 807c8e5..ccdbd6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -107,6 +107,38 @@ private[sql] case class CarbonDescribeFormattedCommand(
     val isStreaming = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
       .getOrElse("streaming", "false")
     results ++= Seq(("Streaming", isStreaming, ""))
+
+    val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties
+    results ++= Seq(("SORT_SCOPE", tblProps.getOrDefault("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+
+    // show table level compaction options
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) {
+      results ++= Seq((CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE.toUpperCase
+        , tblProps.get(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE),
+        CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE))
+    }
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE)) {
+      results ++= Seq((CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE.toUpperCase,
+        tblProps.get(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE),
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE))
+    }
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD)) {
+      results ++= Seq((CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD.toUpperCase,
+        tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD),
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD))
+    }
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)) {
+      results ++= Seq((CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS.toUpperCase,
+        tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS),
+        CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER))
+    }
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) {
+      results ++= Seq((CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS.toUpperCase,
+        tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS),
+        CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT))
+    }
+
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
     if (colPropStr.length() > 0) {
       results ++= Seq((colPropStr, "", ""))
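
With the block above, DESCRIBE FORMATTED now surfaces any table-level
compaction options, with the system-level default in the third column. A
rough sketch; the row shape is illustrative and only the 1024 default is
taken from this diff:

```scala
spark.sql("DESCRIBE FORMATTED carbon_table").show(100, false)
// Among the emitted rows (shape illustrative):
// MAJOR_COMPACTION_SIZE          2048      1024
// AUTO_LOAD_MERGE                true      <system default>
// COMPACTION_LEVEL_THRESHOLD     5,6       <system default>
// COMPACTION_PRESERVE_SEGMENTS   10        <system default>
// ALLOWED_COMPACTION_DAYS        5         <system default>
```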

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 3729b1d..e245927 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -21,15 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -78,7 +70,8 @@ public final class CarbonDataMergerUtil {
     // carbon data file case.
     CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
 
-      @Override public boolean accept(CarbonFile file) {
+      @Override
+      public boolean accept(CarbonFile file) {
         return CarbonTablePath.isCarbonDataFile(file.getName());
       }
     });
@@ -93,17 +86,22 @@ public final class CarbonDataMergerUtil {
   /**
    * To check whether the merge property is enabled or not.
    *
+   * @param carbonTable
    * @return
    */
-
-  public static boolean checkIfAutoLoadMergingRequired() {
+  public static boolean checkIfAutoLoadMergingRequired(CarbonTable carbonTable) {
     // load merge is not supported as per new store format
     // moving the load merge check in early to avoid unnecessary load listing causing IOException
     // check whether carbons segment merging operation is enabled or not.
     // default will be false.
+    Map<String, String> tblProps = carbonTable.getTableInfo().getFactTable().getTableProperties();
+
     String isLoadMergeEnabled = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
-            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+            .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+                    CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE)) {
+      isLoadMergeEnabled = tblProps.get(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE);
+    }
     if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
       return false;
     }
@@ -386,9 +384,11 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
-      CarbonLoadModel carbonLoadModel, long compactionSize,
-      List<LoadMetadataDetails> segments, CompactionType compactionType) {
+          CarbonLoadModel carbonLoadModel, long compactionSize,
+          List<LoadMetadataDetails> segments, CompactionType compactionType) {
     String tablePath = carbonLoadModel.getTablePath();
+    Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema()
+            .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
     List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
 
     sortSegments(sortedSegments);
@@ -402,22 +402,24 @@ public final class CarbonDataMergerUtil {
     // check preserve property and preserve the configured number of latest loads.
 
     List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
-        checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
+            checkPreserveSegmentsPropertyReturnRemaining(sortedSegments, tableLevelProperties);
 
     // filter the segments if the compaction based on days is configured.
 
     List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
-        identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
+            identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve,
+                    tableLevelProperties);
     List<LoadMetadataDetails> listOfSegmentsToBeMerged;
     // identify the segments to merge based on the Size of the segments across partition.
     if (CompactionType.MAJOR == compactionType) {
 
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
+              listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
     } else {
 
       listOfSegmentsToBeMerged =
-          identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
+              identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval,
+                      tableLevelProperties);
     }
 
     return listOfSegmentsToBeMerged;
@@ -430,7 +432,8 @@ public final class CarbonDataMergerUtil {
   public static void sortSegments(List segments) {
     // sort the segment details.
     Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
-      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+      @Override
+      public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
         double seg1Id = Double.parseDouble(seg1.getLoadName());
         double seg2Id = Double.parseDouble(seg2.getLoadName());
         return Double.compare(seg1Id, seg2Id);
@@ -443,19 +446,26 @@ public final class CarbonDataMergerUtil {
    * This property is configurable.
    *
    * @param listOfSegmentsBelowThresholdSize
+   * @param tblProps
    * @return
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
-      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
+      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize, Map<String, String> tblProps) {
 
     List<LoadMetadataDetails> loadsOfSameDate =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     long numberOfDaysAllowedToMerge = 0;
     try {
+      // overwrite the system-level option with the table-level option if one exists
       numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
-              CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
+              .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+                      CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
+      if (tblProps.containsKey(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) {
+        numberOfDaysAllowedToMerge = Long.parseLong(
+                tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS));
+      }
+
       if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
         LOGGER.error(
             "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
@@ -675,18 +685,27 @@ public final class CarbonDataMergerUtil {
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
    *        preserve and before filtering by minor compaction level
+   * @param tblProps
    * @return the list of segments to be merged after filtering by minor compaction level
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
-      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+          List<LoadMetadataDetails> listOfSegmentsAfterPreserve, Map<String, String> tblProps) {
 
     List<LoadMetadataDetails> mergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<LoadMetadataDetails> unMergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    int[] noOfSegmentLevelsCount =
-        CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    int[] noOfSegmentLevelsCount = CarbonProperties.getInstance()
+            .getCompactionSegmentLevelCount();
+    // overwrite the system-level option with the table-level option if one exists
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD)) {
+      noOfSegmentLevelsCount = CarbonProperties.getInstance()
+              .getIntArray(tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD));
+      if (0 == noOfSegmentLevelsCount.length) {
+        noOfSegmentLevelsCount = CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+      }
+    }
 
     int level1Size = 0;
     int level2Size = 0;
@@ -756,14 +775,21 @@ public final class CarbonDataMergerUtil {
    * checks number of loads to be preserved and returns remaining valid segments
    *
    * @param segments
+   * @param tblProps
    * @return
    */
   private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
-      List<LoadMetadataDetails> segments) {
+      List<LoadMetadataDetails> segments, Map<String, String> tblProps) {
     // check whether the preserving of the segments from merging is enabled or not.
-    // get the number of loads to be preserved.
-    int numberOfSegmentsToBePreserved =
-        CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
+    // get the number of loads to be preserved; the default is the system-level
+    // option, overwritten by the table-level option if one exists
+    int numberOfSegmentsToBePreserved = CarbonProperties.getInstance()
+            .getNumberOfSegmentsToBePreserved();
+    if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)) {
+      numberOfSegmentsToBePreserved = Integer.parseInt(
+              tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS));
+    }
+
     // get the number of valid segments and retain the latest loads from merging.
     return CarbonDataMergerUtil
         .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
@@ -808,14 +834,23 @@ public final class CarbonDataMergerUtil {
    * This will give the compaction sizes configured based on compaction type.
    *
    * @param compactionType
+   * @param carbonLoadModel
    * @return
    */
-  public static long getCompactionSize(CompactionType compactionType) {
-
+  public static long getCompactionSize(CompactionType compactionType,
+                                       CarbonLoadModel carbonLoadModel) {
     long compactionSize = 0;
     switch (compactionType) {
       case MAJOR:
+        // default value is the system-level option
         compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
+        // if a table-level option is present, use it to overwrite the system-level option
+        Map<String, String> tblProps = carbonLoadModel.getCarbonDataLoadSchema()
+                .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
+        if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) {
+          compactionSize = Long.parseLong(
+                  tblProps.get(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE));
+        }
         break;
       default: // this case can not come.
     }
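
The recurring pattern in this file is: read the system-level value from
CarbonProperties, then overwrite it when the table-level property is present.
A condensed, hedged rendering of that precedence rule, using the constants
added by this commit (the helper name is made up for illustration):

```scala
import java.util.{Map => JMap}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// TBLPROPERTIES beat carbon.properties, which in turn falls back to the
// built-in default (1024 MB for major compaction size).
def effectiveMajorCompactionSize(tblProps: JMap[String, String]): Long = {
  var size = CarbonProperties.getInstance().getMajorCompactionSize() // system level
  val key = CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE // "major_compaction_size"
  if (tblProps.containsKey(key)) {
    size = tblProps.get(key).toLong // table level wins
  }
  size
}
```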

