carbondata-commits mailing list archives

From jack...@apache.org
Subject [07/50] [abbrv] carbondata git commit: Adding session based properties
Date Wed, 05 Jul 2017 00:44:16 GMT
Adding session based properties

Added a SET command in carbon to update properties dynamically
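
For illustration, a minimal usage sketch mirroring the SetCommandTestCase added
below (the key/value pair is arbitrary; any carbon property can be set the same
way):

    // Run against a Carbon-enabled SQLContext/SparkSession.
    sql("SET key1=value1")
    // The pair is mirrored into the CarbonProperties singleton on the driver:
    val value = org.apache.carbondata.core.util.CarbonProperties.getInstance()
      .getProperty("key1")   // returns "value1"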


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28e2e171
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28e2e171
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28e2e171

Branch: refs/heads/streaming_ingest
Commit: 28e2e171db578e2467be55939d2da9a5f1b70d09
Parents: 2234ec8
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Thu May 18 15:04:17 2017 +0530
Committer: Manohar <manohar.crazy09@gmail.com>
Committed: Tue Jun 27 14:39:50 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/util/CarbonProperties.java  | 23 +++++++
 .../carbondata/core/util/SessionParams.java     | 70 ++++++++++++++++++++
 .../testsuite/commands/SetCommandTestCase.scala | 34 ++++++++++
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  4 ++
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  5 ++
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  5 ++
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  5 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  5 ++
 .../spark/rdd/CarbonDropTableRDD.scala          |  6 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 17 ++++-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  4 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  4 ++
 .../spark/rdd/DataLoadCoalescedRDD.scala        |  6 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 15 +++--
 .../spark/rdd/UpdateCoalescedRDD.scala          |  7 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  4 +-
 .../spark/sql/hive/CarbonStrategies.scala       |  4 +-
 .../execution/command/CarbonHiveCommands.scala  | 16 ++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 16 +++--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  9 ++-
 .../org/apache/spark/sql/CarbonSource.scala     |  5 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  3 +-
 .../execution/CastExpressionOptimization.scala  |  6 +-
 .../execution/command/CarbonHiveCommands.scala  | 18 ++++-
 .../sql/execution/command/DDLStrategy.scala     | 10 +--
 .../execution/command/carbonTableSchema.scala   | 16 +++--
 .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++---
 28 files changed, 288 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 90c6ffa..0142e38 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -47,6 +49,11 @@ public final class CarbonProperties {
   private Properties carbonProperties;
 
   /**
+   * Properties that were added on the fly at runtime.
+   */
+  private Map<String, String> setProperties = new HashMap<>();
+
+  /**
    * Private constructor this will call load properties method to load all the
    * carbon properties in memory.
    */
@@ -447,10 +454,26 @@ public final class CarbonProperties {
    * @return properties value
    */
   public CarbonProperties addProperty(String key, String value) {
+    setProperties.put(key, value);
     carbonProperties.setProperty(key, value);
     return this;
   }
 
+  /**
+   * Get all the added properties.
+   * @return map of all properties added at runtime
+   */
+  public Map<String, String> getAddedProperies() {
+    return setProperties;
+  }
+
+  public void setProperties(Map<String, String> newProperties) {
+    setProperties.putAll(newProperties);
+    for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+      carbonProperties.setProperty(entry.getKey(), entry.getValue());
+    }
+  }
+
   private ColumnarFormatVersion getDefaultFormatVersion() {
     return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
   }

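The two methods above form a capture/replay pair: every key added through
addProperty is also remembered in the internal setProperties map, so it can be
read on the driver and re-applied inside an executor JVM. A minimal sketch of
that round trip (method names match the patch, including the getAddedProperies
spelling; the surrounding code is illustrative only):

    import org.apache.carbondata.core.util.CarbonProperties

    // Driver side: capture everything that was added at runtime.
    val added: java.util.Map[String, String] =
      CarbonProperties.getInstance().getAddedProperies

    // Executor side (typically inside RDD.compute): replay the captured map
    // so the executor-local singleton sees the same dynamic settings.
    CarbonProperties.getInstance().setProperties(added)
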
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
new file mode 100644
index 0000000..781b898
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -0,0 +1,70 @@
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holds session-level carbon properties; values set here take precedence over
+ * the global CarbonProperties values.
+ */
+public class SessionParams implements Serializable {
+
+  protected transient CarbonProperties properties;
+
+  private Map<String, String> sProps;
+
+  public SessionParams() {
+    sProps = new HashMap<>();
+    properties = CarbonProperties.getInstance();
+  }
+
+  public SessionParams(SessionParams sessionParams) {
+    this();
+    sProps.putAll(sessionParams.sProps);
+  }
+
+  /**
+   * This method will be used to get the properties value
+   *
+   * @param key
+   * @return properties value
+   */
+  public String getProperty(String key) {
+    String s = sProps.get(key);
+    if (s == null) {
+      s = properties.getProperty(key);
+    }
+    return s;
+  }
+
+  /**
+   * This method will be used to get the properties value; if the property is
+   * not present, it will return the default value.
+   *
+   * @param key
+   * @return properties value
+   */
+  public String getProperty(String key, String defaultValue) {
+    String value = sProps.get(key);
+    if (value == null) {
+      value = properties.getProperty(key, defaultValue);
+    }
+    return value;
+  }
+
+  /**
+   * This method will be used to add a new property
+   *
+   * @param key
+   * @return the updated SessionParams instance
+   */
+  public SessionParams addProperty(String key, String value) {
+    sProps.put(key, value);
+    return this;
+  }
+
+  public void setProperties(Map<String, String> newProperties) {
+    sProps.putAll(newProperties);
+  }
+
+}

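With the null checks corrected above, lookup precedence is: the session map
first, then the global CarbonProperties singleton, then the caller-supplied
default. A small illustrative sketch (the first property key is hypothetical):

    import org.apache.carbondata.core.util.SessionParams

    val params = new SessionParams()
    params.addProperty("example.session.key", "true")  // hypothetical key

    // Present in the session map, so the global singleton is not consulted:
    assert(params.getProperty("example.session.key") == "true")

    // Absent from the session map, so the lookup falls through to
    // CarbonProperties and finally to the supplied default:
    val lockType = params.getProperty("carbon.lock.type", "HDFSLOCK")
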
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
new file mode 100644
index 0000000..28e2dbf
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.commands
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.util.CarbonProperties
+
+class SetCommandTestCase extends QueryTest with BeforeAndAfterAll {
+
+  test("test set command") {
+
+    sql("set key1=value1")
+
+    assert(CarbonProperties.getInstance().getProperty("key1").equals("value1"), "Set command does not work")
+    assert(sqlContext.getConf("key1").equals("value1"), "Set command does not work")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index d81ed30..61e1e61 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -55,6 +55,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
@@ -64,6 +66,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

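Every RDD touched in this patch follows the same two-step pattern shown above:
capture the dynamically added properties in a field while the RDD is
constructed on the driver, then replay them at the start of compute(), which
runs on the executor. A distilled skeleton of the pattern (not part of the
patch itself):

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.carbondata.core.util.CarbonProperties

    class PropertyAwareRDD(sc: SparkContext) extends RDD[String](sc, Nil) {
      // Evaluated on the driver at construction time and shipped to the
      // executors as part of the serialized RDD (the java.util.HashMap is
      // Serializable; the CarbonProperties singleton itself is not shipped).
      private val addedProperies = CarbonProperties.getInstance().getAddedProperies

      override def getPartitions: Array[Partition] = Array.empty[Partition]

      override def compute(split: Partition, context: TaskContext): Iterator[String] = {
        // Runs on the executor: replay the driver-side dynamic properties.
        CarbonProperties.getInstance().setProperties(addedProperies)
        Iterator.empty
      }
    }
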
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 53796bb..ba91673 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * This is a partitioner class for dividing the newly added columns into partitions
@@ -50,6 +51,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new DropColumnPartition(id, column._2, column._1)
@@ -59,6 +62,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 9cc46c1..c1a30b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -37,6 +38,8 @@ class CarbonCleanFilesRDD[V: ClassTag](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
@@ -44,6 +47,8 @@ class CarbonCleanFilesRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[(V)] {
+      // Propagate the properties added on the driver to this executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index f9a7bdd..f7bed59 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.DeletedLoadResult
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -43,6 +44,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {s =>
@@ -52,6 +55,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     new Iterator[(K, V)] {
+      // Propagate the properties added on the driver to this executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 26e1abc..3ef9cef 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -37,6 +38,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
   extends RDD[V](sc, Nil) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {f =>
@@ -46,6 +49,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[V] {
+      // Propagate the properties added on the driver to this executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index dc63098..54f8ea5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -34,6 +35,8 @@ class CarbonDropTableRDD[V: ClassTag](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map { s =>
@@ -43,6 +46,9 @@ class CarbonDropTableRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
 
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
+
     val iter = new Iterator[V] {
       // TODO: Clear Btree from memory
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 1e33188..434fb3c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -178,6 +178,8 @@ class CarbonAllDictionaryCombineRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
@@ -185,7 +187,8 @@ class CarbonAllDictionaryCombineRDD(
   override def compute(split: Partition, context: TaskContext
   ): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     /*
      * for all dictionary, all columns need to encoding and checking
@@ -272,11 +275,15 @@ class CarbonBlockDistinctValuesCombineRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = firstParent[Row].partitions
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
@@ -333,10 +340,14 @@ class CarbonGlobalDictionaryGenerateRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, String, Boolean)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
@@ -535,6 +546,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictFolderPath: String)
   extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
     val primDimLength = primDimensions.length
@@ -547,6 +560,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
 
   override def compute(split: Partition, context: TaskContext)
   : Iterator[(Int, ColumnDistinctValues)] = {
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
     val primDimension = theSplit.preDefDictDimension
     // read the column dict data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..38e3680 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -50,6 +51,8 @@ class CarbonIUDMergerRDD[K, V](
     carbonMergerMapping,
     confExecutorsTemp) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index caa389a..dec3ee3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -74,8 +74,12 @@ class CarbonMergerRDD[K, V](
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val iter = new Iterator[(K, V)] {
 
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 4807b90..2c10e65 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -67,6 +67,8 @@ class CarbonScanRDD(
 
   private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -180,6 +182,8 @@ class CarbonScanRDD(
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
       )
     }
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
 
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
index 7395e43..5da0835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -21,6 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 
 class DataLoadCoalescedRDD[T: ClassTag](
@@ -28,12 +30,16 @@ class DataLoadCoalescedRDD[T: ClassTag](
   nodeList: Array[String])
     extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
 
     new Iterator[DataLoadPartitionWrap[T]] {
       val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 058c5c6..5790369 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
+import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
@@ -126,12 +127,14 @@ class SparkPartitionLoader(model: CarbonLoadModel,
 
   var storeLocation: String = ""
 
-  def initialize(): Unit = {
+  def initialize(addedProperies: util.Map[String, String]): Unit = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
@@ -185,6 +190,8 @@ class NewCarbonDataLoadRDD[K, V](
   private val confBroadcast =
     sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     if (isTableSplitPartition) {
       // for table split partition
@@ -239,7 +246,7 @@ class NewCarbonDataLoadRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -392,6 +399,7 @@ class NewDataFrameLoaderRDD[K, V](
                                    schemaLastUpdatedTime: Long,
                                    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -430,7 +438,7 @@ class NewDataFrameLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
@@ -587,6 +595,7 @@ class PartitionTableDataLoaderRDD[K, V](
     schemaLastUpdatedTime: Long,
     prev: RDD[Row]) extends RDD[(K, V)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -616,7 +625,7 @@ class PartitionTableDataLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: BadRecordFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index 67e094a..30050f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -22,6 +22,8 @@ import scala.reflect.ClassTag
 import org.apache.spark._
 import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 // This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
 
 class UpdateCoalescedRDD[T: ClassTag](
@@ -29,13 +31,16 @@ class UpdateCoalescedRDD[T: ClassTag](
     nodeList: Array[String])
   extends RDD[T](prev.context, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[T] = {
-
+    // Propagate the properties added on the driver to this executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     // This iterator combines data from all the parent partitions
     new Iterator[T] {
       val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index bcfc096..6b94894 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.util
+
 import scala.collection.mutable
 
 import org.apache.spark.TaskContext
@@ -52,7 +54,7 @@ object UpdateDataLoad {
         segId,
         loadMetadataDetails)
       // Intialize to set carbon properties
-      loader.initialize()
+      loader.initialize(new util.HashMap)
 
       loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
       new DataLoadExecutor().execute(carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index f0cd33b..7bfd742 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
+import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
@@ -316,6 +316,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         } else {
           ExecutedCommand(HiveNativeCommand(sql)) :: Nil
         }
+      case set@SetCommand(kv) =>
+        ExecutedCommand(CarbonSetCommand(set)) :: Nil
       case _ =>
         Nil
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 0f42940..d047b20 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{RunnableCommand, SetCommand}
 import org.apache.spark.sql.execution.command.DropTableCommand
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 private[hive] case class CreateDatabaseCommand(dbName: String,
     command: HiveNativeCommand) extends RunnableCommand {
   def run(sqlContext: SQLContext): Seq[Row] = {
@@ -53,3 +55,15 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String,
     rows
   }
 }
+
+case class CarbonSetCommand(command: SetCommand)
+  extends RunnableCommand {
+
+  override val output = command.output
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val rows = command.run(sqlContext)
+    CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1))
+    rows
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 48af516..2b77654 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -128,7 +128,7 @@ object CarbonDataRDDFactory {
       isCompactionTriggerByDDl
     )
 
-    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+    val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
         .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
           CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
         )
@@ -275,8 +275,8 @@ object CarbonDataRDDFactory {
               exception = e
           }
           // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).
+            sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
               ).equalsIgnoreCase("true")
 
@@ -397,8 +397,8 @@ object CarbonDataRDDFactory {
         }
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession)
+          .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
               CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
             )
             .equalsIgnoreCase("true")
@@ -1042,7 +1042,8 @@ object CarbonDataRDDFactory {
     val timeStampFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+      val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
+        .sessionParams.getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       new SimpleDateFormat(timestampFormatString)
     }
@@ -1050,7 +1051,8 @@ object CarbonDataRDDFactory {
     val dateFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+      val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
+        .sessionParams.getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
       new SimpleDateFormat(dateFormatString)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index b46488c..0851ec2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, SessionParams}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -35,6 +35,8 @@ class CarbonEnv {
 
   var carbonMetastore: CarbonMetastore = _
 
+  var sessionParams: SessionParams = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -45,11 +47,12 @@ class CarbonEnv {
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
+      sessionParams = new SessionParams()
       carbonMetastore = {
         val storePath =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sparkSession.conf, storePath)
+        new CarbonMetastore(sparkSession.conf, storePath, sessionParams)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1c16143..3079c84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonProperties.getInstance()
+        val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
           .getProperty(CarbonCommonConstants.STORE_LOCATION)
         val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
@@ -77,7 +77,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+      .getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 4605914..8d0b4ea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -520,7 +520,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
         System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
       } else {
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+          .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
           CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index a8985b9..805a4df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -24,8 +24,7 @@ import java.util.{Locale, TimeZone}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
-import org.apache.spark.sql.CastExpr
-import org.apache.spark.sql.sources
+import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -35,7 +34,8 @@ object CastExpressionOptimization {
 
 
   def typeCastStringToLong(v: Any): Any = {
-    val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance
+    val parser: SimpleDateFormat = new SimpleDateFormat(
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams
       .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index b72f077..627de02 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand}
+
+import org.apache.carbondata.core.util.CarbonProperties
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {
@@ -40,3 +42,17 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     rows
   }
 }
+
+case class CarbonSetCommand(command: SetCommand)
+  extends RunnableCommand {
+
+  override val output = command.output
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val rows = command.run(sparkSession)
+    CarbonEnv.getInstance(sparkSession).sessionParams
+      .addProperty(rows.head.getString(0), rows.head.getString(1))
+    rows
+  }
+}
+

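In the spark2 integration the SET value lands in the per-session SessionParams
held by CarbonEnv rather than only in the global singleton, which is what the
DDLStrategy change below wires up. A hedged sketch of reading it back (assumes
an initialized Carbon session; "key1" mirrors the test case above):

    import org.apache.spark.sql.{CarbonEnv, SparkSession}

    def readBack(spark: SparkSession): String = {
      spark.sql("SET key1=value1")
      CarbonEnv.getInstance(spark).sessionParams.getProperty("key1")
    }
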
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 3593b6d..35be543 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable,
-ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.CarbonDropDatabaseCommand
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand}
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -110,13 +109,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
-        if
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(identifier)(sparkSession) && isFormatted =>
         val resolvedTable =
           sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
         val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
         ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+      case set@SetCommand(kv) =>
+        ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0064c21..f1fd05b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    var storeLocation = CarbonProperties.getInstance
+    var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
       )
@@ -359,7 +359,8 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    CarbonEnv.getInstance(sparkSession).sessionParams
+      .addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -408,7 +409,7 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
+      val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
       val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
@@ -428,11 +429,12 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession)
+        .sessionParams.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams.
+        getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
@@ -534,7 +536,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
+          val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 04a94ce..54cffc2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.reader.ThriftReader
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -201,18 +201,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     // if zookeeper is configured as carbon lock type.
     val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
     if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+      sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
     }
     if (metadataPath == null) {
       return null
     }
     // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+    if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) &&
         FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
+      sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE,
           CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
         )
       LOGGER.info("Default lock type HDFSLOCK is configured")

