carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2347][LUCENE]change datamap factory interface to check supported features
Date Tue, 01 May 2018 07:07:43 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 161347155 -> 5229443bd


[CARBONDATA-2347][LUCENE]change datamap factory interface to check supported features

Added a new method to the interface which will decide whether the table operation present in the list of table operations is allowed on the datamap or not.

This closes #2202


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5229443b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5229443b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5229443b

Branch: refs/heads/master
Commit: 5229443bd538185382661e5dd83b877e89677281
Parents: 1613471
Author: akashrn5 <akashnilugal@gmail.com>
Authored: Sat Apr 21 15:30:55 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue May 1 15:07:34 2018 +0800

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       | 28 ++++---
 .../core/datamap/dev/DataMapFactory.java        |  6 ++
 .../core/features/TableOperation.java           | 32 ++++++++
 .../blockletindex/BlockletDataMapFactory.java   |  6 ++
 .../core/metadata/schema/table/CarbonTable.java | 36 +++++++++
 .../apache/carbondata/core/util/CarbonUtil.java |  1 -
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  6 ++
 .../examples/MinMaxIndexDataMapFactory.java     |  6 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |  9 ++-
 .../lucene/LuceneDataMapFactoryBase.java        | 49 +++++++++++-
 .../datamap/lucene/LuceneDataMapWriter.java     | 83 +++++++++-----------
 .../datamap/lucene/LuceneFineGrainDataMap.java  |  3 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   | 23 ++++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 79 ++++++++++++++++++-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  8 ++
 .../testsuite/datamap/DataMapWriterSuite.scala  |  8 ++
 .../testsuite/datamap/FGDataMapTestCase.scala   |  8 ++
 .../testsuite/datamap/TestDataMapStatus.scala   |  8 ++
 .../TestInsertAndOtherCommandConcurrent.scala   |  8 ++
 .../datamap/CarbonCreateDataMapCommand.scala    | 19 ++---
 .../datamap/CarbonDropDataMapCommand.scala      |  8 ++
 .../CarbonProjectForDeleteCommand.scala         |  7 ++
 .../CarbonProjectForUpdateCommand.scala         |  7 ++
 .../CarbonAlterTableAddColumnCommand.scala      |  6 ++
 .../CarbonAlterTableDataTypeChangeCommand.scala |  6 ++
 .../CarbonAlterTableDropColumnCommand.scala     |  8 ++
 .../schema/CarbonAlterTableRenameCommand.scala  |  5 ++
 .../sql/execution/strategy/DDLStrategy.scala    |  6 ++
 28 files changed, 404 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 0ea601b..a871d57 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -262,26 +262,36 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * Return a new datamap instance and registered in the store manager.
-   * The datamap is created using datamap name, datamap factory class and table identifier.
+   * Return a new datamap instance for the given
+   * @param dataMapSchema
+   * @return
+   * @throws MalformedDataMapCommandException
    */
