carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [30/54] [abbrv] carbondata git commit: [CARBONDATA-2172][Lucene] Add text_columns property for Lucene DataMap
Date Thu, 08 Mar 2018 16:55:31 GMT
[CARBONDATA-2172][Lucene] Add text_columns property for Lucene DataMap

Add a TEXT_COLUMNS property in DMPROPERTIES for the Lucene DataMap so that only
the specified columns are indexed, instead of all visible table columns. The
property is validated when the DataMap is created: it must be present and
non-blank, must not contain empty or duplicate entries, and every listed column
must exist in the table and be of String data type. The patch also renames
DataMapSchema.className to providerName and adds DataMapSchema.isIndexDataMap()
to distinguish index DataMaps from pre-aggregate and timeseries DataMaps.
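
For reference, the DMPROPERTIES syntax this change enables looks like the
following (adapted from the new LuceneFineGrainDataMapSuite test in this patch;
the table and column names are illustrative):

    CREATE DATAMAP dm ON TABLE datamap_test
    USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
    DMPROPERTIES('TEXT_COLUMNS'='name,city')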

This closes #2019


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d23f7fad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d23f7fad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d23f7fad

Branch: refs/heads/master
Commit: d23f7fad1f7db029d1dd0cc8e3db7a5b79463179
Parents: f9291cd
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Mar 1 15:40:01 2018 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Mar 8 22:21:11 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       | 16 ++--
 .../core/datamap/dev/DataMapFactory.java        |  4 +-
 .../blockletindex/BlockletDataMapFactory.java   |  2 +-
 .../ThriftWrapperSchemaConverterImpl.java       |  2 +-
 .../schema/datamap/DataMapProvider.java         |  4 +
 .../schema/table/AggregationDataMapSchema.java  |  2 +-
 .../core/metadata/schema/table/CarbonTable.java | 16 +++-
 .../metadata/schema/table/DataMapSchema.java    | 44 ++++++++--
 .../core/metadata/schema/table/TableInfo.java   |  2 +-
 .../core/metadata/schema/table/TableSchema.java | 16 ++--
 datamap/lucene/pom.xml                          |  5 ++
 .../lucene/LuceneDataMapFactoryBase.java        | 89 ++++++++++++++------
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 58 ++++++++++++-
 .../preaggregate/TestPreAggCreateCommand.scala  | 75 ++++-------------
 .../preaggregate/TestPreAggregateLoad.scala     |  7 +-
 .../timeseries/TestTimeseriesDataLoad.scala     |  6 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  4 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  1 +
 .../testsuite/datamap/FGDataMapTestCase.scala   |  4 +-
 .../iud/InsertOverwriteConcurrentTest.scala     |  0
 .../TestInsertAndOtherCommandConcurrent.scala   | 38 ++++-----
 .../carbondata/spark/load/ValidateUtil.scala    |  0
 .../carbondata/spark/util/DataLoadingUtil.scala |  0
 .../carbondata/datamap/DataMapManager.java      |  4 +-
 .../datamap/IndexDataMapProvider.java           |  6 +-
 .../datamap/PreAggregateDataMapProvider.java    |  2 +-
 .../datamap/TimeseriesDataMapProvider.java      |  4 +-
 .../datamap/CarbonCreateDataMapCommand.scala    | 21 +++--
 .../datamap/CarbonDataMapShowCommand.scala      |  2 +-
 .../preaaggregate/PreAggregateListeners.scala   |  9 +-
 .../command/timeseries/TimeSeriesUtil.scala     | 16 ++--
 .../loading/model/CarbonLoadModelBuilder.java   | 16 ++--
 .../processing/loading/model/LoadOption.java    | 23 ++++-
 33 files changed, 311 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index ab339e8..a8d467f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
@@ -89,8 +88,7 @@ public final class DataMapStoreManager {
     List<TableDataMap> dataMaps = new ArrayList<>();
     if (dataMapSchemaList != null) {
       for (DataMapSchema dataMapSchema : dataMapSchemaList) {
-        if (!dataMapSchema.getClassName().equalsIgnoreCase(
-            DataMapProvider.PREAGGREGATE.toString())) {
+        if (dataMapSchema.isIndexDataMap()) {
           dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
         }
       }
@@ -144,26 +142,28 @@ public final class DataMapStoreManager {
    * Return a new datamap instance and register it in the store manager.
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
-  private TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+  // TODO: make it private
+  public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
     DataMapFactory dataMapFactory;
     try {
       // try to create datamap by reflection to test whether it is a valid DataMapFactory class
       Class<? extends DataMapFactory> factoryClass =
-          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getClassName());
+          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
       dataMapFactory = factoryClass.newInstance();
     } catch (ClassNotFoundException e) {
       throw new MalformedDataMapCommandException(
-          "DataMap '" + dataMapSchema.getClassName() + "' not found");
+          "DataMap '" + dataMapSchema.getProviderName() + "' not found");
     } catch (Throwable e) {
       throw new MetadataProcessException(
-          "failed to create DataMap '" + dataMapSchema.getClassName() + "'", e);
+          "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e);
     }
     return registerDataMap(identifier, dataMapSchema, dataMapFactory);
   }
 
   public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier,
-      DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory) throws IOException {
+      DataMapSchema dataMapSchema, DataMapFactory dataMapFactory)
+      throws IOException, MalformedDataMapCommandException {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f2c376a..48038b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap.dev;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
@@ -35,7 +36,8 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) throws IOException;
+  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+      throws IOException, MalformedDataMapCommandException;
 
   /**
    * Return a new writer for this datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index a383288..9674958 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -156,7 +156,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       Blocklet blocklet) throws IOException {
     String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
+      if (identifier.getIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index a15804e..8d19431 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -346,7 +346,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         thriftChildSchema.setChildTableIdentifier(relationIdentifier);
       }
       thriftChildSchema.setProperties(wrapperChildSchema.getProperties());
-      thriftChildSchema.setClassName(wrapperChildSchema.getClassName());
+      thriftChildSchema.setClassName(wrapperChildSchema.getProviderName());
       thriftChildSchema.setDataMapName(wrapperChildSchema.getDataMapName());
       if (wrapperChildSchema.getChildSchema() != null) {
         thriftChildSchema.setChildTableSchema(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
index 0052428..39304d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
@@ -49,6 +49,10 @@ public enum DataMapProvider {
     return className;
   }
 
+  public String getShortName() {
+    return shortName;
+  }
+
   private boolean isEqual(String dataMapClass) {
     return (dataMapClass != null &&
         (dataMapClass.equals(className) ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 2a16e1f..a43926c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -64,7 +64,7 @@ public class AggregationDataMapSchema extends DataMapSchema {
 
   private Set aggExpToColumnMapping;
 
-  public AggregationDataMapSchema(String dataMapName, String className) {
+  AggregationDataMapSchema(String dataMapName, String className) {
     super(dataMapName, className);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f81f1dd..f14672f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -151,7 +151,7 @@ public class CarbonTable implements Serializable {
     List<DataMapSchema> dataMapSchemas = new ArrayList<>();
     for (DataMapSchema dataMapSchema : tableInfo.getDataMapSchemaList()) {
       DataMapSchema newDataMapSchema = DataMapSchemaFactory.INSTANCE
-          .getDataMapSchema(dataMapSchema.getDataMapName(), dataMapSchema.getClassName());
+          .getDataMapSchema(dataMapSchema.getDataMapName(), dataMapSchema.getProviderName());
       newDataMapSchema.setChildSchema(dataMapSchema.getChildSchema());
       newDataMapSchema.setProperties(dataMapSchema.getProperties());
       newDataMapSchema.setRelationIdentifier(dataMapSchema.getRelationIdentifier());
@@ -641,6 +641,10 @@ public class CarbonTable implements Serializable {
     return null != partitionInfo && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE;
   }
 
+  public PartitionInfo getPartitionInfo() {
+    return tablePartitionMap.get(getTableName());
+  }
+
   /**
    * @return absolute table identifier
    */
@@ -798,6 +802,16 @@ public class CarbonTable implements Serializable {
     return hasDataMapSchema;
   }
 
+  public DataMapSchema getDataMapSchema(String dataMapName) {
+    List<DataMapSchema> dataMaps = tableInfo.getDataMapSchemaList();
+    for (DataMapSchema dataMap : dataMaps) {
+      if (dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) {
+        return dataMap;
+      }
+    }
+    return null;
+  }
+
   public boolean isChildDataMap() {
     return null != tableInfo.getParentRelationIdentifiers() &&
         !tableInfo.getParentRelationIdentifiers().isEmpty();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index ae49467..877fab7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider;
+
 /**
  * Child schema class to maintain the child table details inside parent table
  */
@@ -32,10 +34,18 @@ public class DataMapSchema implements Serializable, Writable {
 
   protected String dataMapName;
 
-  // this name can be class name of the DataMapProvider implementation or short name of it
-  private String className;
+  /**
+   * There are two kinds of DataMaps:
+   * 1. Index DataMap: the provider name is the class name of the DataMapFactory implementation
+   * 2. OLAP DataMap: the provider name is one of the {@link DataMapProvider#shortName} values
+   */
+  private String providerName;
+
+  /**
+   * identifier of the parent table
+   */
+  private RelationIdentifier relationIdentifier;
 
-  protected RelationIdentifier relationIdentifier;
   /**
    * child table schema
    */
@@ -46,16 +56,19 @@ public class DataMapSchema implements Serializable, Writable {
    */
   protected Map<String, String> properties;
 
+  /**
+   * WARN: This constructor should only be used for deserialization
+   */
   public DataMapSchema() {
   }
 
-  public DataMapSchema(String dataMapName, String className) {
+  public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
-    this.className = className;
+    this.providerName = providerName;
   }
 
-  public String getClassName() {
-    return className;
+  public String getProviderName() {
+    return providerName;
   }
 
   public TableSchema getChildSchema() {
@@ -86,9 +99,22 @@ public class DataMapSchema implements Serializable, Writable {
     this.properties = properties;
   }
 
+  /**
+   * Return true if this datamap is an Index DataMap
+   * (i.e. the provider is not the pre-aggregate or timeseries short name)
+   */
+  public boolean isIndexDataMap() {
+    if (providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.getShortName()) ||
+        providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.getShortName())) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   @Override public void write(DataOutput out) throws IOException {
     out.writeUTF(dataMapName);
-    out.writeUTF(className);
+    out.writeUTF(providerName);
     boolean isRelationIdentifierExists = null != relationIdentifier;
     out.writeBoolean(isRelationIdentifierExists);
     if (isRelationIdentifierExists) {
@@ -112,7 +138,7 @@ public class DataMapSchema implements Serializable, Writable {
 
   @Override public void readFields(DataInput in) throws IOException {
     this.dataMapName = in.readUTF();
-    this.className = in.readUTF();
+    this.providerName = in.readUTF();
     boolean isRelationIdentifierExists = in.readBoolean();
     if (isRelationIdentifierExists) {
       this.relationIdentifier = new RelationIdentifier(null, null, null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 0d796c7..47e09d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -275,7 +275,7 @@ public class TableInfo implements Serializable, Writable {
         DataMapSchema childSchema = new DataMapSchema();
         childSchema.readFields(in);
         DataMapSchema dataMapSchema = DataMapSchemaFactory.INSTANCE
-            .getDataMapSchema(childSchema.getDataMapName(), childSchema.getClassName());
+            .getDataMapSchema(childSchema.getDataMapName(), childSchema.getProviderName());
         dataMapSchema.setChildSchema(childSchema.getChildSchema());
         dataMapSchema.setRelationIdentifier(childSchema.getRelationIdentifier());
         dataMapSchema.setProperties(childSchema.getProperties());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 5d79abc..f008821 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -278,13 +278,15 @@ public class TableSchema implements Serializable, Writable {
     RelationIdentifier relationIdentifier =
         new RelationIdentifier(databaseName, tableName, tableId);
     Map<String, String> properties = new HashMap<>();
-    properties.put("CHILD_SELECT QUERY",
-        CarbonUtil.encodeToString(
-            queryString.trim().getBytes(
-                // replace = to with & as hive metastore does not allow = inside. For base 64
-                // only = is allowed as special character , so replace with &
-                CarbonCommonConstants.DEFAULT_CHARSET)).replace("=","&"));
-    properties.put("QUERYTYPE", queryType);
+    if (queryString != null) {
+      properties.put(
+          "CHILD_SELECT QUERY",
+          CarbonUtil.encodeToString(queryString.trim().getBytes(
+              // replace '=' with '&' as hive metastore does not allow '=' inside values; in
+              // base64 output '=' is the only special character, so the replacement is unambiguous
+              CarbonCommonConstants.DEFAULT_CHARSET)).replace("=", "&"));
+      properties.put("QUERYTYPE", queryType);
+    }
     DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
     dataMapSchema.setProperties(properties);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/datamap/lucene/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/lucene/pom.xml b/datamap/lucene/pom.xml
index ee504c6..4019065 100644
--- a/datamap/lucene/pom.xml
+++ b/datamap/lucene/pom.xml
@@ -26,6 +26,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.lucene</groupId>
       <artifactId>lucene-core</artifactId>
       <version>${lucene.version}</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 88e70d5..ab4e9ee 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
@@ -33,14 +34,15 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
@@ -50,6 +52,8 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 @InterfaceAudience.Internal
 abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> {
 
+  static final String TEXT_COLUMNS = "text_columns";
+
   /**
    * Logger
    */
@@ -77,7 +81,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
-      throws IOException {
+      throws IOException, MalformedDataMapCommandException {
     Objects.requireNonNull(identifier);
     Objects.requireNonNull(dataMapSchema);
 
@@ -98,34 +102,17 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
       throw new IOException(errorMessage);
     }
 
-    TableInfo tableInfo = carbonTable.getTableInfo();
-    List<ColumnSchema> lstCoumnSchemas = tableInfo.getFactTable().getListOfColumns();
-
-    // currently add all columns into lucene indexer
-    // TODO:only add index columns
-    List<String> indexedColumns = new ArrayList<String>();
-    for (ColumnSchema columnSchema : lstCoumnSchemas) {
-      if (!columnSchema.isInvisible()) {
-        indexedColumns.add(columnSchema.getColumnName());
-      }
-    }
-
-    // get indexed columns
-    //    Map<String, String> properties = dataMapSchema.getProperties();
-    //    String columns = properties.get("text_column");
-    //    if (columns != null) {
-    //      String[] columnArray = columns.split(CarbonCommonConstants.COMMA, -1);
-    //      Collections.addAll(indexedColumns, columnArray);
-    //    }
+    // validate DataMapSchema and get index columns
+    List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
 
     // add optimizedOperations
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
-    //    optimizedOperations.add(ExpressionType.EQUALS);
-    //    optimizedOperations.add(ExpressionType.GREATERTHAN);
-    //    optimizedOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
-    //    optimizedOperations.add(ExpressionType.LESSTHAN);
-    //    optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
-    //    optimizedOperations.add(ExpressionType.NOT);
+    // optimizedOperations.add(ExpressionType.EQUALS);
+    // optimizedOperations.add(ExpressionType.GREATERTHAN);
+    // optimizedOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+    // optimizedOperations.add(ExpressionType.LESSTHAN);
+    // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+    // optimizedOperations.add(ExpressionType.NOT);
     optimizedOperations.add(ExpressionType.TEXT_MATCH);
     this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
 
@@ -135,6 +122,52 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   }
 
   /**
+   * Validate the Lucene DataMap:
+   * 1. the TEXT_COLUMNS property is required
+   * 2. TEXT_COLUMNS cannot contain illegal arguments (empty or blank column names)
+   * 3. TEXT_COLUMNS cannot contain duplicate columns
+   * 4. every column in TEXT_COLUMNS must exist in the table
+   * 5. TEXT_COLUMNS supports only String DataType columns
+   */
+  private List<String> validateAndGetIndexedColumns(DataMapSchema dataMapSchema,
+      CarbonTable carbonTable) throws MalformedDataMapCommandException {
+    String textColumnsStr = dataMapSchema.getProperties().get(TEXT_COLUMNS);
+    if (StringUtils.isBlank(textColumnsStr)) {
+      throw new MalformedDataMapCommandException(
+          "Lucene DataMap requires proper TEXT_COLUMNS property.");
+    }
+    String[] textColumns = textColumnsStr.split(",", -1);
+    for (int i = 0; i < textColumns.length; i++) {
+      textColumns[i] = textColumns[i].trim().toLowerCase();
+    }
+    for (int i = 0; i < textColumns.length; i++) {
+      if (textColumns[i].isEmpty()) {
+        throw new MalformedDataMapCommandException("TEXT_COLUMNS contains an illegal argument.");
+      }
+      for (int j = i + 1; j < textColumns.length; j++) {
+        if (textColumns[i].equals(textColumns[j])) {
+          throw new MalformedDataMapCommandException(
+              "TEXT_COLUMNS has duplicate columns :" + textColumns[i]);
+        }
+      }
+    }
+    List<String> textColumnList = new ArrayList<String>(textColumns.length);
+    for (int i = 0; i < textColumns.length; i++) {
+      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
+      if (null == column) {
+        throw new MalformedDataMapCommandException("TEXT_COLUMNS: " + textColumns[i]
+            + " does not exist in the table. Please check the CREATE DATAMAP statement.");
+      } else if (column.getDataType() != DataTypes.STRING) {
+        throw new MalformedDataMapCommandException(
+            "TEXT_COLUMNS only supports String column. " + "Unsupported column: " + textColumns[i]
+                + ", DataType: " + column.getDataType());
+      }
+      textColumnList.add(column.getColName());
+    }
+    return textColumnList;
+  }
+
+  /**
    * Return a new writer for this datamap
    */
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 5e28e8a..bfcfa67 100644
--- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 
 class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
 
@@ -42,9 +42,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
 
-  test("test lucene fine grain data map") {
     sql("DROP TABLE IF EXISTS datamap_test")
     sql(
       """
@@ -52,11 +50,65 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
+  }
+
+  test("validate TEXT_COLUMNS DataMap property") {
+    // require TEXT_COLUMNS
+    var exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+      """.stripMargin))
+
+    assertResult("Lucene DataMap require proper TEXT_COLUMNS property.")(exception.getMessage)
+
+    // illegal argument.
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('text_COLUMNS'='name, ')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS contains illegal argument.")(exception.getMessage)
+
+    // not exists
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('text_COLUMNS'='city,school')
+    """.stripMargin))
+
+    assertResult("TEXT_COLUMNS: school does not exist in table. Please check create DataMap statement.")(exception.getMessage)
+
+    // duplicate columns
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('text_COLUMNS'='name,city,name')
+      """.stripMargin))
 
+    assertResult("TEXT_COLUMNS has duplicate columns :name")(exception.getMessage)
+
+    // only support String DataType
+    exception = intercept[MalformedDataMapCommandException](sql(
+    s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('text_COLUMNS'='city,id')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS only supports String column. Unsupported column: id, DataType: INT")(exception.getMessage)
+  }
+
+  test("test lucene fine grain data map") {
     sql(
       s"""
          | CREATE DATAMAP dm ON TABLE datamap_test
          | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
       """.stripMargin)
 
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index f208c92..50b8bec 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
@@ -248,9 +248,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
       s"""create datamap preagg_sum on table tbl_1 using 'preaggregate' as select mac,avg(age) from tbl_1 group by mac"""
         .stripMargin)
     sql(
-      "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" +
-      ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," +
-      "mac from tbl_1 group by prodate,mac")
+      "create datamap agg2 on table tbl_1 using 'preaggregate' as select prodate, mac from tbl_1 group by prodate,mac")
     checkExistence(sql("show tables"), false, "tbl_1_preagg_sum","tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT)
   }
@@ -303,8 +301,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains(
-      s"Unknown datamap provider/class abc"))
+    assert(e.getMessage.contains(s"DataMap 'abc' not found"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
@@ -324,66 +321,26 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS tbl_1")
     sql("create table if not exists  tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
     sql(
-      "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" +
-      ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," +
-      "mac from tbl_1 group by prodate,mac")
+      "create datamap agg2 on table tbl_1 using 'preaggregate' as select prodate, mac from tbl_1 group by prodate,mac")
     checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
   }
 
   test("test pre agg create table 21: should support 'if not exists'") {
-    try {
-      sql(
-        """
-          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
-          | USING 'preaggregate'
-          | AS SELECT
-          |   column3,
-          |   sum(column3),
-          |   column5,
-          |   sum(column5)
-          | FROM maintable
-          | GROUP BY column3,column5,column2
-        """.stripMargin)
-
-      sql(
-        """
-          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
-          | USING 'preaggregate'
-          | AS SELECT
-          |   column3,
-          |   sum(column3),
-          |   column5,
-          |   sum(column5)
-          | FROM maintable
-          | GROUP BY column3,column5,column2
-        """.stripMargin)
-      assert(true)
-    } catch {
-      case _: Exception =>
-        assert(false)
-    }
+    sql(
+      """
+        | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+        | USING 'preaggregate'
+        | AS SELECT
+        |   column3,
+        |   sum(column3),
+        |   column5,
+        |   sum(column5)
+        | FROM maintable
+        | GROUP BY column3,column5,column2
+      """.stripMargin)
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
-  test("test pre agg create table 22: don't support 'create datamap if exists'") {
-    val e: Exception = intercept[AnalysisException] {
-      sql(
-        """
-          | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable
-          | USING 'preaggregate'
-          | AS SELECT
-          |   column3,
-          |   sum(column3),
-          |   column5,
-          |   sum(column5)
-          | FROM maintable
-          | GROUP BY column3,column5,column2
-        """.stripMargin)
-      assert(true)
-    }
-    assert(e.getMessage.contains("identifier matching regex"))
-    sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
-  }
 
   test("test show tables filterted with datamaps") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index da1ffb5..9ab6db8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -21,10 +21,13 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.{BeforeAndAfterAll, Ignore}
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+
 class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
 
   val testData = s"$resourcesPath/sample.csv"
@@ -389,7 +392,7 @@ test("check load and select for avg double datatype") {
          | group by id
        """.stripMargin)
 
-    val e: Exception = intercept[TableAlreadyExistsException] {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
            | create datamap preagg_sum
@@ -399,7 +402,7 @@ test("check load and select for avg double datatype") {
            | group by id
        """.stripMargin)
     }
-    assert(e.getMessage.contains("already exists in database"))
+    assert(e.getMessage.contains("DataMap name 'preagg_sum' already exist"))
     checkAnswer(sql(s"select * from maintable_preagg_sum"),
       Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
     sql("drop table if exists maintable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index b43b93b..c5f07a0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
@@ -270,7 +271,7 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
         Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
         Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
         Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
-    val e: Exception = intercept[TableAlreadyExistsException] {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
            | CREATE DATAMAP agg0_second ON TABLE mainTable
@@ -282,7 +283,8 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
            | GROUP BY mytime
         """.stripMargin)
     }
-    assert(e.getMessage.contains("already exists in database"))
+    assert(e.getMessage.contains("DataMap name 'agg0_second' already exist"))
+    sql("DROP DATAMAP agg0_second ON TABLE mainTable")
   }
 
   test("create datamap with 'if not exists' after load data into mainTable and create datamap") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 3cba3ff..b19c80d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -350,7 +350,9 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
     // register datamap writer
-    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"create datamap cgdatamap on table datamap_test_cg " +
+        s"using '${classOf[CGDataMapFactory].getName}' " +
+        s"DMPROPERTIES('indexcolumns'='name')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index d5cae15..73e9fd9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, Coa
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index fae87a5..3bfc7b9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -175,7 +175,7 @@ class FGDataMap extends FineGrainDataMap {
   override def prune(
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
-      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
+      partitions: java.util.List[PartitionSpec]): java.util.List[FineGrainBlocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
@@ -189,7 +189,7 @@ class FGDataMap extends FineGrainDataMap {
   }
 
   private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
-      value: Array[Byte]): Option[Blocklet] = {
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
     val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
     val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(outputStream)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 3f0ca42..1f4b7fe 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -28,18 +28,16 @@ import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
+import org.apache.carbondata.core.datamap.dev.DataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
 // This test suite tests insert and insert overwrite running concurrently with other commands
 class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -50,11 +48,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
     dropTable()
     buildTestData()
 
-    // register hook to the table to sleep, thus the other command will be executed
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
-      classOf[WaitingDataMap].getName,
-      "test")
+    sql(
+      s"""
+         | create datamap test on table orders
+         | using '${classOf[WaitingDataMap].getName}'
+         | as select count(a) from hiveMetaStoreTable_1
+       """.stripMargin)
   }
 
   private def buildTestData(): Unit = {
@@ -267,26 +266,22 @@ object Global {
   var overwriteRunning = false
 }
 
-class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
+class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   private var identifier: AbsoluteTableIdentifier = _
 
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = {
-    this.identifier = identifier
-  }
-
   override def fireEvent(event: Event): Unit = ???
 
   override def clear(segmentId: Segment): Unit = {}
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
 
-  override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = {
-    new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) {
+  override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
+    new DataMapWriter(identifier, segment, writeDirectoryPath) {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
 
       override def onBlockletEnd(blockletId: Int): Unit = { }
@@ -312,4 +307,9 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
   override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, Seq(ExpressionType.EQUALS).asJava)
 
   override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
index 2b3a306..b23d676 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -40,9 +40,9 @@ public class DataMapManager {
    */
   public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) {
     DataMapProvider provider;
-    if (dataMapSchema.getClassName().equalsIgnoreCase(PREAGGREGATE.toString())) {
+    if (dataMapSchema.getProviderName().equalsIgnoreCase(PREAGGREGATE.toString())) {
       provider = new PreAggregateDataMapProvider();
-    } else if (dataMapSchema.getClassName().equalsIgnoreCase(TIMESERIES.toString())) {
+    } else if (dataMapSchema.getProviderName().equalsIgnoreCase(TIMESERIES.toString())) {
       provider = new TimeseriesDataMapProvider();
     } else {
       provider = new IndexDataMapProvider();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index c7651bb..1f075de 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -81,14 +81,14 @@ public class IndexDataMapProvider implements DataMapProvider {
     try {
       // try to create DataMapProvider instance by taking providerName as class name
       Class<? extends DataMapFactory> providerClass =
-          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getClassName());
+          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
       dataMapFactory = providerClass.newInstance();
     } catch (ClassNotFoundException e) {
       // try to create DataMapProvider instance by taking providerName as short name
-      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getClassName());
+      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getProviderName());
     } catch (Throwable e) {
       throw new MetadataProcessException(
-          "failed to create DataMapProvider '" + dataMapSchema.getClassName() + "'", e);
+          "failed to create DataMapProvider '" + dataMapSchema.getProviderName() + "'", e);
     }
     return dataMapFactory;
   }

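The resolution here is two-step: first treat the provider name as a fully qualified factory class name, then fall back to a short-name lookup when no such class exists. A self-contained Scala sketch under simplified assumptions (the registry contents and class names are hypothetical):

    val shortNameRegistry = Map("lucene" -> "org.example.LuceneFactoryStub")

    def resolveFactoryClassName(providerName: String): String =
      try {
        // provider name given as a class name, e.g. "com.acme.MyFactory"
        Class.forName(providerName).getName
      } catch {
        case _: ClassNotFoundException =>
          // provider name given as a short name, e.g. "lucene"
          shortNameRegistry.getOrElse(providerName.toLowerCase,
            throw new IllegalArgumentException(
              s"failed to create DataMapProvider '$providerName'"))
      }
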
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index dc53900..c33354e 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -37,7 +37,7 @@ public class PreAggregateDataMapProvider implements DataMapProvider {
       SparkSession sparkSession) throws MalformedDataMapCommandException {
     validateDmProperty(dataMapSchema);
     helper = new PreAggregateTableHelper(
-        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getProviderName(),
         dataMapSchema.getProperties(), ctasSqlStatement, null, false);
     helper.initMeta(sparkSession);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
index a66f26a..f1575cd 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
@@ -36,13 +36,13 @@ public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider {
   public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
       SparkSession sparkSession) {
     Map<String, String> dmProperties = dataMapSchema.getProperties();
-    String dmProviderName = dataMapSchema.getClassName();
+    String dmProviderName = dataMapSchema.getProviderName();
     TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName);
     Tuple2<String, String> details =
         TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName);
     dmProperties.remove(details._1());
     helper = new PreAggregateTableHelper(
-        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getProviderName(),
         dmProperties, ctasSqlStatement, new Some(details._1()), false);
     helper.initMeta(sparkSession);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 8532e9d..8c475d4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -53,9 +53,17 @@ case class CarbonCreateDataMapCommand(
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
 
-    validateDataMapName(mainTable)
+    if (mainTable.getDataMapSchema(dataMapName) != null) {
+      if (!ifNotExistsSet) {
+        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exists")
+      } else {
+        return Seq.empty
+      }
+    }
+
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
-    dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava))
+    dataMapSchema.setProperties(new java.util.HashMap[String, String](
+      dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
     dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema)
     dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession)
 
@@ -64,15 +72,6 @@ case class CarbonCreateDataMapCommand(
     Seq.empty
   }
 
-  private def validateDataMapName(carbonTable: CarbonTable): Unit = {
-    val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
-    existingDataMaps.asScala.foreach { dataMapSchema =>
-      if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
-        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
-      }
-    }
-  }
-
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dataMapProvider != null) {
       dataMapProvider.initData(mainTable, sparkSession)

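Two behaviors land in this hunk: a duplicate datamap name now either raises or silently no-ops depending on IF NOT EXISTS, and DMPROPERTIES keys and values are trimmed before being stored. A compact Scala sketch of both, with stand-in types:

    def createDataMap(
        existing: Set[String],
        name: String,
        ifNotExists: Boolean,
        properties: Map[String, String]): Option[Map[String, String]] = {
      if (existing.contains(name)) {
        if (ifNotExists) return None   // IF NOT EXISTS: skip silently
        throw new IllegalArgumentException(s"DataMap name '$name' already exists")
      }
      // trim keys and values, mirroring the new dmProperties handling
      Some(properties.map { case (k, v) => (k.trim, v.trim) })
    }

Trimming matters because properties written as, say, 'TEXT_COLUMNS'=' name , city ' would otherwise be stored with stray whitespace and could fail later column lookups.
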
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index fdd17d9..443ff70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -51,7 +51,7 @@ case class CarbonDataMapShowCommand(
         if (relationIdentifier != null) {
           table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
         }
-        Row(s.getDataMapName, s.getClassName, table)
+        Row(s.getDataMapName, s.getProviderName, table)
       }
     } else {
       Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 11f451b..a516d1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -271,21 +271,22 @@ object CommitPreAggregateListener extends OperationEventListener {
       operationContext.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
     val dataMapSchemas =
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getDataMapSchemaList
+      .asScala.filter(_.getChildSchema != null)
     // extract all child LoadCommands
     val childLoadCommands = if (!isCompactionFlow) {
       // If not compaction flow then the key for load commands will be tableName
-        dataMapSchemas.asScala.map { dataMapSchema =>
+        dataMapSchemas.map { dataMapSchema =>
           operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)
             .asInstanceOf[CarbonLoadDataCommand]
         }
       } else {
      // If compaction flow then the key for load commands will be tableName_Compaction
-        dataMapSchemas.asScala.map { dataMapSchema =>
+        dataMapSchemas.map { dataMapSchema =>
           operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
             .asInstanceOf[CarbonLoadDataCommand]
         }
       }
-     if (dataMapSchemas.size() > 0) {
+     if (dataMapSchemas.nonEmpty) {
        val uuid = operationContext.getProperty("uuid").toString
       // keep committing until one fails
       val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
@@ -299,7 +300,7 @@ object CommitPreAggregateListener extends OperationEventListener {
         renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
       }
       // if true then the commit for one of the child tables has failed
-      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.size()) != 0
+      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
       if (commitFailed) {
         LOGGER.warn("Reverting table status file to original state")
         renamedDataMaps.foreach {

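The commit protocol above runs child commits in order until one fails, detects failure by comparing lengths, and rolls back exactly the prefix that succeeded. A self-contained sketch of that pattern (function-typed stand-ins, not the real listener API):

    def commitAll(commands: Seq[() => Boolean], rollback: Int => Unit): Boolean = {
      // keep committing until one fails
      val committed = commands.takeWhile(cmd => cmd())
      // if the committed prefix is shorter than the full list, a commit failed
      val commitFailed = committed.lengthCompare(commands.size) != 0
      if (commitFailed) {
        committed.indices.foreach(rollback)   // revert only what succeeded
      }
      !commitFailed
    }
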
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 6fd0820..5c23805 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.command.timeseries
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
@@ -172,14 +174,14 @@ object TimeSeriesUtil {
    * @param fieldMapping     fields from select plan
    * @param timeSeriesColumn timeseries column name
    */
-  def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable
-  .LinkedHashMap[Field, DataMapField],
+  def validateEventTimeColumnExitsInSelect(
+      fieldMapping: mutable.LinkedHashMap[Field, DataMapField],
       timeSeriesColumn: String) : Any = {
-    val isTimeSeriesColumnExits = fieldMapping
-      .exists(obj => obj._2.columnTableRelationList.isDefined &&
-                     obj._2.columnTableRelationList.get(0).parentColumnName
-                       .equalsIgnoreCase(timeSeriesColumn) &&
-                     obj._2.aggregateFunction.isEmpty)
+    val isTimeSeriesColumnExits = fieldMapping.exists { case (_, f) =>
+      f.columnTableRelationList.isDefined &&
+      f.columnTableRelationList.get.head.parentColumnName.equalsIgnoreCase(timeSeriesColumn) &&
+      f.aggregateFunction.isEmpty
+    }
     if(!isTimeSeriesColumnExits) {
       throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " +
                                                 s"not exists in select")

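The refactored predicate destructures each map entry instead of reaching through `obj._2` tuple accessors. A small self-contained illustration, with a simplified stand-in for DataMapField:

    import scala.collection.mutable

    // stand-in for DataMapField: an optional parent column plus an aggregate
    case class DMField(parentColumn: Option[String], aggregateFunction: String)

    def hasEventTimeColumn(
        fieldMapping: mutable.LinkedHashMap[String, DMField],
        timeSeriesColumn: String): Boolean =
      fieldMapping.exists { case (_, f) =>
        f.parentColumn.exists(_.equalsIgnoreCase(timeSeriesColumn)) &&
        f.aggregateFunction.isEmpty
      }
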
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 29dfa40..7871643 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -148,11 +148,12 @@ public class CarbonLoadModelBuilder {
 
     if (Boolean.parseBoolean(bad_records_logger_enable) ||
         LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
       if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
         throw new InvalidLoadOptionException("Invalid bad records location.");
       }
+      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
     }
+
     carbonLoadModel.setBadRecordsLocation(bad_record_path);
 
     validateGlobalSortPartitions(global_sort_partitions);
@@ -226,10 +227,8 @@ public class CarbonLoadModelBuilder {
         delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
       throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
     } else {
-      carbonLoadModel.setComplexDelimiterLevel1(
-          CarbonUtil.delimiterConverter(complex_delimeter_level1));
-      carbonLoadModel.setComplexDelimiterLevel2(
-          CarbonUtil.delimiterConverter(complex_delimeter_level2));
+      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
+      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
     }
     // set local dictionary path, and dictionary file extension
     carbonLoadModel.setAllDictPath(all_dictionary_path);
@@ -320,10 +319,13 @@ public class CarbonLoadModelBuilder {
 
   private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
     if (sortScope != null) {
-      // Don't support use global sort on partitioned table.
+      // We support global sort for Hive standard partition tables, but not
+      // for other partition types.
       if (table.getPartitionInfo(table.getTableName()) != null &&
+          !table.isHivePartitionTable() &&
           sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
-        throw new InvalidLoadOptionException("Don't support use global sort on partitioned table.");
+        throw new InvalidLoadOptionException("Don't support use global sort on "
+            + table.getPartitionInfo().getPartitionType() +  " partition table.");
       }
     }
   }

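The first hunk reorders validation and normalization: the user-supplied bad-records path is validated before the scheme prefix is appended, so the check sees exactly what the user passed. A sketch of that ordering (isValidPath and appendScheme are hypothetical stand-ins for the CarbonUtil helpers):

    def resolveBadRecordPath(
        raw: String,
        isValidPath: String => Boolean,
        appendScheme: String => String): String = {
      if (!isValidPath(raw)) {
        throw new IllegalArgumentException("Invalid bad records location.")
      }
      appendScheme(raw)   // normalize only after the raw input passed validation
    }
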
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d23f7fad/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index bac1a94..608d147 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -112,11 +113,11 @@ public class LoadOption {
 
     optionsFinal.put(
         "complex_delimiter_level_1",
-        Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$"));
+        Maps.getOrDefault(options,"complex_delimiter_level_1", "$"));
 
     optionsFinal.put(
         "complex_delimiter_level_2",
-        Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:"));
+        Maps.getOrDefault(options, "complex_delimiter_level_2", ":"));
 
     optionsFinal.put(
         "dateformat",
@@ -259,7 +260,23 @@ public class LoadOption {
                 + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
       }
     }
-    return csvColumns;
+
+    // In case of static partition columns, just rename a header that already exists, since
+    // we should not take the column from the csv file and append it as a new column at the end.
+    if (staticPartitionCols.size() > 0) {
+      List<String> updatedColumns = new ArrayList<>();
+      for (int i = 0; i < csvColumns.length; i++) {
+        if (staticPartitionCols.contains(csvColumns[i])) {
+          updatedColumns.add(csvColumns[i] + "1");
+        } else {
+          updatedColumns.add(csvColumns[i]);
+        }
+      }
+      updatedColumns.addAll(staticPartitionCols);
+      return updatedColumns.toArray(new String[updatedColumns.size()]);
+    } else {
+      return csvColumns;
+    }
   }
 
 }

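The new branch above rewrites the CSV header when static partition columns are present: a header that clashes with a static partition column is suffixed with "1", and the partition columns are appended at the end. A self-contained Scala sketch of the same logic:

    def rewriteHeader(
        csvColumns: Seq[String],
        staticPartitionCols: Seq[String]): Seq[String] =
      if (staticPartitionCols.isEmpty) {
        csvColumns
      } else {
        // rename clashing headers so the csv column is not silently dropped,
        // then append the partition columns as the trailing header entries
        val updated = csvColumns.map { c =>
          if (staticPartitionCols.contains(c)) c + "1" else c
        }
        updated ++ staticPartitionCols
      }

    // rewriteHeader(Seq("id", "country"), Seq("country"))
    //   == Seq("id", "country1", "country")
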
