carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [22/50] [abbrv] incubator-carbondata git commit: [Issue-643] Column Property addition, extract interface for dictionary (#641)
Date Thu, 30 Jun 2016 17:42:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
index 17976a6..b814fa8 100644
--- a/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
+++ b/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
@@ -59,8 +60,10 @@ public class CarbonDictionarySortIndexWriterImplTest {
   @Test public void write() throws Exception {
     String storePath = hdfsStorePath;
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString());
+    ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null);
+
     CarbonDictionarySortIndexWriter dictionarySortIndexWriter =
-        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, "Name", storePath);
+        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, storePath);
     List<int[]> indexList = prepareExpectedData();
     List<Integer> sortIndex = Arrays.asList(ArrayUtils.toObject(indexList.get(0)));
     List<Integer> invertedSortIndex = Arrays.asList(ArrayUtils.toObject(indexList.get(1)));
@@ -68,7 +71,7 @@ public class CarbonDictionarySortIndexWriterImplTest {
     dictionarySortIndexWriter.writeInvertedSortIndex(invertedSortIndex);
     dictionarySortIndexWriter.close();
     CarbonDictionarySortIndexReader carbonDictionarySortIndexReader =
-        new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, "Name", storePath);
+        new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier, storePath);
     List<Integer> actualSortIndex = carbonDictionarySortIndexReader.readSortIndex();
     List<Integer> actualInvertedSortIndex = carbonDictionarySortIndexReader.readInvertedSortIndex();
     for (int i = 0; i < actualSortIndex.size(); i++) {
@@ -84,15 +87,17 @@ public class CarbonDictionarySortIndexWriterImplTest {
   @Test public void writingEmptyValue() throws Exception {
     String storePath = hdfsStorePath;
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString());
+    ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null);
+
     CarbonDictionarySortIndexWriter dictionarySortIndexWriter =
-        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, "Name", storePath);
+        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, storePath);
     List<Integer> sortIndex = new ArrayList<>();
     List<Integer> invertedSortIndex = new ArrayList<>();
     dictionarySortIndexWriter.writeSortIndex(sortIndex);
     dictionarySortIndexWriter.writeInvertedSortIndex(invertedSortIndex);
     dictionarySortIndexWriter.close();
     CarbonDictionarySortIndexReader carbonDictionarySortIndexReader =
-        new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, "Name", storePath);
+        new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier, storePath);
     List<Integer> actualSortIndex = carbonDictionarySortIndexReader.readSortIndex();
     List<Integer> actualInvertedSortIndex = carbonDictionarySortIndexReader.readInvertedSortIndex();
     for (int i = 0; i < actualSortIndex.size(); i++) {

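The pattern above recurs throughout this commit: dictionary writers and readers are now keyed by a ColumnIdentifier object instead of a bare column-name string. A minimal round-trip sketch in Scala, assuming the three-argument constructor (column id, optional properties map, optional data type) used by the test, with sortIndex, invertedSortIndex and storePath supplied by the caller:

    import java.util.UUID
    import org.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}

    val tableIdentifier =
      new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString)
    // the identifier now travels with the column instead of a raw "Name" string
    val columnIdentifier = new ColumnIdentifier("Name", null, null)
    val writer =
      new CarbonDictionarySortIndexWriterImpl(tableIdentifier, columnIdentifier, storePath)
    writer.writeSortIndex(sortIndex)
    writer.writeInvertedSortIndex(invertedSortIndex)
    writer.close()
    // the reader must be constructed with the same identifier to locate the files
    val reader =
      new CarbonDictionarySortIndexReaderImpl(tableIdentifier, columnIdentifier, storePath)
    val actualSortIndex = reader.readSortIndex()
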
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
index 2d37bda..deef2cc 100644
--- a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
@@ -20,7 +20,7 @@ package org.carbondata.examples
 import org.apache.spark.sql.{CarbonContext, CarbonEnv, CarbonRelation}
 
 import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
-import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.carbon.path.CarbonStorePath
 import org.carbondata.examples.util.InitForExamples
@@ -76,7 +76,7 @@ object GenerateDictionaryExample {
       println(s"dictionary of dimension: ${dimension.getColName}")
       println(s"Key\t\t\tValue")
       val columnIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
-        dimension.getColumnId, dimension.getDataType)
+        dimension.getColumnIdentifier, dimension.getDataType)
       val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, carbonContext.storePath)
       var index: Int = 1
       var distinctValue = dict.getDictionaryValueForKey(index)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 8e12361..690e37b 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -85,10 +85,19 @@ struct ColumnSchema{
 	11: optional string aggregate_function;
 
 	12: optional binary default_value;
-	/**
+	
+	13: optional map<string,string> columnProperties
+	
+    /**
 	* To specify the visibility of the column; by default it is false
 	*/
-	13: optional bool invisible;
+	14: optional bool invisible;
+
+	/**
+	 * column reference id
+	 */
+	15: optional string columnReferenceId;	
+	
 }
 
 /**

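The renumbering keeps the existing invisible flag intact (now field 14) while adding a per-column property map (field 13) and a column reference id (field 15). A hedged sketch of populating the new fields, using the wrapper setter names that appear later in this commit (property key and value here are hypothetical):

    val colPropMap = new java.util.HashMap[String, String]()
    colPropMap.put("shared_column", "shared.name")    // hypothetical property
    columnSchema.setColumnProperties(colPropMap)      // thrift field 13
    columnSchema.setColumnReferenceId(columnUniqueId) // thrift field 15
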
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/hadoop/src/main/java/org/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
index ba1aafc..bea8289 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
@@ -47,8 +47,8 @@ public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonR
         try {
           dataTypes[i] = carbonColumns[i].getDataType();
           dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
-              absoluteTableIdentifier.getCarbonTableIdentifier(), carbonColumns[i].getColumnId(),
-              dataTypes[i]));
+              absoluteTableIdentifier.getCarbonTableIdentifier(),
+              carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
         } catch (CarbonUtilException e) {
           throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
index 556fc8d..ad21250 100644
--- a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
@@ -41,6 +41,7 @@ import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.carbondata.core.carbon.metadata.converter.SchemaConverter;
 import org.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl;
@@ -289,9 +290,10 @@ public class StoreCreator {
     Cache dictCache = CacheProvider.getInstance()
         .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath());
     for (int i = 0; i < set.length; i++) {
+      ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
       CarbonDictionaryWriter writer =
           new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(),
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dims.get(i).getColumnId());
+              absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier);
       for (String value : set[i]) {
         writer.write(value);
       }
@@ -299,14 +301,14 @@ public class StoreCreator {
 
       Dictionary dict = (Dictionary) dictCache.get(
           new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
-              dims.get(i).getColumnId(), dims.get(i).getDataType()));
+              columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
       CarbonDictionarySortInfo dictionarySortInfo =
           preparator.getDictionarySortInfo(dict, dims.get(i).getDataType());
       CarbonDictionarySortIndexWriter carbonDictionaryWriter =
           new CarbonDictionarySortIndexWriterImpl(
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dims.get(i).getColumnId(),
+              absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier,
               absoluteTableIdentifier.getStorePath());
       try {
         carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index c5eb971..220d1b7 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -32,6 +32,7 @@ import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.carbondata.core.carbon.metadata.datatype.DataType;
@@ -979,7 +980,7 @@ public final class CarbonLoaderUtil {
   }
 
   public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
-      String columnIdentifier, String carbonStorePath, DataType dataType)
+      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
       throws CarbonUtilException {
     return getDictionary(
         new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType),

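Call sites change accordingly: the helper now receives the dimension's ColumnIdentifier rather than its raw id string. A minimal sketch, with tableIdentifier, dimension and carbonStorePath supplied by the caller:

    val dict = CarbonLoaderUtil.getDictionary(
      tableIdentifier,
      dimension.getColumnIdentifier, // was dimension.getColumnId (a String)
      carbonStorePath,
      dimension.getDataType)
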
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index a4ac246..ce43c4f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -29,7 +29,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import org.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
 import org.carbondata.core.carbon.metadata.datatype.DataType
 import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.query.carbon.util.DataTypeUtil
@@ -102,7 +102,7 @@ case class CarbonDictionaryDecoder(
 
   val getDictionaryColumnIds = {
     val attributes = child.output
-    val dictIds: Array[(String, String, DataType)] = attributes.map { a =>
+    val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
       if(relation.isDefined) {
@@ -113,7 +113,8 @@ case class CarbonDictionaryDecoder(
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             canBeDecoded(attr)) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnId, carbonDimension.getDataType)
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+              carbonDimension.getDataType)
         } else {
           (null, null, null)
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 98fc334..e766aaa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.nio.charset.Charset
+import java.util
 import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
@@ -37,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveQlWrapper
 
 import org.carbondata.core.carbon.metadata.datatype.DataType
+import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.DataTypeUtil
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 import org.carbondata.spark.util.CommonUtil
@@ -139,6 +141,7 @@ class CarbonSqlParser()
   protected val STARTTIME = Keyword("STARTTIME")
   protected val SEGMENTS = Keyword("SEGMENTS")
   protected val SEGMENT = Keyword("SEGMENT")
+  protected val SHARED = Keyword("SHARED")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
@@ -176,7 +179,8 @@ class CarbonSqlParser()
   }
 
   override protected lazy val start: Parser[LogicalPlan] =
-    createCube | showCreateCube | loadManagement | createAggregateTable | describeTable |
+     createCube | showCreateCube | loadManagement | createAggregateTable |
+      describeTable |
       showCube | showLoads | alterCube | showAllCubes | alterTable | createTable
 
   protected lazy val loadManagement: Parser[LogicalPlan] = loadData | dropCubeOrTable |
@@ -467,10 +471,14 @@ class CarbonSqlParser()
           sys.error("Not a carbon format request")
         }
 
-        // prepare table model of the collected tokens
-        val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
-          partitionCols,
-          tableProperties)
+      // validate tblProperties
+      if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
+        throw new MalformedCarbonCommandException("Invalid table properties")
+      }
+      // prepare table model of the collected tokens
+      val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
+        partitionCols,
+        tableProperties)
 
         // get logical plan.
         CreateCube(tableModel)
@@ -538,6 +546,8 @@ class CarbonSqlParser()
       fields, tableProperties)
     val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
+    // column properties
+    val colProps = extractColumnProperties(fields, tableProperties)
     // get column groups configuration from table properties.
     val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
         noDictionaryDims, msrs, dims)
@@ -548,7 +558,7 @@ class CarbonSqlParser()
       dbName.getOrElse("default"), dbName, tableName,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)), "", null, "",
-      None, Seq(), null, Option(noDictionaryDims), null, partitioner, groupCols)
+      None, Seq(), null, Option(noDictionaryDims), null, partitioner, groupCols, Some(colProps))
   }
 
   /**
@@ -562,10 +572,10 @@ class CarbonSqlParser()
       noDictionaryDims: Seq[String],
       msrs: Seq[Field],
       dims: Seq[Field]): Seq[String] = {
-    if (None != tableProperties.get("COLUMN_GROUPS")) {
+    if (None != tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS)) {
 
       var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get("COLUMN_GROUPS").get
+      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
 
       // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
      // first split the value by "()", so that the above is split into 2 strings.
@@ -637,13 +647,13 @@ class CarbonSqlParser()
     var partitionClass: String = ""
     var partitionCount: Int = 1
     var partitionColNames: Array[String] = Array[String]()
-    if (None != tableProperties.get("PARTITIONCLASS")) {
-      partitionClass = tableProperties.get("PARTITIONCLASS").get
+    if (None != tableProperties.get(CarbonCommonConstants.PARTITIONCLASS)) {
+      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
     }
 
-    if (None != tableProperties.get("PARTITIONCOUNT")) {
+    if (None != tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT)) {
       try {
-        partitionCount = tableProperties.get("PARTITIONCOUNT").get.toInt
+        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
       } catch {
         case e: Exception => // no need to do anything.
       }
@@ -661,6 +671,54 @@ class CarbonSqlParser()
     None
   }
 
+  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+  util.Map[String, util.List[ColumnProperty]] = {
+    val colPropMap = new util.HashMap[String, util.List[ColumnProperty]]()
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
+      } else {
+        fillColumnProperty(None, field.column, tableProperties, colPropMap)
+      }
+    }
+    colPropMap
+  }
+
+  protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
+    tableProperties: Map[String, String],
+    colPropMap: util.HashMap[String, util.List[ColumnProperty]]) {
+    fieldChildren.foreach(fields => {
+      fields.foreach(field => {
+        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
+      }
+      )
+    }
+    )
+  }
+
+  protected def fillColumnProperty(parentColumnName: Option[String],
+    columnName: String,
+    tableProperties: Map[String, String],
+    colPropMap: util.HashMap[String, util.List[ColumnProperty]]) {
+    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
+    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
+    if (None != colProps) {
+      colPropMap.put(colProKey, colProps.get)
+    }
+  }
+
+  def getKey(parentColumnName: Option[String],
+    columnName: String): (String, String) = {
+    if (None != parentColumnName) {
+      if (columnName == "val") {
+        (parentColumnName.get, parentColumnName.get + "." + columnName)
+      } else {
+        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
+      }
+    } else {
+      (columnName, columnName)
+    }
+  }
   /**
    * This will extract the Dimensions and NoDictionary Dimensions fields.
    * By default all string cols are dimensions.
@@ -678,8 +736,9 @@ class CarbonSqlParser()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
     // All excluded cols should be there in create table cols
-    if (tableProperties.get("DICTIONARY_EXCLUDE").isDefined) {
-      dictExcludeCols = tableProperties.get("DICTIONARY_EXCLUDE").get.split(',').map(_.trim)
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
       dictExcludeCols
         .map { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
@@ -702,8 +761,9 @@ class CarbonSqlParser()
         }
     }
     // All included cols should be there in create table cols
-    if (tableProperties.get("DICTIONARY_INCLUDE").isDefined) {
-      dictIncludeCols = tableProperties.get("DICTIONARY_INCLUDE").get.split(",").map(_.trim)
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
       dictIncludeCols.map { distIncludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol))) {
             val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol +
@@ -800,13 +860,15 @@ class CarbonSqlParser()
     var dictExcludedCols: Array[String] = Array[String]()
 
     // get all included cols
-    if (None != tableProperties.get("DICTIONARY_INCLUDE")) {
-      dictIncludedCols = tableProperties.get("DICTIONARY_INCLUDE").get.split(',').map(_.trim)
+    if (None != tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE)) {
+      dictIncludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
     }
 
     // get all excluded cols
-    if (None != tableProperties.get("DICTIONARY_EXCLUDE")) {
-      dictExcludedCols = tableProperties.get("DICTIONARY_EXCLUDE").get.split(',').map(_.trim)
+    if (None != tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE)) {
+      dictExcludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
     }
 
     // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
@@ -887,8 +949,8 @@ class CarbonSqlParser()
   }
 
   protected def unquoteString(str: String) = str match {
-    case singleQuotedString(s) => s
-    case doubleQuotedString(s) => s
+    case singleQuotedString(s) => s.toLowerCase()
+    case doubleQuotedString(s) => s.toLowerCase()
     case other => other
   }
 
@@ -1259,7 +1321,6 @@ class CarbonSqlParser()
           new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined)
         }
     }
-
   private def normalizeType(field: Field): Field = {
     field.dataType.getOrElse("NIL") match {
       case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,

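The new extractColumnProperties pass visits every field, and every child of a complex field, and collects table properties whose keys carry the COLUMN_PROPERTIES prefix for that column. The key derivation is centralised in getKey; a short sketch of its behaviour (note the implicit "val" child of an array is looked up under the parent's name):

    val parser = new CarbonSqlParser()
    // child of a struct: both the tblproperties key and the map key are parent.child
    val structKeys = parser.getKey(Some("mobile"), "imei") // ("mobile.imei", "mobile.imei")
    // implicit "val" child: properties are resolved under the parent column
    val arrayKeys = parser.getKey(Some("mobile"), "val")   // ("mobile", "mobile.val")
    // top-level column: both keys are just the column name
    val plainKeys = parser.getKey(None, "name")            // ("name", "name")
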
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a65b7b7..8868367 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -37,7 +37,9 @@ import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
 
+import org.carbondata.common.factory.CarbonCommonFactory
 import org.carbondata.common.logging.LogServiceFactory
 import org.carbondata.core.carbon.CarbonDataLoadSchema
 import org.carbondata.core.carbon.metadata.CarbonMetadata
@@ -57,7 +59,8 @@ import org.carbondata.spark.exception.MalformedCarbonCommandException
 import org.carbondata.spark.load._
 import org.carbondata.spark.partition.api.impl.QueryPartitionHelper
 import org.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
+import org.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
+import org.carbondata.spark.CarbonSparkFactory
 
 
 case class tableModel(
@@ -76,7 +79,8 @@ case class tableModel(
     highcardinalitydims: Option[Seq[String]],
     aggregation: Seq[Aggregation],
     partitioner: Option[Partitioner],
-    columnGroups: Seq[String])
+    columnGroups: Seq[String],
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
 
 case class Field(column: String, dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,
@@ -93,6 +97,8 @@ case class FieldMapping(levelName: String, columnName: String)
 
 case class HierarchyMapping(hierName: String, hierType: String, levels: Seq[String])
 
+case class ColumnProperty(key: String, value: String)
+
 case class ComplexField(complexType: String, primitiveField: Option[Field],
     complexField: Option[ComplexField])
 
@@ -187,9 +193,26 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
     val columnSchema = new ColumnSchema()
     columnSchema.setDataType(dataType)
     columnSchema.setColumnName(colName)
-    columnSchema.setColumnUniqueId(UUID.randomUUID().toString)
-    columnSchema.setColumnar(isCol)
+    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+    if (highCardinalityDims.contains(colName)) {
+      encoders.remove(Encoding.DICTIONARY)
+    }
+    if (dataType == DataType.TIMESTAMP) {
+      encoders.add(Encoding.DIRECT_DICTIONARY)
+    }
+    var colPropMap = new java.util.HashMap[String, String]()
+    if (None != cm.colProps && null != cm.colProps.get.get(colName)) {
+      val colProps = cm.colProps.get.get(colName)
+      colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
+    }
+    columnSchema.setColumnProperties(colPropMap)
     columnSchema.setEncodingList(encoders)
+    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.schemaName,
+      columnSchema)
+    columnSchema.setColumnUniqueId(columnUniqueId)
+    columnSchema.setColumnReferenceId(columnUniqueId)
+    columnSchema.setColumnar(isCol)
     columnSchema.setDimensionColumn(isDimensionCol)
     columnSchema.setColumnGroup(colGroup)
     columnSchema.setPrecision(precision)
@@ -258,15 +281,6 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
 
     updateColumnGroupsInFields(cm.columnGroups, allColumns)
 
-    for (column <- allColumns) {
-      if (highCardinalityDims.contains(column.getColumnName)) {
-        column.getEncodingList.remove(Encoding.DICTIONARY)
-      }
-      if (column.getDataType == DataType.TIMESTAMP) {
-        column.getEncodingList.add(Encoding.DIRECT_DICTIONARY)
-      }
-    }
-
     var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
     val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
     val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
@@ -301,7 +315,8 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
       measures += measureColumn
       allColumns = allColumns ++ measures
     }
-
+    val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
+    columnValidator.validateColumns(allColumns)
     newOrderedDims = newOrderedDims ++ complexDims ++ measures
 
     cm.partitioner match {
@@ -1963,11 +1978,19 @@ private[sql] case class DescribeCommandFormatted(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
       .lookupRelation2(tblIdentifier, None)(sqlContext).asInstanceOf[CarbonRelation]
+    val mapper = new ObjectMapper()
+    val colProps = StringBuilder.newBuilder
     var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
       val comment = if (relation.metaData.dims.contains(field.name)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
             relation.cubeMeta.carbonTableIdentifier.getTableName,
             field.name)
+        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
+          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
+          colProps.append(field.name).append(".")
+          .append(mapper.writeValueAsString(dimension.getColumnProperties))
+          .append(",")
+        }
         if (dimension.hasEncoding(Encoding.DICTIONARY) &&
             !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           "DICTIONARY, KEY COLUMN"
@@ -1975,18 +1998,22 @@ private[sql] case class DescribeCommandFormatted(
           "KEY COLUMN"
         }
       } else {
-        ""
+        ("MEASURE")
       }
       (field.name, field.dataType.simpleString, comment)
     }
+    val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drops additional comma at end
+      colProps.toString().dropRight(1)
+    } else {
+      colProps.toString()
+    }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
     results ++= Seq(("Database Name : ", relation.cubeMeta.carbonTableIdentifier
       .getDatabaseName, "")
     )
     results ++= Seq(("Table Name : ", relation.cubeMeta.carbonTableIdentifier.getTableName, ""))
     results ++= Seq(("CARBON Store Path : ", relation.cubeMeta.storePath, ""))
-    results ++= getColumnGroups(relation.metaData.carbonTable.getDimensionByTableName(
-        relation.cubeMeta.carbonTableIdentifier.getTableName).asScala.toList)
     results ++= Seq(("", "", ""), ("#Aggregate Tables", "", ""))
     val carbonTable = relation.cubeMeta.carbonTable
     val aggTables = carbonTable.getAggregateTablesName
@@ -1994,7 +2021,9 @@ private[sql] case class DescribeCommandFormatted(
       results ++= Seq(("NONE", "", ""))
     } else {
       aggTables.asScala.foreach(aggTable => {
-        results ++= Seq(("", "", ""), ("Agg Table :" + aggTable, "#Columns", "#AggregateType"))
+        results ++= Seq(("", "", ""),
+          ("Agg Table :" + aggTable, "#Columns", "#AggregateType")
+        )
         carbonTable.getDimensionByTableName(aggTable).asScala.foreach(dim => {
           results ++= Seq(("", dim.getColName, ""))
         })
@@ -2004,9 +2033,14 @@ private[sql] case class DescribeCommandFormatted(
       }
       )
     }
-
+    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+    if (colPropStr.length() > 0) {
+      results ++= Seq((colPropStr, "", ""))
+    } else {
+      results ++= Seq(("NONE", "", ""))
+    }
     results.map { case (name, dataType, comment) =>
-      Row(name, dataType, comment)
+      Row(f"$name%-36s $dataType%-80s $comment%-72s")
     }
   }
 

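Within TableNewProcessor, the parser's per-column ColumnProperty lists are flattened into the schema's string map. An equivalent, slightly more defensive sketch of the block above (asScala is available here, as the existing colProps.asScala call implies):

    val colPropMap = new java.util.HashMap[String, String]()
    for {
      props <- cm.colProps                // Option on the table model
      list  <- Option(props.get(colName)) // may be absent for this column
    } list.asScala.foreach(p => colPropMap.put(p.key, p.value))
    columnSchema.setColumnProperties(colPropMap)
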
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 6fb43a5..412ca90 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -52,14 +52,18 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
   val LOGGER = LogServiceFactory.getLogService("CarbonStrategies")
 
   def getStrategies: Seq[Strategy] = {
-    val total = sqlContext.planner.strategies :+ CarbonTableScans :+ DDLStrategies
+    val total = sqlContext.planner.strategies :+ getCarbonTableScans :+ getDDLStrategies
     total
   }
 
+  def getCarbonTableScans: Strategy = new CarbonTableScans
+
+  def getDDLStrategies: Strategy = new DDLStrategies
+
   /**
    * Carbon strategies for Carbon cube scanning
    */
-  private[sql] object CarbonTableScans extends Strategy {
+  private[sql] class CarbonTableScans extends Strategy {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, predicates,
@@ -321,7 +325,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     }
   }
 
-  object DDLStrategies extends Strategy {
+  class DDLStrategies extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ShowCubeCommand(schemaName) =>
         ExecutedCommand(ShowAllTablesInSchema(schemaName, plan.output)) :: Nil

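Turning the strategy singletons into classes behind overridable getters is what lets CarbonStrategy (below) obtain fresh instances, and it opens the strategies to specialisation. A hypothetical subclass, sketched under the assumption that it lives where the private[sql] class is visible:

    // hypothetical subclass; not part of this commit
    class CustomCarbonStrategies(ctx: SQLContext) extends CarbonStrategies(ctx) {
      // swap in a custom scan strategy while reusing the DDL strategies
      override def getCarbonTableScans: Strategy = new CarbonTableScans
    }
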
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
index c4c214d..beab9cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
@@ -26,10 +26,10 @@ private[sql] object CarbonStrategy {
   def getStrategy(context: SQLContext): Seq[Strategy] = {
     val carbonStrategy = new CarbonStrategies(context)
     if (context.conf.asInstanceOf[CarbonSQLConf].pushComputation) {
-      Seq(carbonStrategy.CarbonTableScans, carbonStrategy.DDLStrategies)
+      Seq(carbonStrategy.getCarbonTableScans, carbonStrategy.getDDLStrategies)
     } else {
       // TODO: need to remove duplicate code in strategies.
-      Seq(new CarbonRawStrategies(context).CarbonRawTableScans, carbonStrategy.DDLStrategies)
+      Seq(new CarbonRawStrategies(context).CarbonRawTableScans, carbonStrategy.getDDLStrategies)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonColumnValidator.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonColumnValidator.scala
new file mode 100644
index 0000000..51c4b67
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonColumnValidator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.spark
+
+import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.carbondata.spark.exception.MalformedCarbonCommandException
+
+ /**
+  * Carbon column validator
+  */
+class CarbonColumnValidator extends ColumnValidator {
+  def validateColumns(allColumns: Seq[ColumnSchema]) {
+    allColumns.foreach { columnSchema =>
+      val colWithSameId = allColumns.filter { x =>
+        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
+      }
+      if (colWithSameId.size > 1) {
+        throw new MalformedCarbonCommandException("Two columns cannot have the same columnId")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonSparkFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonSparkFactory.scala
new file mode 100644
index 0000000..80a6ad6
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonSparkFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.spark
+
+import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.ColumnIdentifier
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.carbondata.core.carbon.path.CarbonTablePath
+import org.carbondata.spark.exception.MalformedCarbonCommandException
+
+ /**
+  * Column validator
+  */
+trait ColumnValidator {
+  def validateColumns(columns: Seq[ColumnSchema])
+}
+/**
+ * Dictionary related helper service
+ */
+trait DictionaryDetailService {
+  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, hdfsLocation: String): DictionaryDetail
+}
+
+/**
+ * Dictionary related detail
+ */
+case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
+    dictFilePaths: Array[String], dictFileExists: Array[Boolean])
+
+/**
+ * Factory class
+ */
+object CarbonSparkFactory {
+   /**
+    * @return column validator
+    */
+  def getCarbonColumnValidator(): ColumnValidator = {
+    new CarbonColumnValidator
+  }
+
+  /**
+   * @return dictionary helper
+   */
+  def getDictionaryDetailService(): DictionaryDetailService = {
+    new DictionaryDetailHelper
+  }
+}

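Typical use of the factory, mirroring the call sites added elsewhere in this commit (TableNewProcessor and GlobalDictionaryUtil); allColumns, dictFolderPath, primDimensions, tableIdentifier and hdfsLocation come from the caller:

    // validates that no two columns share a unique id; throws
    // MalformedCarbonCommandException otherwise
    val validator = CarbonSparkFactory.getCarbonColumnValidator()
    validator.validateColumns(allColumns)

    // resolves dictionary file paths, exists flags and column identifiers
    val detail = CarbonSparkFactory.getDictionaryDetailService()
      .getDictionaryDetail(dictFolderPath, primDimensions, tableIdentifier, hdfsLocation)
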
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark/src/main/scala/org/carbondata/spark/DictionaryDetailHelper.scala
new file mode 100644
index 0000000..e144f00
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/DictionaryDetailHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.spark
+
+import scala.collection.mutable.HashMap
+
+import org.carbondata.common.factory.CarbonCommonFactory
+import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.ColumnIdentifier
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.carbondata.core.carbon.path.CarbonStorePath
+import org.carbondata.core.carbon.path.CarbonTablePath
+import org.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter
+import org.carbondata.core.datastorage.store.impl.FileFactory
+
+class DictionaryDetailHelper extends DictionaryDetailService {
+  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, hdfsLocation: String): DictionaryDetail = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
+    val dictFilePaths = new Array[String](primDimensions.length)
+    val dictFileExists = new Array[Boolean](primDimensions.length)
+    val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
+
+    val fileType = FileFactory.getFileType(dictfolderPath)
+    // Metadata folder
+    val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
+    // 1. list all dictionary files in the Metadata folder
+    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+      @Override def accept(pathname: CarbonFile): Boolean = {
+        CarbonTablePath.isDictionaryFile(pathname)
+      }
+    })
+    // 2. put dictionary file names into fileNamesMap
+    val fileNamesMap = new HashMap[String, Int]
+    for (i <- 0 until carbonFiles.length) {
+      fileNamesMap.put(carbonFiles(i).getName, i)
+    }
+    // 3. look up fileNamesMap: a dictionary file exists iff its name is present in the map
+    primDimensions.zipWithIndex.foreach { f =>
+      columnIdentifier(f._2) = f._1.getColumnIdentifier
+      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+      dictFileExists(f._2) =
+        fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
+          case None => false
+          case Some(_) => true
+        }
+    }
+
+    DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
+  }
+}

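A note on the lookup above: since only presence matters, the Option match is equivalent to a containment test, which a later cleanup could use:

    // equivalent formulation of the exists check
    dictFileExists(f._2) =
      fileNamesMap.contains(CarbonTablePath.getDictionaryFileName(f._1.getColumnId))
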
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index ca18d24..a78daa8 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 
 import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.spark.load.CarbonLoaderUtil
@@ -140,6 +140,7 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
     highCardIdentifyEnable: Boolean,
     highCardThreshold: Int,
     rowCountPercentage: Double,
+    columnIdentifier: Array[ColumnIdentifier],
     isFirstLoad: Boolean) extends Serializable
 
 case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
@@ -232,7 +233,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         val t1 = System.currentTimeMillis
         dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
           CarbonLoaderUtil.getDictionary(model.table,
-            model.primDimensions(split.index).getColumnId,
+            model.columnIdentifier(split.index),
             model.hdfsLocation,
             model.primDimensions(split.index).getDataType
           )
@@ -275,7 +276,7 @@ class CarbonGlobalDictionaryGenerateRDD(
           val t4 = System.currentTimeMillis
           if (distinctValueCount > 0) {
             dictionaryForSortIndexWriting = CarbonLoaderUtil.getDictionary(model.table,
-              model.primDimensions(split.index).getColumnId,
+              model.columnIdentifier(split.index),
               model.hdfsLocation,
               model.primDimensions(split.index).getDataType)
             GlobalDictionaryUtil.writeGlobalDictionaryColumnSortInfo(model, split.index,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/CommonUtil.scala
index 72a1256..9ede819 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CommonUtil.scala
@@ -16,8 +16,13 @@
  */
 package org.carbondata.spark.util
 
-import org.apache.spark.sql.execution.command.Field
+import java.util
+import java.util.UUID
 
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
+
+import org.carbondata.core.carbon.metadata.datatype.DataType
+import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CommonUtil {
@@ -54,6 +59,7 @@ object CommonUtil {
 
   }
 
+
   def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
     dims.foreach { dim =>
       if (dim.column.equalsIgnoreCase(colName)) {
@@ -84,4 +90,75 @@ object CommonUtil {
     }
     false
   }
+
+  def getColumnProperties(column: String,
+      tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = {
+    val fieldProps = new util.ArrayList[ColumnProperty]()
+    val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+    tableProperties.foreach {
+      case (key, value) =>
+        if (key.startsWith(columnPropertiesStartKey)) {
+          fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(),
+            key.length()), value))
+        }
+    }
+    if (fieldProps.isEmpty()) {
+      None
+    } else {
+      Some(fieldProps)
+    }
+  }
+
+  def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = {
+    val itr = tableProperties.keys
+    var isValid: Boolean = true
+    tableProperties.foreach {
+      case (key, value) =>
+        if (!validateFields(key, fields)) {
+          isValid = false
+          throw new MalformedCarbonCommandException(s"Invalid table properties ${ key }")
+        }
+    }
+    isValid
+  }
+
+  def validateFields(key: String, fields: Seq[Field]): Boolean = {
+    var isValid: Boolean = false
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        field.children.foreach(fields => {
+          fields.foreach(complexfield => {
+            val column = if ("val" == complexfield.column) {
+              field.column
+            } else {
+              field.column + "." + complexfield.column
+            }
+            if (validateColumnProperty(key, column)) {
+              isValid = true
+            }
+          }
+          )
+        }
+        )
+      } else {
+        if (validateColumnProperty(key, field.column)) {
+          isValid = true
+        }
+      }
+
+    }
+    isValid
+  }
+
+  def validateColumnProperty(key: String, column: String): Boolean = {
+    if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) {
+      return true
+    }
+    val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+    if (key.startsWith(columnPropertyKey)) {
+      true
+    } else {
+      false
+    }
+  }
 }

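A usage sketch for the new helpers, assuming CarbonCommonConstants.COLUMN_PROPERTIES resolves to the tblproperties key prefix (e.g. "columnproperties"), the remaining Field parameters keep their defaults, and the property key is hypothetical:

    val fields = Seq(Field("name", Some("String"), Some("name"), None))
    val tableProperties = Map(
      "columnproperties.name.shared_column" -> "shared.name")

    // passes: the key's column segment names an existing column; throws
    // MalformedCarbonCommandException if it names an unknown column
    CommonUtil.validateTblProperties(tableProperties, fields)

    // yields Some(list containing ColumnProperty("shared_column", "shared.name"))
    val props = CommonUtil.getColumnProperties("name", tableProperties)
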
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 6298aaa..77e2eec 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -33,9 +33,11 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation, DataFrame, SQLContext}
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
 import org.apache.spark.util.FileUtils
 
+import org.carbondata.common.factory.CarbonCommonFactory
 import org.carbondata.core.cache.dictionary.Dictionary
 import org.carbondata.core.carbon.CarbonDataLoadSchema
 import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.ColumnIdentifier
 import org.carbondata.core.carbon.metadata.datatype.DataType
 import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
@@ -53,6 +55,7 @@ import org.carbondata.spark.load.CarbonLoaderUtil
 import org.carbondata.spark.load.CarbonLoadModel
 import org.carbondata.spark.partition.reader.CSVWriter
 import org.carbondata.spark.rdd.{ArrayParser, CarbonBlockDistinctValuesCombineRDD, CarbonDataRDDFactory, CarbonGlobalDictionaryGenerateRDD, ColumnPartitioner, DataFormat, DictionaryLoadModel, GenericParser, PrimitiveParser, StructParser}
+import org.carbondata.spark.CarbonSparkFactory
 
 /**
  * A object which provide a method to generate global dictionary from CSV files.
@@ -137,9 +140,12 @@ object GlobalDictionaryUtil extends Logging {
   def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
       columnIndex: Int,
       iter: Iterator[String]): Unit = {
-    val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
-      model.hdfsLocation, model.table,
-      model.primDimensions(columnIndex).getColumnId)
+    val dictService = CarbonCommonFactory.getDictionaryService
+    val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
+      model.table,
+      model.columnIdentifier(columnIndex),
+      model.hdfsLocation
+    )
     try {
       while (iter.hasNext) {
         writer.write(iter.next)
@@ -158,12 +164,13 @@ object GlobalDictionaryUtil extends Logging {
       index: Int,
       dictionary: Dictionary): Unit = {
     val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+    val dictService = CarbonCommonFactory.getDictionaryService
     val dictionarySortInfo: CarbonDictionarySortInfo =
       preparator.getDictionarySortInfo(dictionary,
         model.primDimensions(index).getDataType)
     val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
-      new CarbonDictionarySortIndexWriterImpl(model.table,
-        model.primDimensions(index).getColumnId, model.hdfsLocation)
+      dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
+          model.hdfsLocation)
     try {
       carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
       carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
@@ -179,7 +186,7 @@ object GlobalDictionaryUtil extends Logging {
     val dictMap = new HashMap[String, Dictionary]
     model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m =>
       val dict = CarbonLoaderUtil.getDictionary(model.table,
-        m._1.getColumnId, model.hdfsLocation,
+        m._1.getColumnIdentifier, model.hdfsLocation,
         m._1.getDataType
       )
       dictMap.put(m._1.getColumnId, dict)
@@ -192,11 +199,13 @@ object GlobalDictionaryUtil extends Logging {
    */
   def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = {
     val dictMap = new HashMap[String, HashSet[String]]
+    val dictService = CarbonCommonFactory.getDictionaryService
     for (i <- model.primDimensions.indices) {
       val set = new HashSet[String]
       if (model.dictFileExists(i)) {
-        val reader: CarbonDictionaryReader = new CarbonDictionaryReaderImpl(
-          model.hdfsLocation, model.table, model.primDimensions(i).getColumnId)
+        val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table,
+          model.columnIdentifier(i), model.hdfsLocation
+        )
         val values = reader.read
         if (values != null) {
           for (j <- 0 until values.size) {
@@ -293,52 +302,13 @@ object GlobalDictionaryUtil extends Logging {
         isComplexes += dimensions(i).isComplex
       }
     }
-    val primDimensions = primDimensionsBuffer.map { x => x }.toArray
-    val dictFilePaths = new Array[String](primDimensions.length)
-    val dictFileExists = new Array[Boolean](primDimensions.length)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
-    val fileType = FileFactory.getFileType(dictfolderPath)
-    // Metadata folder
-    val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
-    // need list all dictionary file paths with exists flag
-    metadataDirectory.exists match {
-      case true =>
-        // if Metadata folder is exists, check whether each dictionary file is exists or not.
-        // 1 list all dictionary files in Metadata folder
-        val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
-          @Override def accept(pathname: CarbonFile): Boolean = {
-            CarbonTablePath.isDictionaryFile(pathname)
-          }
-        })
-        // 2 put dictionary file names to fileNamesMap
-        val fileNamesMap = new HashMap[String, Int]
-        for (i <- 0 until carbonFiles.length) {
-          fileNamesMap.put(carbonFiles(i).getName, i)
-        }
-        // 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not.
-        primDimensions.zipWithIndex.foreach { f =>
-          dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
-          dictFileExists(f._2) =
-            fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
-              case None => false
-              case Some(_) => true
-            }
-        }
-      case false =>
-        // if Metadata folder is not exists, all dictionary files are not exists also.
-        try {
-          // create Metadata folder
-          FileFactory.mkdirs(dictfolderPath, fileType)
-        } catch {
-          case ex: IOException =>
-            throw new IOException(s"Failed to created dictionary folder: ${dictfolderPath}")
-        }
-        primDimensions.zipWithIndex.foreach { f =>
-          dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
-          // all dictionary files are not exists
-          dictFileExists(f._2) = false
-        }
-    }
+    val primDimensions = primDimensionsBuffer.toArray
+    val dictDetail = CarbonSparkFactory.getDictionaryDetailService()
+      .getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+    val dictFilePaths = dictDetail.dictFilePaths
+    val dictFileExists = dictDetail.dictFileExists
+    val columnIdentifier = dictDetail.columnIdentifiers
 
     // load high cardinality identify configure
     val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
@@ -367,6 +337,7 @@ object GlobalDictionaryUtil extends Logging {
       highCardIdentifyEnable,
       highCardThreshold,
       rowCountPercentage,
+      columnIdentifier,
       carbonLoadModel.getLoadMetadataDetails.size() == 0)
   }
 
@@ -552,9 +523,11 @@ object GlobalDictionaryUtil extends Logging {
     val values = valuesBuffer.toArray
     java.util.Arrays.sort(values, Ordering[String])
     var distinctValueCount: Int = 0
-    val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
-      model.hdfsLocation, model.table,
-      model.primDimensions(columnIndex).getColumnId)
+    val dictService = CarbonCommonFactory.getDictionaryService
+    val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
+        model.table,
+        model.columnIdentifier(columnIndex),
+        model.hdfsLocation)
     try {
       if (!model.dictFileExists(columnIndex)) {
         writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)

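The hunks above share one refactor: dictionary writers, readers and sort-index writers are no longer constructed directly from their Impl classes with a raw column-id string, but obtained from a dictionary service keyed by table identifier, ColumnIdentifier and store path. A minimal reconstruction of that service surface, inferred only from the call sites shown in this file (package paths are assumed; the real interface lives in the core module and may expose more methods):

    import org.carbondata.core.carbon.CarbonTableIdentifier;
    import org.carbondata.core.carbon.ColumnIdentifier;
    import org.carbondata.core.reader.CarbonDictionaryReader;
    import org.carbondata.core.writer.CarbonDictionaryWriter;
    import org.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;

    // Reconstructed from the call sites above; naming and package layout
    // are assumptions, not a verbatim copy of the core-module interface.
    public interface DictionaryService {
      CarbonDictionaryWriter getDictionaryWriter(CarbonTableIdentifier table,
          ColumnIdentifier column, String storePath);

      CarbonDictionaryReader getDictionaryReader(CarbonTableIdentifier table,
          ColumnIdentifier column, String storePath);

      CarbonDictionarySortIndexWriter getDictionarySortIndexWriter(CarbonTableIdentifier table,
          ColumnIdentifier column, String storePath);
    }
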
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
index 88ab188..8399d28 100644
--- a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
@@ -1,4 +1,4 @@
-package org.carbondata.spark.load;
+package org.carbondata.integration.spark.load;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -7,7 +7,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-
+import org.carbondata.spark.load.CarbonLoaderUtil;
 import org.junit.Test;
 import org.pentaho.di.core.util.Assert;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
index a254f8d..ccc2820 100644
--- a/integration/spark/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
+++ b/integration/spark/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
@@ -1,4 +1,4 @@
-package org.carbondata.spark.testsuite.validation;
+package org.carbondata.integration.spark.testsuite.validation;
 
 import org.apache.spark.sql.common.util.CarbonHiveContext;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
@@ -15,10 +15,10 @@ import org.carbondata.format.BlockletInfo;
 import org.carbondata.format.DataChunk;
 import org.carbondata.format.Encoding;
 import org.carbondata.format.FileFooter;
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
+
 import static org.junit.Assert.assertTrue;
 
 public class FileFooterValidator {
@@ -44,7 +44,7 @@ public class FileFooterValidator {
                 + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')");
     String storePath =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
-    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("default", "validatefooter");
+    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("default", "validatefooter", "1");
     String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier)
         .getCarbonDataDirectoryPath("0", "0");
     CarbonFile carbonFile =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
index 969b01f..f825ce2 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
@@ -2,6 +2,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.execution.command.Field
+import org.carbondata.core.constants.CarbonCommonConstants
 
 /**
   * Stub class for calling the CarbonSqlParser
@@ -69,7 +70,7 @@ class TestCarbonSqlParser extends QueryTest {
   // Testing the column group Splitting method.
   test("Test-updateColumnGroupsInField") {
     val colGroupStr = "(col2,col3),(col5,col6),(col7,col8)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     val colgrps = stub.updateColumnGroupsInFieldTest(fields, tableProperties)
@@ -80,7 +81,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
   test("Test-ColumnGroupsInvalidField_Shouldnotallow") {
     val colGroupStr = "(col1,col2),(col10,col6),(col7,col8)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     try {
@@ -93,7 +94,7 @@ class TestCarbonSqlParser extends QueryTest {
   test("Test-MeasureInColumnGroup_ShouldNotAllow") {
     //col1 is measure
     val colGroupStr = "(col1,col2),(col5,col6),(col7,col8)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     try {
@@ -107,7 +108,7 @@ class TestCarbonSqlParser extends QueryTest {
     //col5 is no dictionary
     val colGroupStr = "(col2,col3),(col5,col6),(col7,col8)"
     val noDictStr = "col5"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr, "DICTIONARY_EXCLUDE" -> noDictStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr, CarbonCommonConstants.DICTIONARY_EXCLUDE -> noDictStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     try {
@@ -119,7 +120,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
   test("Test-SameColumnInDifferentGroup_ShouldNotAllow") {
     val colGroupStr = "(col2,col3),(col5,col6),(col6,col7,col8)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     try {
@@ -132,7 +133,7 @@ class TestCarbonSqlParser extends QueryTest {
   
    test("Test-ColumnAreNotTogetherAsInSchema_ShouldNotAllow") {
     val colGroupStr = "(col2,col3),(col5,col8)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     try {
@@ -144,7 +145,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
   test("Test-ColumnInColumnGroupAreShuffledButInSequence") {
     val colGroupStr = "(col2,col3),(col7,col8,col6)"
-    val tableProperties = Map("COLUMN_GROUPS" -> colGroupStr)
+    val tableProperties = Map(CarbonCommonConstants.COLUMN_GROUPS -> colGroupStr)
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     
@@ -164,7 +165,7 @@ class TestCarbonSqlParser extends QueryTest {
 
   // Testing the extracting of Dims and no Dictionary
   test("Test-extractDimColsAndNoDictionaryFields") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col2", "DICTIONARY_INCLUDE" -> "col4")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     var fields: Seq[Field] = loadAllFields
 
     val stub = new TestCarbonSqlParserStub()
@@ -184,7 +185,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields1") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -207,7 +208,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields2") {
-    val tableProperties = Map("DICTIONARY_INCLUDE" -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -229,7 +230,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields3") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col1", "DICTIONARY_INCLUDE" -> "col4")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -252,7 +253,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields4") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col3", "DICTIONARY_INCLUDE" -> "col2")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col3", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -275,7 +276,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields5") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col4", "DICTIONARY_INCLUDE" -> "col2")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col4", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -298,7 +299,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields6") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col2", "DICTIONARY_INCLUDE" -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -321,8 +322,8 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields7") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col2 ,col1  ",
-      "DICTIONARY_INCLUDE" -> "col3 ,col4 "
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2 ,col1  ",
+      CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3 ,col4 "
     )
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
@@ -347,7 +348,7 @@ class TestCarbonSqlParser extends QueryTest {
   }
 
   test("Test-DimAndMsrColsWithNoDictionaryFields8") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col2,col4", "DICTIONARY_INCLUDE" -> "col3")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2,col4", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var (dimCols, noDictionary) = stub
@@ -372,7 +373,7 @@ class TestCarbonSqlParser extends QueryTest {
 
   // Testing the extracting of measures
   test("Test-extractMsrColsFromFields") {
-    val tableProperties = Map("DICTIONARY_EXCLUDE" -> "col2", "DICTIONARY_INCLUDE" -> "col4")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     var fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
     var msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
index 9d10937..21a8a46 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
@@ -87,7 +87,7 @@ class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with Bef
       buildTableWithNoExistDictExclude()
     } catch {
       case e: MalformedCarbonCommandException =>
-        assert(e.getMessage.equals("DICTIONARY_EXCLUDE column: CCC does not exist in table. " +
+        assert(e.getMessage.equals("DICTIONARY_EXCLUDE column: ccc does not exist in table. " +
           "Please check create table statement."))
       case _ => assert(false)
     }
@@ -98,7 +98,7 @@ class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with Bef
       buildTableWithNoExistDictInclude()
     } catch {
       case e: MalformedCarbonCommandException =>
-        assert(e.getMessage.equals("DICTIONARY_INCLUDE column: AAA does not exist in table. " +
+        assert(e.getMessage.equals("DICTIONARY_INCLUDE column: aaa does not exist in table. " +
           "Please check create table statement."))
       case _ => assert(false)
     }

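The updated assertions imply that DICTIONARY_EXCLUDE/DICTIONARY_INCLUDE column names are lower-cased before being validated against the table schema, so the error message now echoes the normalized name. A sketch of a check of that shape (the helper is hypothetical; the real code throws MalformedCarbonCommandException, for which a standard exception stands in here):

    import java.util.Locale;
    import java.util.Set;

    final class DictionaryColumnCheck {
      // Hypothetical helper: normalize the configured column name, then verify
      // it exists in the schema, reporting the normalized (lower-case) name.
      static void validate(String property, String column, Set<String> tableColumns) {
        String normalized = column.trim().toLowerCase(Locale.ROOT);
        if (!tableColumns.contains(normalized)) {
          throw new IllegalArgumentException(property + " column: " + normalized
              + " does not exist in table. Please check create table statement.");
        }
      }
    }
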
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/ColumnPropertyValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/ColumnPropertyValidationTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/ColumnPropertyValidationTestCase.scala
new file mode 100644
index 0000000..b9affa7
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/ColumnPropertyValidationTestCase.scala
@@ -0,0 +1,28 @@
+package org.carbondata.spark.testsuite.detailquery
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.Row
+
+class ColumnPropertyValidationTestCase extends QueryTest with BeforeAndAfterAll {
+  test("Validate ColumnProperties_ valid key") {
+     try {
+       sql("create table employee(empname String,empid String,city String,country String,gender String,salary Double) stored by 'org.apache.carbondata.format' tblproperties('columnproperties.gender.key'='value')")
+       // a property key that names an existing column must be accepted
+       sql("drop table employee")
+     } catch {
+       case _: Exception => assert(false)
+     }
+  }
+  test("Validate ColumnProperties_ invalid key") {
+     val created = try {
+       sql("create table employee(empname String,empid String,city String,country String,gender String,salary Double) stored by 'org.apache.carbondata.format' tblproperties('columnproperties.invalid.key'='value')")
+       true
+     } catch {
+       case _: Exception => false
+     }
+     assert(!created, "create table should fail when the property key names an unknown column")
+  }
+
+}

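Taken together, the two tests say that a tblproperties key of the form columnproperties.<column>.<property> is accepted only when <column> is an actual column of the table. A sketch of that key validation under those assumptions (the class and method names below are illustrative, not the parser's real identifiers):

    import java.util.Locale;
    import java.util.Set;

    final class ColumnPropertyKeyCheck {
      private static final String PREFIX = "columnproperties.";

      // Accept 'columnproperties.<column>.<property>' only when <column>
      // names an existing table column; behavior inferred from the tests above.
      static boolean isValid(String key, Set<String> tableColumns) {
        String k = key.toLowerCase(Locale.ROOT);
        if (!k.startsWith(PREFIX)) {
          return false;
        }
        int dot = k.indexOf('.', PREFIX.length());
        if (dot < 0) {
          return false; // both a column segment and a property segment are required
        }
        return tableColumns.contains(k.substring(PREFIX.length(), dot));
      }
    }
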
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index dbe7796..ed50788 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
 
+import org.carbondata.core.carbon.ColumnIdentifier
 import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
 import org.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.carbondata.core.constants.CarbonCommonConstants
@@ -159,10 +160,10 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
     val dimension = table.getDimensionByName(table.getFactTableName, columnName)
     val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
 
-    val columnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier,
-      dimension.getColumnId, dimension.getDataType
+    val dictColumnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier,
+      dimension.getColumnIdentifier, dimension.getDataType
     )
-    val dict = CarbonLoaderUtil.getDictionary(columnIdentifier,
+    val dict = CarbonLoaderUtil.getDictionary(dictColumnIdentifier,
       CarbonHiveContext.hdfsCarbonBasePath
     )
     assert(dict.getSurrogateKey(value) != CarbonCommonConstants.INVALID_SURROGATE_KEY)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index d074473..435180b 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -538,6 +538,7 @@ public class GraphGenerator {
     seqMeta.setTaskNo(taskNo);
     seqMeta.setCarbondim(graphConfiguration.getDimensionString());
     seqMeta.setComplexTypeString(graphConfiguration.getComplexTypeString());
+    seqMeta.setColumnPropertiesString(graphConfiguration.getColumnPropertiesString());
     seqMeta.setBatchSize(Integer.parseInt(graphConfiguration.getBatchSize()));
     seqMeta.setNoDictionaryDims(graphConfiguration.getNoDictionaryDims());
     seqMeta.setDimensionColumnsDataType(graphConfiguration.getDimensionColumnsDataType());
@@ -787,6 +788,8 @@ public class GraphGenerator {
         .setDimensions(CarbonSchemaParser.getCubeDimensions(dimensions, carbonDataLoadSchema));
     graphConfiguration
         .setActualDims(CarbonSchemaParser.getCubeDimensions(dimensions, carbonDataLoadSchema));
+    graphConfiguration
+        .setColumnPropertiesString(CarbonSchemaParser.getColumnPropertiesString(dimensions));
     graphConfiguration.setComplexTypeString(CarbonSchemaParser.getComplexTypeString(dimensions));
     prepareNoDictionaryMapping(dimensions, graphConfiguration);
     graphConfiguration

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
index 46960dc..8ab37a6 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
@@ -190,6 +190,8 @@ public class GraphConfigurationInfo {
 
   private Boolean[] isNoDictionaryDimMapping;
 
+  private String columnPropertiesString;
+
   /**
    * wrapper object holding the columnschemadetails
    */
@@ -999,6 +1001,14 @@ public class GraphConfigurationInfo {
     this.isNoDictionaryDimMapping = isNoDictionaryDimMapping;
   }
 
+  public void setColumnPropertiesString(String columnPropertiesString) {
+    this.columnPropertiesString = columnPropertiesString;
+  }
+
+  public String getColumnPropertiesString() {
+    return this.columnPropertiesString;
+  }
+
   /**
    * @return columngroups
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/processing/src/main/java/org/carbondata/processing/schema/metadata/ColumnsInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/schema/metadata/ColumnsInfo.java b/processing/src/main/java/org/carbondata/processing/schema/metadata/ColumnsInfo.java
index 5774453..9613a86 100644
--- a/processing/src/main/java/org/carbondata/processing/schema/metadata/ColumnsInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/schema/metadata/ColumnsInfo.java
@@ -154,6 +154,8 @@ public class ColumnsInfo {
    */
   private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
 
+  private Map<String, Map<String, String>> columnProperties;
+
   public Map<String, GenericDataType> getComplexTypesMap() {
     return complexTypesMap;
   }
@@ -510,4 +512,12 @@ public class ColumnsInfo {
   public void setColumnSchemaDetailsWrapper(ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper) {
     this.columnSchemaDetailsWrapper = columnSchemaDetailsWrapper;
   }
+
+  public void setColumnProperties(Map<String, Map<String, String>> columnProperties) {
+    this.columnProperties = columnProperties;
+  }
+
+  public Map<String, String> getColumnProperties(String columnName) {
+    return this.columnProperties.get(columnName);
+  }
 }

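With this wrapper in place, a processing step can look up the properties of a single column by name; the returned map is null for columns that declared no properties. A usage sketch (the helper is illustrative; column and property names mirror the validation test earlier in this patch):

    import java.util.Map;
    import org.carbondata.processing.schema.metadata.ColumnsInfo;

    final class ColumnPropertyLookup {
      // Returns one property value, or null when the column has no properties.
      // Assumes setColumnProperties(...) was called first, as the step setup
      // below does; getColumnProperties would NPE on an unpopulated ColumnsInfo.
      static String lookup(ColumnsInfo info, String columnName, String key) {
        Map<String, String> props = info.getColumnProperties(columnName);
        return props == null ? null : props.get(key);
      }
    }
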
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
index 4625b4a..77f7297 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
@@ -85,6 +85,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
   protected Map<String, GenericDataType> complexTypes =
       new HashMap<String, GenericDataType>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
+  protected Map<String, Map<String, String>> columnProperties =
+      new HashMap<String, Map<String, String>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   /**
    * dimLens
    */
@@ -265,6 +267,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
   private String complexDelimiterLevel2;
   private String complexTypeString;
 
+  private String columnPropertiesString;
+
   private String[] complexTypeColumns;
   /**
    * Primary Key String
@@ -412,6 +416,10 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
     this.complexTypeString = complexTypeString;
   }
 
+  public void setColumnPropertiesString(String columnPropertiesString) {
+    this.columnPropertiesString = columnPropertiesString;
+  }
+
   public String[] getComplexTypeColumns() {
     return complexTypeColumns;
   }
@@ -617,6 +625,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
     carbonhierColumn = "";
     foreignKeyHierarchyString = "";
     complexTypeString = "";
+    columnPropertiesString = "";
     complexDelimiterLevel1 = "";
     complexDelimiterLevel2 = "";
     primaryKeysString = "";
@@ -663,6 +672,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
         .append(XMLHandler.addTagValue("foreignKeyHierarchyString", foreignKeyHierarchyString));
     retval.append("    ").append(XMLHandler.addTagValue("complexTypeString", complexTypeString));
     retval.append("    ")
+        .append(XMLHandler.addTagValue("columnPropertiesString", columnPropertiesString));
+    retval.append("    ")
         .append(XMLHandler.addTagValue("complexDelimiterLevel1", complexDelimiterLevel1));
     retval.append("    ")
         .append(XMLHandler.addTagValue("complexDelimiterLevel2", complexDelimiterLevel2));
@@ -719,6 +730,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
       carbonhierColumn = XMLHandler.getTagValue(stepnode, "carbonhierColumn");
       foreignKeyHierarchyString = XMLHandler.getTagValue(stepnode, "foreignKeyHierarchyString");
       complexTypeString = XMLHandler.getTagValue(stepnode, "complexTypeString");
+      columnPropertiesString = XMLHandler.getTagValue(stepnode, "columnPropertiesString");
       complexDelimiterLevel1 = XMLHandler.getTagValue(stepnode, "complexDelimiterLevel1");
       complexDelimiterLevel2 = XMLHandler.getTagValue(stepnode, "complexDelimiterLevel2");
       primaryKeysString = XMLHandler.getTagValue(stepnode, "primaryKeysString");
@@ -782,6 +794,9 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
       complexTypeColumns = new String[0];
     }
 
+    if (null != columnPropertiesString) {
+      updateColumnPropertiesMap(columnPropertiesString);
+    }
     hirches = getHierarichies(carbonhier);
 
     hierColumnMap = getHierarchiesColumnMap(carbonhierColumn);
@@ -824,6 +839,23 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
     updateDenormColunList(denormColumNames);
   }
 
+  private void updateColumnPropertiesMap(String columnPropertiesString) {
+    String[] colsProperty = columnPropertiesString.split(CarbonCommonConstants.HASH_SPC_CHARACTER);
+    for (String property : colsProperty) {
+      String[] colKeyVals = property.split(CarbonCommonConstants.COLON_SPC_CHARACTER);
+      String colName = colKeyVals[0];
+      Map<String, String> colPropMap = new HashMap<>();
+      String[] keyVals = colKeyVals[1].split(CarbonCommonConstants.COMA_SPC_CHARACTER);
+      for (int i = 0; i < keyVals.length; i++) {
+        String[] keyVal = keyVals[i].split(CarbonCommonConstants.HYPHEN_SPC_CHARACTER);
+        String key = keyVal[0];
+        String value = keyVal[1];
+        colPropMap.put(key, value);
+      }
+      columnProperties.put(colName, colPropMap);
+    }
+  }
+
   private void updateDenormColunList(String denormColumNames) {
     //
     if (null == denormColumNames || "".equals(denormColumNames)) {
@@ -1634,6 +1666,10 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
     return taskNo;
   }
 
+  public Map<String, Map<String, String>> getColumnPropertiesMap() {
+    return columnProperties;
+  }
+
   /**
    * returns wrapper object having the columnSchemaDetails
    *

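updateColumnPropertiesMap fixes the wire format of columnPropertiesString: columns are joined by HASH_SPC_CHARACTER, a column name is separated from its key/value pairs by COLON_SPC_CHARACTER, pairs are joined by COMA_SPC_CHARACTER, and a key is separated from its value by HYPHEN_SPC_CHARACTER. A sketch of the matching serializer; the concrete separator literals live in CarbonCommonConstants and are not shown in this patch, so they are passed in as parameters rather than hard-coded:

    import java.util.Map;

    final class ColumnPropertiesSerializer {
      // Inverse of updateColumnPropertiesMap above: nests HYPHEN inside COMA
      // inside COLON inside HASH. Separator values are supplied by the caller
      // because the CarbonCommonConstants literals are not part of this patch.
      static String serialize(Map<String, Map<String, String>> props,
          String hashSpc, String colonSpc, String comaSpc, String hyphenSpc) {
        StringBuilder sb = new StringBuilder();
        boolean firstColumn = true;
        for (Map.Entry<String, Map<String, String>> col : props.entrySet()) {
          if (!firstColumn) {
            sb.append(hashSpc);
          }
          firstColumn = false;
          sb.append(col.getKey()).append(colonSpc);
          boolean firstPair = true;
          for (Map.Entry<String, String> kv : col.getValue().entrySet()) {
            if (!firstPair) {
              sb.append(comaSpc);
            }
            firstPair = false;
            sb.append(kv.getKey()).append(hyphenSpc).append(kv.getValue());
          }
        }
        return sb.toString();
      }
    }
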
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/38d84e0e/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index d5c8e61..f4e333e 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -367,6 +367,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           columnsInfo.setComplexTypesMap(meta.getComplexTypes());
           columnsInfo.setDimensionColumnIds(meta.getDimensionColumnIds());
           columnsInfo.setColumnSchemaDetailsWrapper(meta.getColumnSchemaDetailsWrapper());
+          columnsInfo.setColumnProperties(meta.getColumnPropertiesMap());
           updateBagLogFileName();
           String key = meta.getSchemaName() + '/' + meta.getCubeName() + '_' + meta.getTableName();
           badRecordslogger = new BadRecordslogger(key, csvFilepath, getBadLogStoreLocation(