-  // TODO: make it private
-  public TableDataMap createAndRegisterDataMap(CarbonTable table,
-      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
+  public DataMapFactory getDataMapFactoryClass(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
     DataMapFactory dataMapFactory;
     try {
       // try to create datamap by reflection to test whether it is a valid DataMapFactory class
       Class<? extends DataMapFactory> factoryClass =
           (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
-      dataMapFactory = factoryClass.newInstance();
+      return factoryClass.newInstance();
     } catch (ClassNotFoundException e) {
       // try to create DataMapClassProvider instance by taking providerName as short name
-      dataMapFactory =
-          IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
+      return IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
     } catch (Throwable e) {
       throw new MetadataProcessException(
-          "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e);
+          "failed to get DataMap factory for'" + dataMapSchema.getProviderName() + "'", e);
     }
+  }
+
+  /**
+   * registered in the store manager.
+   * The datamap is created using datamap name, datamap factory class and table identifier.
+   */
+  // TODO: make it private
+  public TableDataMap createAndRegisterDataMap(CarbonTable table,
+      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
+    DataMapFactory dataMapFactory  = getDataMapFactoryClass(dataMapSchema);
     return registerDataMap(table, dataMapSchema, dataMapFactory);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index a315ce6..20bdfb7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.events.Event;
@@ -91,4 +92,9 @@ public interface DataMapFactory<T extends DataMap> {
    * delete datamap data if any
    */
   void deleteDatamapData();
+
+  /**
+   * This function should return true if the input operation enum will make the datamap become stale
+   */
+  boolean willBecomeStale(TableOperation operation);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java b/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
new file mode 100644
index 0000000..3d13901
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.features;
+
+public enum TableOperation {
+  ALTER_RENAME,
+  ALTER_DROP,
+  ALTER_ADD_COLUMN,
+  ALTER_CHANGE_DATATYPE,
+  STREAMING,
+  UPDATE,
+  DELETE,
+  PARTITION;
+
+  TableOperation() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 7c6427d..0d7539c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -277,4 +278,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     }
     return blocklets;
   }
+
+  @Override public boolean willBecomeStale(TableOperation operation) {
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 7f187b9..9ae5ed4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -27,7 +27,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
@@ -60,6 +66,9 @@ import org.apache.carbondata.format.FileHeader;
  */
 public class CarbonTable implements Serializable {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonTable.class.getName());
+
   /**
    * the cached table info
    */
@@ -950,4 +959,31 @@ public class CarbonTable implements Serializable {
   public void setTransactionalTable(boolean transactionalTable) {
     isTransactionalTable = transactionalTable;
   }
+
+  /**
+   * method returns true if the given operation is allowed for the corresponding datamap
+   * if this operation makes datamap stale it is not allowed
+   * @param carbonTable
+   * @param operation
+   * @return
+   */
+  public boolean canAllow(CarbonTable carbonTable, TableOperation operation) {
+    try {
+      List<TableDataMap> datamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+      if (!datamaps.isEmpty()) {
+        for (TableDataMap dataMap : datamaps) {
+          DataMapFactory factoryClass =
+              DataMapStoreManager.getInstance().getDataMapFactoryClass(dataMap.getDataMapSchema());
+          return !factoryClass.willBecomeStale(operation);
+        }
+      }
+    } catch (Exception e) {
+      // since method returns true or false and based on that calling function throws exception, no
+      // need to throw the caught exception
+      LOGGER.error(e.getMessage());
+      return true;
+    }
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 6b4a94a..a5351a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2943,6 +2943,5 @@ public final class CarbonUtil {
     }
     return blockId;
   }
-
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index a2f9693..b76390f 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -229,6 +230,11 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
       LOGGER.error("drop datamap failed, failed to delete datamap directory");
     }
   }
+
+  @Override public boolean willBecomeStale(TableOperation operation) {
+    return false;
+  }
+
   @Override
   public DataMapMeta getMeta() {
     return this.dataMapMeta;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 7e43610..c110887 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -31,9 +31,9 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -157,4 +157,8 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   @Override public void deleteDatamapData() {
 
   }
+
+  @Override public boolean willBecomeStale(TableOperation operation) {
+    return false;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index e8c740d..ba38319 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -72,8 +73,14 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
     return DataMapLevel.CG;
   }
 
-  @Override public void deleteDatamapData() {
+  @Override
+  public void deleteDatamapData() {
+
+  }
 
+  @Override
+  public boolean willBecomeStale(TableOperation operation) {
+    return false;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 9727492..1cde0c1 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.datamap.lucene;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -29,11 +30,14 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -89,6 +93,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    */
   List<String> indexedCarbonColumns = null;
 
+  CarbonTable carbonTable = null;
+
 
   @Override
   public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
@@ -96,6 +102,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
     Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier());
     Objects.requireNonNull(dataMapSchema);
 
+    this.carbonTable = carbonTable;
     this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
@@ -208,11 +215,10 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   /**
    * Get all distributable objects of a segmentid
    */
-  @Override
-  public List<DataMapDistributable> toDistributable(Segment segment) {
+  @Override public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>();
-    CarbonFile[] indexDirs = LuceneDataMapWriter
-        .getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName);
+    CarbonFile[] indexDirs =
+        getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
       if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
@@ -261,4 +267,39 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   public DataMapMeta getMeta() {
     return dataMapMeta;
   }
+
+  /**
+   * returns all the directories of lucene index files for query
+   * @param tablePath
+   * @param segmentId
+   * @return
+   */
+  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
+    List<CarbonFile> indexDirs = new ArrayList<>();
+    List<TableDataMap> dataMaps = new ArrayList<>();
+    try {
+      // there can be multiple lucene datamaps present on a table, so get all datamaps and form
+      // the path till the index file directories in all datamaps folders present in each segment
+      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+    } catch (IOException ex) {
+      LOGGER.error("failed to get datamaps");
+    }
+    if (dataMaps.size() > 0) {
+      for (TableDataMap dataMap : dataMaps) {
+        List<CarbonFile> indexFiles;
+        String dmPath =
+            CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMap
+                .getDataMapSchema().getDataMapName();
+        FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
+        final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
+        indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile file) {
+            return file.isDirectory();
+          }
+        }));
+        indexDirs.addAll(indexFiles);
+      }
+    }
+    return indexDirs.toArray(new CarbonFile[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 4e8adee..95823bb 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -37,6 +36,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
@@ -84,6 +84,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   public static final String BLOCKLETID_NAME = "blockletId";
 
+  private String indexShardName = null;
+
   public static final String PAGEID_NAME = "pageId";
 
   public static final String ROWID_NAME = "rowId";
@@ -111,39 +113,46 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * Start of new block notification.
    */
   public void onBlockStart(String blockId, String indexShardName) throws IOException {
-    if (indexWriter != null) {
-      return;
-    }
-    // get index path, put index data into segment's path
-    String strIndexPath = getIndexPath(indexShardName);
-    Path indexPath = FileFactory.getPath(strIndexPath);
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-    // if index path not exists, create it
-    if (!fs.exists(indexPath)) {
-      fs.mkdirs(indexPath);
-    }
+    if (this.indexShardName == null || !this.indexShardName.equals(indexShardName)) {
+      if (indexWriter != null) {
+        return;
+      }
+      // get index path, put index data into segment's path
+      String strIndexPath = getIndexPath(indexShardName);
+      Path indexPath = FileFactory.getPath(strIndexPath);
+      FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+      // if index path not exists, create it
+      if (!fs.exists(indexPath)) {
+        fs.mkdirs(indexPath);
+      }
 
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
+      if (null == analyzer) {
+        analyzer = new StandardAnalyzer();
+      }
 
-    // create a index writer
-    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+      // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
+      // and disable the cache for the index writer, it will be closed on closing the writer
+      Configuration conf = new Configuration();
+      conf.set("fs.hdfs.impl.disable.cache", "true");
+
+      // create a index writer
+      Directory indexDir = new HdfsDirectory(indexPath, conf);
+
+      IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+      if (CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+              CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+          .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+        indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+      } else {
+        indexWriterConfig
+            .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+      }
 
-    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-    if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-    } else {
-      indexWriterConfig
-          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+      indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
     }
 
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-
   }
 
   /**
@@ -349,20 +358,4 @@ public class LuceneDataMapWriter extends DataMapWriter {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName
         + File.separator + taskName;
   }
-
-  /**
-   * returns all the directories of lucene index files for query
-   * @param tablePath
-   * @param segmentId
-   * @param dataMapName
-   * @return
-   */
-  public static CarbonFile[] getAllIndexDirs(String tablePath, String segmentId,
-      final String dataMapName) {
-    String dmPath =
-        CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
-    FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
-    final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
-    return dirPath.listFiles();
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index a39c7a2..1104d09 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -204,7 +204,8 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     }
 
     // execute index search
-    TopDocs result;
+    // initialize to null, else ScoreDoc objects will get accumulated in memory
+    TopDocs result = null;
     try {
       result = indexSearcher.search(query, maxDocs);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 9026fbc..1dae9b5 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -77,4 +78,26 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     return DataMapLevel.FG;
   }
 
+  @Override public boolean willBecomeStale(TableOperation operation) {
+    switch (operation) {
+      case ALTER_RENAME:
+        return true;
+      case ALTER_DROP:
+        return true;
+      case ALTER_ADD_COLUMN:
+        return true;
+      case ALTER_CHANGE_DATATYPE:
+        return true;
+      case STREAMING:
+        return true;
+      case DELETE:
+        return true;
+      case UPDATE:
+        return true;
+      case PARTITION:
+        return true;
+      default:
+        return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index bd957dc..c157c48 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -251,7 +251,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     }
     assert(exception_duplicate_column.getMessage
-      .contains("Create lucene datamap dm1 failed, datamap already exists on column(s) name"))
+      .contains("datamap already exists on column(s)"))
     sql("drop datamap if exists dm on table datamap_test_table")
   }
 
@@ -621,6 +621,81 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
   }
 
+  test("test lucene datamap creation for blocked features") {
+    sql("DROP TABLE IF EXISTS datamap_test7")
+    sql(
+      """
+        | CREATE TABLE datamap_test7(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm124 ON TABLE datamap_test7
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    val ex1 = intercept[MalformedCarbonCommandException] {
+      sql("alter table datamap_test7 rename to datamap_test5")
+    }
+    assert(ex1.getMessage.contains("alter rename is not supported"))
+
+    val ex2 = intercept[MalformedCarbonCommandException] {
+      sql("alter table datamap_test7 add columns(address string)")
+    }
+    assert(ex2.getMessage.contains("alter table add column is not supported"))
+
+    val ex3 = intercept[MalformedCarbonCommandException] {
+      sql("alter table datamap_test7 change id id BIGINT")
+    }
+    assert(ex3.getMessage.contains("alter table change datatype is not supported"))
+
+    val ex4 = intercept[MalformedCarbonCommandException] {
+      sql("alter table datamap_test7 drop columns(name)")
+    }
+    assert(ex4.getMessage.contains("alter table drop column is not supported"))
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test7 OPTIONS('header'='false')")
+    val ex5 = intercept[MalformedCarbonCommandException] {
+      sql("UPDATE datamap_test7 d set(d.city)=('luc') where d.name='n10'").show()
+    }
+    assert(ex5.getMessage.contains("update operation is not supported"))
+
+    val ex6 = intercept[MalformedCarbonCommandException] {
+      sql("delete from datamap_test7 where name = 'n10'").show()
+    }
+    assert(ex6.getMessage.contains("delete operation is not supported"))
+  }
+
+  test("test lucene fine grain multiple data map on table") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm2 ON TABLE datamap_test5
+         | USING 'lucene'
+         | DMProperties('TEXT_COLUMNS'='city')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test5
+         | USING 'lucene'
+         | DMProperties('TEXT_COLUMNS'='Name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('name:n10')"),
+      sql(s"select * from datamap_test5 where name='n10'"))
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+
+  }
+
   override protected def afterAll(): Unit = {
     LuceneFineGrainDataMapSuite.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")
@@ -632,6 +707,8 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_test2")
     sql("DROP TABLE IF EXISTS datamap_test3")
     sql("DROP TABLE IF EXISTS datamap_test4")
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql("DROP TABLE IF EXISTS datamap_test7")
     sql("DROP TABLE IF EXISTS datamap_main")
     sql("use default")
     sql("drop database if exists lucene cascade")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index f13ffad..247f0ca 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, DiskBasedDMSchemaStorageProvider}
@@ -146,6 +147,13 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
    */
   override def deleteDatamapData(): Unit = {
   }
+
+  /**
+   * defines the features scopes for the datamap
+   */
+  override def willBecomeStale(feature: TableOperation): Boolean = {
+    false
+  }
 }
 
 class CGDataMap extends CoarseGrainDataMap {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 54a8dc2..ec72ffb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -79,6 +80,13 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
   override def deleteDatamapData(): Unit = {
     ???
   }
+
+  /**
+   * defines the features scopes for the datamap
+   */
+  override def willBecomeStale(operation: TableOperation): Boolean = {
+    false
+  }
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index f7886a2..0422b24 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -144,6 +145,13 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   override def deleteDatamapData(): Unit = {
     ???
   }
+
+  /**
+   * defines the features scopes for the datamap
+   */
+  override def willBecomeStale(operation: TableOperation): Boolean = {
+    false
+  }
 }
 
 class FGDataMap extends FineGrainDataMap {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index d48ac6b..037ba1e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, Coa
 import org.apache.carbondata.core.datamap.status.{DataMapStatus, DataMapStatusManager}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
@@ -227,4 +228,11 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
   override def deleteDatamapData(): Unit = {
 
   }
+
+  /**
+   * defines the features scopes for the datamap
+   */
+  override def willBecomeStale(operation: TableOperation): Boolean = {
+    false
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 2b7bd46..12bec0a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, Coa
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
@@ -336,4 +337,11 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
   override def deleteDatamapData(): Unit = {
 
   }
+
+  /**
+   * defines the features scopes for the datamap
+   */
+  override def willBecomeStale(operation: TableOperation): Boolean = {
+    false
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 050f154..3fe2c00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -70,21 +70,16 @@ case class CarbonCreateDataMapCommand(
 
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
     // TODO: move this if logic inside lucene module
-    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.toString)) {
+    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getShortName) ||
+        dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getClassName)) {
       val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala
       if (datamaps.nonEmpty) {
         datamaps.foreach(datamap => {
-          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns")
-          val existingColumns = dmProperties("text_columns")
-
-          def getAllSubString(columns: String): Set[String] = {
-            columns.inits.flatMap(_.tails).toSet
-          }
-
-          val existingClmSets = getAllSubString(existingColumns)
-          val dmColumnsSets = getAllSubString(dmColumns)
-          val duplicateDMColumn = existingClmSets.intersect(dmColumnsSets).maxBy(_.length)
-          if (!duplicateDMColumn.isEmpty) {
+          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns").trim
+            .toLowerCase.split(",").toSet
+          val existingColumns = dmProperties("text_columns").trim.toLowerCase().split(",").toSet
+          val duplicateDMColumn = dmColumns.intersect(existingColumns)
+          if (duplicateDMColumn.nonEmpty) {
             throw new MalformedDataMapCommandException(
               s"Create lucene datamap $dataMapName failed, datamap already exists on column(s) " +
               s"$duplicateDMColumn")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 0235666..98361db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -87,6 +87,14 @@ case class CarbonDropDataMapCommand(
           lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
         }
         LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
+
+        // drop index datamap on the main table
+        if (mainTable != null &&
+            DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) {
+          dropDataMapFromSystemFolder(sparkSession)
+          return Seq.empty
+        }
+
         // If datamap to be dropped in parent table then drop the datamap from metastore and remove
         // entry from parent table.
         // If force drop is true then remove the datamap from hivemetastore. No need to remove from

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 225237b..f1fa9b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.execution.command._
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
@@ -52,6 +54,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
       throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }
 
+    if (!carbonTable.canAllow(carbonTable, TableOperation.DELETE)) {
+      throw new MalformedCarbonCommandException(
+        "delete operation is not supported for index datamap")
+    }
+
     IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val dataFrame = Dataset.ofRows(sparkSession, plan)
     val dataRdd = dataFrame.rdd

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index d8379a7..573ea9a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -30,7 +30,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
@@ -64,6 +66,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }
 
+    if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) {
+      throw new MalformedCarbonCommandException(
+        "update operation is not supported for index datamap")
+    }
+
     // trigger event for Update table
     val operationContext = new OperationContext
     val updateTablePreEvent: UpdateTablePreEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index d33fc6d..7e11170 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterT
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -57,6 +59,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       // up relation should be called after acquiring the lock
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_ADD_COLUMN)) {
+        throw new MalformedCarbonCommandException(
+          "alter table add column is not supported for index datamap")
+      }
       val operationContext = new OperationContext
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index accaa27..fddbb7b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, Da
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -53,6 +55,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_CHANGE_DATATYPE)) {
+        throw new MalformedCarbonCommandException(
+          "alter table change datatype is not supported for index datamap")
+      }
       val operationContext = new OperationContext
       val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(sparkSession,
         carbonTable, alterTableDataTypeChangeModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index ff1541b..a82298c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -25,11 +25,15 @@ import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, Metada
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -54,6 +58,10 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_DROP)) {
+        throw new MalformedCarbonCommandException(
+          "alter table drop column is not supported for index datamap")
+      }
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index dfcd12b..779b937 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -81,6 +82,10 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
+    if (!oldCarbonTable.canAllow(oldCarbonTable, TableOperation.ALTER_RENAME)) {
+      throw new MalformedCarbonCommandException("alter rename is not supported for index datamap")
+    }
+
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.COMPACTION_LOCK,
       LockUsage.DELETE_SEGMENT_LOCK,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5229443b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 26d5330..d7e2023 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.util.CarbonProperties
 
 /**
@@ -247,6 +248,11 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             "Unsupported operation on non transactional table")
         }
 
+        if (carbonTable != null && !carbonTable.canAllow(carbonTable, TableOperation.STREAMING)) {
+          throw new MalformedCarbonCommandException(
+            "streaming is not supported for index datamap")
+        }
+
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))
         if (property.isDefined) {


Mime
View raw message