carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-2633][BloomDataMap] Fix bugs in bloomfilter for dictionary/sort/date/TimeStamp column
Date Wed, 04 Jul 2018 06:14:12 GMT
[CARBONDATA-2633][BloomDataMap] Fix bugs in bloomfilter for dictionary/sort/date/TimeStamp column

For dictionary columns, Carbon converts the literal value to a dictionary
value, then converts the dictionary value to an MDK (multi-dimensional key)
value, and finally stores the MDK value as the internal value in the carbon
file.

For other columns, Carbon converts the literal value to the internal value
using a field converter.

Since the bloomfilter datamap stores the internal value, during query we
must convert the literal value in the filter to the internal value in order
to match the value stored in the bloomfilter datamap.
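
To make this concrete, the following is a minimal, self-contained Java
sketch (plain Java, not the CarbonData API; the dictionary contents and
class name are made up for illustration) of why the filter literal must go
through the same conversion as the indexed data before probing the bloom
filter:

  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;

  public class LiteralToInternalSketch {
    // hypothetical dictionary: literal value -> surrogate key
    private static final Map<String, Integer> DICT = new HashMap<>();
    static {
      DICT.put("china", 2);
      DICT.put("india", 3);
    }

    // internal value of a dictionary column: the surrogate key as 4 bytes
    static byte[] toInternal(String literal) {
      Integer surrogate = DICT.get(literal);
      return ByteBuffer.allocate(4).putInt(surrogate == null ? 1 : surrogate).array();
    }

    public static void main(String[] args) {
      byte[] stored = toInternal("china");              // index side
      byte[] probe = toInternal("china");               // query side, converted
      byte[] raw = "china".getBytes();                  // query side, unconverted
      System.out.println(Arrays.equals(stored, probe)); // true: hit
      System.out.println(Arrays.equals(stored, raw));   // false: raw literal never matches
    }
  }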

Changes made in this commit:

1. FieldConverters were refactored to extract common value-conversion methods.
2. BloomQueryModel was optimized to support converting literal values to
internal values.
3. Fix bugs for int/float/date/timestamp as bloom index columns.
4. Fix bugs for dictionary/sort columns as bloom index columns.
5. Add tests.
6. Block (deferred) rebuild for the bloom datamap (it contains bugs that are
not fixed in this commit).

This closes #2403


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cd7c2102
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cd7c2102
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cd7c2102

Branch: refs/heads/master
Commit: cd7c2102c6e4517fd4fd6907a93aac9ff1ece47e
Parents: 133ec17
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Fri Jun 29 22:23:55 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Jul 4 14:13:47 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../apache/carbondata/core/datamap/Segment.java |   4 +
 .../core/datamap/dev/DataMapFactory.java        |  11 +-
 .../core/datamap/dev/DataMapWriter.java         |   2 +-
 .../blockletindex/BlockletDataMapFactory.java   |   6 +-
 .../DateDirectDictionaryGenerator.java          |  12 +-
 .../TimeStampDirectDictionaryGenerator.java     |  12 +-
 datamap/bloom/pom.xml                           |  10 +
 .../datamap/bloom/BloomCoarseGrainDataMap.java  | 188 +++++++--
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  20 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |  11 +-
 .../datamap/bloom/BloomDataMapModel.java        |  12 +-
 .../datamap/bloom/BloomDataMapWriter.java       |  94 ++++-
 .../datamap/bloom/DataConvertUtil.java          |  42 ++
 .../examples/MinMaxIndexDataMapFactory.java     |   8 +-
 .../lucene/LuceneDataMapFactoryBase.java        |   7 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |   4 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   5 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   4 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |   5 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   5 +-
 .../datamap/IndexDataMapRebuildRDD.scala        |   2 +-
 .../BloomCoarseGrainDataMapFunctionSuite.scala  | 412 +++++++++++++++++++
 .../bloom/BloomCoarseGrainDataMapSuite.scala    |   6 +-
 .../bloom/BloomCoarseGrainDataMapTestUtil.scala |  63 +++
 .../datamap/DataMapWriterListener.java          |  11 +-
 .../loading/AbstractDataLoadProcessorStep.java  |  19 +-
 .../loading/converter/FieldConverter.java       |   5 +
 .../impl/ComplexFieldConverterImpl.java         |  11 +-
 .../impl/DictionaryFieldConverterImpl.java      |  45 +-
 .../DirectDictionaryFieldConverterImpl.java     |  24 +-
 .../converter/impl/FieldEncoderFactory.java     |   6 +-
 .../impl/MeasureFieldConverterImpl.java         |  33 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |  37 +-
 .../store/CarbonFactDataHandlerModel.java       |   6 +-
 35 files changed, 948 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 50c7138..7470603 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -28,6 +28,10 @@ public final class CarbonCommonConstants {
    */
   public static final int DICT_VALUE_NULL = 1;
   /**
+   * surrogate value of null for direct dictionary
+   */
+  public static final int DIRECT_DICT_VALUE_NULL = 1;
+  /**
    * integer size in bytes
    */
   public static final int INT_SIZE_IN_BYTE = 4;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 425cdf6..30e811a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -126,6 +126,10 @@ public class Segment implements Serializable {
     return segmentFileName;
   }
 
+  public void setReadCommittedScope(ReadCommittedScope readCommittedScope) {
+    this.readCommittedScope = readCommittedScope;
+  }
+
   public static List<Segment> toSegmentList(String[] segmentIds,
       ReadCommittedScope readCommittedScope) {
     List<Segment> list = new ArrayList<>(segmentIds.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index ad709a0..ff4ef72 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -57,17 +58,15 @@ public abstract class DataMapFactory<T extends DataMap> {
   /**
    * Create a new write for this datamap, to write new data into the specified segment and shard
    */
-  public abstract DataMapWriter createWriter(Segment segment, String shardName)
-      throws IOException;
-
+  public abstract DataMapWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException;
   /**
    * Create a new DataMapBuilder for this datamap, to rebuild the specified
    * segment and shard data in the main table.
    * TODO: refactor to unify with DataMapWriter
    */
-  public abstract DataMapBuilder createBuilder(Segment segment, String shardName)
-      throws IOException;
-
+  public abstract DataMapBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException;
   /**
    * Get the datamap for segmentid
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 8c8d2d8..2ebb530 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -43,7 +43,7 @@ public abstract class DataMapWriter {
 
   protected String dataMapPath;
 
-  private List<CarbonColumn> indexColumns;
+  protected List<CarbonColumn> indexColumns;
 
   private boolean isWritingFinished;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 65fcb4b..836b6a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -87,12 +87,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public DataMapWriter createWriter(Segment segment, String shardName) {
+  public DataMapWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public DataMapBuilder createBuilder(Segment segment, String shardName) {
+  public DataMapBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
     throw new UnsupportedOperationException("not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index 329e260..1caa3e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -85,7 +85,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
   @Override public int generateDirectSurrogateKey(String memberStr) {
     if (null == memberStr || memberStr.trim().isEmpty() || memberStr
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     }
     return getDirectSurrogateForMember(memberStr);
   }
@@ -103,7 +103,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     } else {
       if (null == memberStr || memberStr.trim().isEmpty() || memberStr
           .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-        return 1;
+        return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
       }
       return getDirectSurrogateForMember(memberStr);
     }
@@ -127,7 +127,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     }
     //adding +2 to reserve the first cuttOffDiff value for null or empty date
     if (null == dateToStr) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     } else {
       return generateKey(dateToStr.getTime());
     }
@@ -140,7 +140,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
    * @return member value/actual value Date
    */
   @Override public Object getValueFromSurrogate(int key) {
-    if (key == 1) {
+    if (key == CarbonCommonConstants.DIRECT_DICT_VALUE_NULL) {
       return null;
     }
     return key - cutOffDate;
@@ -157,7 +157,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
       }
     }
     if (timeValue == -1) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     } else {
       return generateKey(timeValue);
     }
@@ -168,7 +168,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Value for date type column is not in valid range. Value considered as null.");
       }
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     }
     return (int) Math.floor((double) timeValue / MILLIS_PER_DAY) + cutOffDate;
   }
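
For reference, the surrogate computation at the end of this hunk can be
exercised standalone. The sketch below is plain Java; CUT_OFF_DATE is an
assumed stand-in, since the real cutOffDate constant lives inside
DateDirectDictionaryGenerator and is not shown in this diff:

  public class DateSurrogateSketch {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
    static final int DIRECT_DICT_VALUE_NULL = 1; // CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
    static final int CUT_OFF_DATE = 2;           // assumed offset; key 1 stays reserved for null

    // same arithmetic as the last hunk above
    static int generateKey(long timeValue) {
      return (int) Math.floor((double) timeValue / MILLIS_PER_DAY) + CUT_OFF_DATE;
    }

    public static void main(String[] args) {
      System.out.println(generateKey(0L));             // 1970-01-01 -> 2
      System.out.println(generateKey(MILLIS_PER_DAY)); // 1970-01-02 -> 3
      System.out.println(DIRECT_DICT_VALUE_NULL);      // null/out-of-range dates -> 1
    }
  }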

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index c7a4194..72ed66c 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -126,7 +126,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
   @Override public int generateDirectSurrogateKey(String memberStr) {
     if (null == memberStr || memberStr.trim().isEmpty() || memberStr
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     }
     return getDirectSurrogateForMember(memberStr);
   }
@@ -144,7 +144,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     } else {
       if (null == memberStr || memberStr.trim().isEmpty() || memberStr
           .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-        return 1;
+        return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
       }
       return getDirectSurrogateForMember(memberStr);
     }
@@ -168,7 +168,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     }
     //adding +2 to reserve the first cuttOffDiff value for null or empty date
     if (null == dateToStr) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     } else {
       return generateKey(dateToStr.getTime());
     }
@@ -181,7 +181,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
    * @return member value/actual value Date
    */
   @Override public Object getValueFromSurrogate(int key) {
-    if (key == 1) {
+    if (key == CarbonCommonConstants.DIRECT_DICT_VALUE_NULL) {
       return null;
     }
     long timeStamp = ((key - 2) * granularityFactor + cutOffTimeStamp);
@@ -200,7 +200,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
       }
     }
     if (timeValue == -1) {
-      return 1;
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     } else {
       return generateKey(timeValue);
     }
@@ -212,7 +212,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     if (time >= (long) Integer.MIN_VALUE && time <= (long) Integer.MAX_VALUE) {
       keyValue = (int) time;
     }
-    return keyValue < 0 ? 1 : keyValue + 2;
+    return keyValue < 0 ? CarbonCommonConstants.DIRECT_DICT_VALUE_NULL : keyValue + 2;
   }
 
   public void initialize() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/bloom/pom.xml b/datamap/bloom/pom.xml
index f9ebae2..d13eb4f 100644
--- a/datamap/bloom/pom.xml
+++ b/datamap/bloom/pom.xml
@@ -24,6 +24,16 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-processing</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index ad18704..3143c62 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -20,29 +20,48 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.bloom.CarbonBloomFilter;
@@ -58,10 +77,15 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
   public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
-  private Set<String> indexedColumn;
+  private Map<String, CarbonColumn> name2Col;
   private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
   private String shardName;
   private Path indexPath;
+  /**
+   * This is used to convert literal filter value to internal carbon value
+   */
+  private Map<String, FieldConverter> name2Converters;
+  private BadRecordLogHolder badRecordLogHolder;
 
   @Override
   public void init(DataMapModel dataMapModel) throws IOException {
@@ -70,10 +94,49 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     if (dataMapModel instanceof BloomDataMapModel) {
       BloomDataMapModel model = (BloomDataMapModel) dataMapModel;
       this.cache = model.getCache();
-      this.indexedColumn = model.getIndexedColumnNames();
     }
   }
 
+  /**
+   * init field converters for index columns
+   */
+  public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn> indexedColumn) {
+    this.name2Col = new HashMap<>(indexedColumn.size());
+    for (CarbonColumn col : indexedColumn) {
+      this.name2Col.put(col.getColName(), col);
+    }
+
+    try {
+      this.name2Converters = new HashMap<>(indexedColumn.size());
+      AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+          .from(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+      String nullFormat = "\\N";
+      Map<Object, Integer>[] localCaches = new Map[indexedColumn.size()];
+
+      for (int i = 0; i < indexedColumn.size(); i++) {
+        localCaches[i] = new ConcurrentHashMap<>();
+        DataField dataField = new DataField(indexedColumn.get(i));
+        String dateFormat = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+        dataField.setDateFormat(dateFormat);
+        String tsFormat = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+        dataField.setTimestampFormat(tsFormat);
+        FieldConverter fieldConverter =
+            FieldEncoderFactory.getInstance().createFieldEncoder(dataField, absoluteTableIdentifier,
+                i, nullFormat, null, false, localCaches[i], false);
+        this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, "Exception occurred while initializing index columns");
+      throw new RuntimeException(e);
+    }
+    this.badRecordLogHolder = new BadRecordLogHolder();
+    this.badRecordLogHolder.setLogged(false);
+  }
+
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions) throws IOException {
@@ -83,7 +146,13 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       return null;
     }
 
-    List<BloomQueryModel> bloomQueryModels = getQueryValue(filterExp.getFilterExpression());
+    List<BloomQueryModel> bloomQueryModels;
+    try {
+      bloomQueryModels = createQueryModel(filterExp.getFilterExpression());
+    } catch (DictionaryGenerationException | UnsupportedEncodingException e) {
+      LOGGER.error(e, "Exception occurs while creating query model");
+      throw new RuntimeException(e);
+    }
     for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
       LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
       BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey(
@@ -91,8 +160,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
       List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
       for (CarbonBloomFilter bloomFilter : bloomIndexList) {
-        boolean scanRequired = bloomFilter.membershipTest(new Key(
-            convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)));
+        boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue));
         if (scanRequired) {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
               String.valueOf(bloomFilter.getBlockletNo())));
@@ -107,45 +175,26 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     return new ArrayList<>(hitBlocklets);
   }
 
-  private byte[] convertValueToBytes(DataType dataType, Object value) {
-    try {
-      if (dataType == DataTypes.STRING) {
-        if (value instanceof byte[]) {
-          return (byte[]) value;
-        } else {
-          return String.valueOf(value).getBytes("utf-8");
-        }
-      } else {
-        return CarbonUtil.getValueAsBytes(dataType, value);
-      }
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException("Error occurs while converting " + value + " to " + dataType, e);
-    }
-  }
-
-  private List<BloomQueryModel> getQueryValue(Expression expression) {
+  private List<BloomQueryModel> createQueryModel(Expression expression)
+      throws DictionaryGenerationException, UnsupportedEncodingException {
     List<BloomQueryModel> queryModels = new ArrayList<BloomQueryModel>();
     if (expression instanceof EqualToExpression) {
       Expression left = ((EqualToExpression) expression).getLeft();
       Expression right = ((EqualToExpression) expression).getRight();
       String column;
-      DataType dataType;
-      Object value;
       if (left instanceof ColumnExpression && right instanceof LiteralExpression) {
         column = ((ColumnExpression) left).getColumnName();
-        if (indexedColumn.contains(column)) {
-          dataType = ((ColumnExpression) left).getDataType();
-          value = ((LiteralExpression) right).getLiteralExpValue();
-          BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+        if (this.name2Col.containsKey(column)) {
+          BloomQueryModel bloomQueryModel =
+              buildQueryModelFromExpression((ColumnExpression) left, (LiteralExpression) right);
           queryModels.add(bloomQueryModel);
         }
         return queryModels;
       } else if (left instanceof LiteralExpression && right instanceof ColumnExpression) {
         column = ((ColumnExpression) right).getColumnName();
-        if (indexedColumn.contains(column)) {
-          dataType = ((ColumnExpression) right).getDataType();
-          value = ((LiteralExpression) left).getLiteralExpValue();
-          BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+        if (this.name2Col.containsKey(column)) {
+          BloomQueryModel bloomQueryModel =
+              buildQueryModelFromExpression((ColumnExpression) right, (LiteralExpression) left);
           queryModels.add(bloomQueryModel);
         }
         return queryModels;
@@ -153,11 +202,68 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     }
 
     for (Expression child : expression.getChildren()) {
-      queryModels.addAll(getQueryValue(child));
+      queryModels.addAll(createQueryModel(child));
     }
     return queryModels;
   }
 
+  private BloomQueryModel buildQueryModelFromExpression(ColumnExpression ce,
+      LiteralExpression le) throws DictionaryGenerationException, UnsupportedEncodingException {
+    String columnName = ce.getColumnName();
+    DataType dataType = ce.getDataType();
+    Object expressionValue = le.getLiteralExpValue();
+    Object literalValue;
+    // note that if the data type is date/timestamp, the expressionValue is of long type.
+    if (le.getLiteralExpDataType() == DataTypes.DATE) {
+      DateFormat format = new SimpleDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+      // the below settings are set statically according to DateDirectDictionaryGenerator
+      format.setLenient(false);
+      format.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+      literalValue = format.format(new Date((long) expressionValue / 1000));
+    } else if (le.getLiteralExpDataType() == DataTypes.TIMESTAMP) {
+      DateFormat format =
+          new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+      // the below settings are set statically according to TimeStampDirectDictionaryGenerator
+      format.setLenient(false);
+      literalValue = format.format(new Date((long) expressionValue / 1000));
+    } else {
+      literalValue = expressionValue;
+    }
+
+    return buildQueryModelInternal(this.name2Col.get(columnName), literalValue, dataType);
+  }
+
+  private BloomQueryModel buildQueryModelInternal(CarbonColumn carbonColumn,
+      Object filterLiteralValue, DataType filterValueDataType) throws
+      DictionaryGenerationException, UnsupportedEncodingException {
+    // convert the filter value to string and apply converters on it to get the carbon internal value
+    String strFilterValue = null;
+    if (null != filterLiteralValue) {
+      strFilterValue = String.valueOf(filterLiteralValue);
+    }
+
+    Object convertedValue = this.name2Converters.get(carbonColumn.getColName()).convert(
+        strFilterValue, badRecordLogHolder);
+
+    byte[] internalFilterValue;
+    if (carbonColumn.isMeasure()) {
+      // for measures, the value is already of the right type; just convert it to bytes.
+      internalFilterValue = CarbonUtil.getValueAsBytes(carbonColumn.getDataType(), convertedValue);
+    } else if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) ||
+        carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+      // for dictionary/date columns, convert the surrogate key to bytes
+      internalFilterValue = CarbonUtil.getValueAsBytes(DataTypes.INT, convertedValue);
+    } else {
+      // for non-dictionary dimensions, the value is already in bytes
+      internalFilterValue = (byte[]) convertedValue;
+    }
+    if (internalFilterValue.length == 0) {
+      internalFilterValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+    }
+    return new BloomQueryModel(carbonColumn.getColName(), internalFilterValue);
+  }
+
   @Override
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     return true;
@@ -177,12 +283,17 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   }
   static class BloomQueryModel {
     private String columnName;
-    private DataType dataType;
-    private Object filterValue;
+    private byte[] filterValue;
 
-    private BloomQueryModel(String columnName, DataType dataType, Object filterValue) {
+    /**
+     * Represents a query model that will be applied on the bloom index.
+     *
+     * @param columnName bloom index column
+     * @param filterValue key for the bloom index;
+     *                   this value is converted from the user-specified filter value in the query
+     */
+    private BloomQueryModel(String columnName, byte[] filterValue) {
       this.columnName = columnName;
-      this.dataType = dataType;
       this.filterValue = filterValue;
     }
 
@@ -190,8 +301,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     public String toString() {
       final StringBuilder sb = new StringBuilder("BloomQueryModel{");
       sb.append("columnName='").append(columnName).append('\'');
-      sb.append(", dataType=").append(dataType);
-      sb.append(", filterValue=").append(filterValue);
+      sb.append(", filterValue=").append(Arrays.toString(filterValue));
       sb.append('}');
       return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index e5f27a9..741e5fb 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -199,19 +200,21 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   }
 
   @Override
-  public DataMapWriter createWriter(Segment segment, String shardName) throws IOException {
+  public DataMapWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
     LOGGER.info(
         String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
             this.dataMapName, getCarbonTable().getTableName() , shardName));
     return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
-        this.dataMapMeta.getIndexedColumns(), segment, shardName,
+        this.dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties,
         this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
   }
 
   @Override
-  public DataMapBuilder createBuilder(Segment segment, String shardName) throws IOException {
+  public DataMapBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
     return new BloomDataMapBuilder(getCarbonTable().getTablePath(), this.dataMapName,
-        this.dataMapMeta.getIndexedColumns(), segment, shardName,
+        this.dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties,
         this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
   }
 
@@ -232,8 +235,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       }
       for (String shard : shardPaths) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
-        bloomDM.init(new BloomDataMapModel(shard, cache,
-            new HashSet<>(dataMapMeta.getIndexedColumnNames())));
+        bloomDM.init(new BloomDataMapModel(shard, cache));
+        bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
         dataMaps.add(bloomDM);
       }
     } catch (Exception e) {
@@ -248,8 +251,9 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
     BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
     String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
-    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache,
-        new HashSet<>(dataMapMeta.getIndexedColumnNames())));
+    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
+    bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
+        dataMapMeta.getIndexedColumns());
     coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
     return coarseGrainDataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index d16af06..f7100e6 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -38,10 +39,12 @@ import org.apache.hadoop.util.bloom.Key;
 public class BloomDataMapBuilder extends BloomDataMapWriter implements DataMapBuilder {
 
   BloomDataMapBuilder(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp,
-      boolean bloomCompress) throws IOException {
-    super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize, bloomFilterFpp,
-        bloomCompress);
+      Segment segment, String shardName, SegmentProperties segmentProperties,
+      int bloomFilterSize, double bloomFilterFpp, boolean bloomCompress) throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName, segmentProperties,
+        bloomFilterSize, bloomFilterFpp, bloomCompress);
+    throw new RuntimeException(
+        "Deferred rebuild for bloomfilter datamap is currently not supported");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
index ddee6e5..9d5d741 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
-import java.util.Set;
-
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
@@ -25,21 +23,13 @@ public class BloomDataMapModel extends DataMapModel {
 
   private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
 
-  private Set<String> indexedColumnNames;
-
   public BloomDataMapModel(String filePath,
-      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache,
-      Set<String> indexedColumnNames) {
+      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache) {
     super(filePath);
     this.cache = cache;
-    this.indexedColumnNames = indexedColumnNames;
   }
 
   public Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> getCache() {
     return cache;
   }
-
-  public Set<String> getIndexedColumnNames() {
-    return indexedColumnNames;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index dc24415..f72809c 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -19,20 +19,30 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Predicate;
 import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.hadoop.util.hash.Hash;
@@ -55,10 +65,17 @@ public class BloomDataMapWriter extends DataMapWriter {
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
   protected List<CarbonBloomFilter> indexBloomFilters;
+  private KeyGenerator keyGenerator;
+  private ColumnarSplitter columnarSplitter;
+  // dict/sort/date columns are encoded in the MDK;
+  // this maps each index column name to its position in the MDK
+  private Map<String, Integer> indexCol2MdkIdx;
+  // this gives the reverse map to indexCol2MdkIdx
+  private Map<Integer, String> mdkIdx2IndexCol;
 
   BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp,
-      boolean compressBloom)
+      Segment segment, String shardName, SegmentProperties segmentProperties,
+      int bloomFilterSize, double bloomFilterFpp, boolean compressBloom)
       throws IOException {
     super(tablePath, dataMapName, indexColumns, segment, shardName);
     this.bloomFilterSize = bloomFilterSize;
@@ -69,6 +86,27 @@ public class BloomDataMapWriter extends DataMapWriter {
     indexBloomFilters = new ArrayList<>(indexColumns.size());
     initDataMapFile();
     resetBloomFilters();
+
+    keyGenerator = segmentProperties.getDimensionKeyGenerator();
+    columnarSplitter = segmentProperties.getFixedLengthKeySplitter();
+    this.indexCol2MdkIdx = new HashMap<>();
+    this.mdkIdx2IndexCol = new HashMap<>();
+    int idx = 0;
+    for (final CarbonDimension dimension : segmentProperties.getDimensions()) {
+      if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) {
+        continue;
+      }
+      boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() {
+        @Override public boolean evaluate(Object object) {
+          return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName());
+        }
+      });
+      if (isExistInIndex) {
+        this.indexCol2MdkIdx.put(dimension.getColName(), idx);
+        this.mdkIdx2IndexCol.put(idx, dimension.getColName());
+      }
+      idx++;
+    }
   }
 
   @Override
@@ -119,22 +157,56 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   @Override
   public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
-    List<CarbonColumn> indexColumns = getIndexColumns();
     for (int rowId = 0; rowId < pageSize; rowId++) {
       // for each indexed column, add the data to bloom filter
       for (int i = 0; i < indexColumns.size(); i++) {
         Object data = pages[i].getData(rowId);
         DataType dataType = indexColumns.get(i).getDataType();
         byte[] indexValue;
-        if (DataTypes.STRING == dataType) {
-          indexValue = getStringData(data);
-        } else if (DataTypes.BYTE_ARRAY == dataType) {
-          byte[] originValue = (byte[]) data;
-          // String and byte array is LV encoded, L is short type
-          indexValue = new byte[originValue.length - 2];
-          System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
-        } else {
+        // convert measures to bytes
+        // convert non-dict dimensions to plain bytes without the length prefix
+        // convert internal-dict dimensions to plain bytes without any encoding
+        if (indexColumns.get(i).isMeasure()) {
           indexValue = CarbonUtil.getValueAsBytes(dataType, data);
+        } else {
+          if (indexColumns.get(i).hasEncoding(Encoding.DICTIONARY)
+              || indexColumns.get(i).hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            byte[] mdkBytes;
+            // this means that we need to pad some fake bytes
+            // to place each key part at its corresponding position in the whole MDK
+            if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) {
+              int totalSize = 0;
+              for (int size : columnarSplitter.getBlockKeySize()) {
+                totalSize += size;
+              }
+              mdkBytes = new byte[totalSize];
+              int startPos = 0;
+              int destPos = 0;
+              for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) {
+                if (mdkIdx2IndexCol.containsKey(keyIdx)) {
+                  int size = columnarSplitter.getBlockKeySize()[keyIdx];
+                  System.arraycopy(data, startPos, mdkBytes, destPos, size);
+                  startPos += size;
+                }
+                destPos += columnarSplitter.getBlockKeySize()[keyIdx];
+              }
+            } else {
+              mdkBytes = (byte[]) data;
+            }
+            // for dict columns including dictionary and date columns
+            // decode value to get the surrogate key
+            int surrogateKey = (int) keyGenerator.getKey(mdkBytes,
+                indexCol2MdkIdx.get(indexColumns.get(i).getColName()));
+            // store the surrogate key in the bloom filter
+            indexValue = CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey);
+          } else if (DataTypes.VARCHAR == dataType) {
+            indexValue = DataConvertUtil.getRawBytesForVarchar((byte[]) data);
+          } else {
+            indexValue = DataConvertUtil.getRawBytes((byte[]) data);
+          }
+        }
+        if (indexValue.length == 0) {
+          indexValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
         }
         indexBloomFilters.get(i).add(new Key(indexValue));
       }
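
The byte shuffling in the dictionary branch above is easiest to see in
isolation. Below is a simplified standalone sketch (plain Java; the key
sizes and data are made up) of how the indexed columns' key parts, laid out
back to back in the page data, are scattered into a zero-padded full-size
MDK before the surrogate is decoded:

  import java.util.Arrays;

  public class MdkPaddingSketch {
    public static void main(String[] args) {
      int[] blockKeySize = {2, 3, 2};          // assumed per-dimension key sizes
      boolean[] indexed = {true, false, true}; // dimensions covered by the datamap
      byte[] data = {1, 2, 7, 8};              // key parts of dimensions 0 and 2 only

      int totalSize = 0;
      for (int size : blockKeySize) {
        totalSize += size;
      }
      byte[] mdkBytes = new byte[totalSize];
      int startPos = 0;
      int destPos = 0;
      for (int keyIdx = 0; keyIdx < blockKeySize.length; keyIdx++) {
        if (indexed[keyIdx]) {
          System.arraycopy(data, startPos, mdkBytes, destPos, blockKeySize[keyIdx]);
          startPos += blockKeySize[keyIdx];
        }
        destPos += blockKeySize[keyIdx];
      }
      // [1, 2, 0, 0, 0, 7, 8]: dimension 1's slot is fake/zero padding
      System.out.println(Arrays.toString(mdkBytes));
    }
  }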

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
new file mode 100644
index 0000000..b40dfe2
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/DataConvertUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class DataConvertUtil {
+  /**
+   * get raw bytes from LV structure, L is short encoded
+   */
+  public static byte[] getRawBytes(byte[] lvData) {
+    byte[] indexValue = new byte[lvData.length - CarbonCommonConstants.SHORT_SIZE_IN_BYTE];
+    System.arraycopy(lvData, CarbonCommonConstants.SHORT_SIZE_IN_BYTE,
+        indexValue, 0, lvData.length - CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    return indexValue;
+  }
+
+  /**
+   * get raw bytes from LV structure, L is int encoded
+   */
+  public static byte[] getRawBytesForVarchar(byte[] lvData) {
+    byte[] indexValue = new byte[lvData.length - CarbonCommonConstants.INT_SIZE_IN_BYTE];
+    System.arraycopy(lvData, CarbonCommonConstants.INT_SIZE_IN_BYTE,
+        indexValue, 0, lvData.length - CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    return indexValue;
+  }
+}
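
As a quick standalone check of the LV unwrapping above (plain Java, not part
of the patch): build a short-length-prefixed value and strip the 2-byte
prefix the same way getRawBytes does; getRawBytesForVarchar is identical
except that the prefix is a 4-byte int:

  import java.nio.ByteBuffer;
  import java.nio.charset.StandardCharsets;
  import java.util.Arrays;

  public class LvDecodeSketch {
    public static void main(String[] args) {
      byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
      ByteBuffer lv = ByteBuffer.allocate(2 + payload.length);
      lv.putShort((short) payload.length); // L: short-encoded length
      lv.put(payload);                     // V: the value bytes
      // strip the 2-byte length prefix, as getRawBytes does
      byte[] raw = Arrays.copyOfRange(lv.array(), 2, lv.array().length);
      System.out.println(new String(raw, StandardCharsets.UTF_8)); // abc
    }
  }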

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 84b9e65..bc65a93 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -82,14 +83,15 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
    * @return
    */
   @Override
-  public DataMapWriter createWriter(Segment segment, String shardName) {
+  public DataMapWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) {
     return new MinMaxDataWriter(getCarbonTable(), getDataMapSchema(), segment, shardName,
         dataMapMeta.getIndexedColumns());
   }
 
   @Override
-  public DataMapBuilder createBuilder(Segment segment, String shardName)
-      throws IOException {
+  public DataMapBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index cc14dc4..5393746 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -202,7 +203,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
    * Return a new write for this datamap
    */
   @Override
-  public DataMapWriter createWriter(Segment segment, String shardName) {
+  public DataMapWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) {
     LOGGER.info("lucene data write to " + shardName);
     return new LuceneDataMapWriter(getCarbonTable().getTablePath(), dataMapName,
         dataMapMeta.getIndexedColumns(), segment, shardName, flushCacheSize,
@@ -210,7 +212,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   }
 
   @Override
-  public DataMapBuilder createBuilder(Segment segment, String shardName) {
+  public DataMapBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) {
     return new LuceneDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
         segment, shardName, dataMapMeta.getIndexedColumns(), flushCacheSize, storeBlockletWise);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 074c807..e141a09 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -56,7 +56,7 @@ class CGDataMapFactory(
   /**
    * Return a new write for this datamap
    */
-  override def createWriter(segment: Segment, shardName: String): DataMapWriter = {
+  override def createWriter(segment: Segment, shardName: String, segmentProperties: SegmentProperties): DataMapWriter = {
     new CGDataMapWriter(carbonTable, segment, shardName, dataMapSchema)
   }
 
@@ -149,7 +149,7 @@ class CGDataMapFactory(
   }
 
   override def createBuilder(segment: Segment,
-      shardName: String): DataMapBuilder = {
+      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 4250269..e61d99c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
 import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -54,7 +55,7 @@ class C2DataMapFactory(
 
   override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
 
-  override def createWriter(segment: Segment, shardName: String): DataMapWriter =
+  override def createWriter(segment: Segment, shardName: String, segmentProperties: SegmentProperties): DataMapWriter =
     DataMapWriterSuite.dataMapWriterC2Mock(identifier, "testdm", segment, shardName)
 
   override def getMeta: DataMapMeta =
@@ -84,7 +85,7 @@ class C2DataMapFactory(
   }
 
   override def createBuilder(segment: Segment,
-      shardName: String): DataMapBuilder = {
+      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 08d8911..20a76f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -54,7 +54,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   /**
    * Return a new writer for this datamap
    */
-  override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter = {
+  override def createWriter(segment: Segment, dataWritePath: String, segmentProperties: SegmentProperties): DataMapWriter = {
     new FGDataMapWriter(carbonTable, segment, dataWritePath, dataMapSchema)
   }
 
@@ -143,7 +143,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   }
 
   override def createBuilder(segment: Segment,
-      shardName: String): DataMapBuilder = {
+      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 0c4f652..0cafe33 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, Coa
 import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.status.{DataMapStatus, DataMapStatusManager}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
+import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -237,7 +238,7 @@ class TestDataMapFactory(
     ???
   }
 
-  override def createWriter(segment: Segment, shardName: String): DataMapWriter = {
+  override def createWriter(segment: Segment, shardName: String, segmentProperties: SegmentProperties): DataMapWriter = {
     new DataMapWriter(carbonTable.getTablePath, "testdm", carbonTable.getIndexedColumns(dataMapSchema),
       segment, shardName) {
       override def onPageAdded(blockletId: Int, pageId: Int, pageSize: Int, pages: Array[ColumnPage]): Unit = { }
@@ -280,7 +281,7 @@ class TestDataMapFactory(
   }
 
   override def createBuilder(segment: Segment,
-      shardName: String): DataMapBuilder = {
+      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
     return new DataMapBuilder {
       override def initialize(): Unit = { }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 1657a80..2441b53 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
+import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
@@ -307,7 +308,7 @@ class WaitingDataMapFactory(
 
   override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
 
-  override def createWriter(segment: Segment, shardName: String): DataMapWriter = {
+  override def createWriter(segment: Segment, shardName: String, segmentProperties: SegmentProperties): DataMapWriter = {
     new DataMapWriter(carbonTable.getTablePath, dataMapSchema.getDataMapName,
       carbonTable.getIndexedColumns(dataMapSchema), segment, shardName) {
       override def onPageAdded(blockletId: Int, pageId: Int, pageSize: Int, pages: Array[ColumnPage]): Unit = { }
@@ -353,7 +354,7 @@ class WaitingDataMapFactory(
   }
 
   override def createBuilder(segment: Segment,
-      shardName: String): DataMapBuilder = {
+      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
     ???
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index d064306..c1c20d8 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -192,7 +192,7 @@ class IndexDataMapRebuildRDD[K, V](
 
       // we use task name as shard name to create the folder for this datamap
       val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
-      refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName)
+      refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName, null)
       refresher.initialize()
 
       var blockletId = 0
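
Note that the rebuild path now passes null for the new segmentProperties argument; since deferred rebuild for the bloom datamap is blocked in this commit, builders that rely on segment properties should fail fast instead of dereferencing null. A hedged sketch of such a guard (the check and MyDataMapBuilder are illustrations, not part of this diff):

    override def createBuilder(segment: Segment, shardName: String,
        segmentProperties: SegmentProperties): DataMapBuilder = {
      if (segmentProperties == null) {
        // IndexDataMapRebuildRDD passes null here; rebuild is not yet supported
        throw new UnsupportedOperationException("rebuild is not supported for this datamap")
      }
      new MyDataMapBuilder(segment, shardName, segmentProperties)
    }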

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
new file mode 100644
index 0000000..ee84c02
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -0,0 +1,412 @@
+package org.apache.carbondata.datamap.bloom
+
+import java.io.File
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.deleteFile
+import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.createFile
+import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.checkBasicQuery
+
+class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  val bigFile = s"$resourcesPath/bloom_datamap_function_test_big.csv"
+  val normalTable = "carbon_normal"
+  val bloomDMSampleTable = "carbon_bloom"
+  val dataMapName = "bloom_dm"
+
+  override protected def beforeAll(): Unit = {
+    deleteFile(bigFile)
+    new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
+    createFile(bigFile, line = 2000)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
+
+  test("test bloom datamap: index column is integer, dictionary, sort_column") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128', 'dictionary_include'='id', 'sort_columns'='id')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    var map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+    sql(s"select * from $bloomDMSampleTable where id = 1").show(false)
+    sql(s"select * from $bloomDMSampleTable where city = 'city_1'").show(false)
+    checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test bloom datamap: index column is integer, dictionary, not sort_column") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128', 'dictionary_include'='id', 'sort_columns'='name')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    var map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+    sql(s"select * from $bloomDMSampleTable where id = 1").show(false)
+    sql(s"select * from $bloomDMSampleTable where city = 'city_1'").show(false)
+    checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test bloom datamap: index column is integer, sort_column") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128', 'sort_columns'='id')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    var map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+         """.stripMargin)
+
+    map = DataMapStatusManager.readDataMapStatusMap()
+    assert(map.get(dataMapName).isEnabled)
+
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+    sql(s"select * from $bloomDMSampleTable where id = 1").show(false)
+    sql(s"select * from $bloomDMSampleTable where city = 'city_1'").show(false)
+    checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test bloom datamap: index column is float, not dictionary") {
+    val floatCsvPath = s"$resourcesPath/datasamplefordate.csv"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $normalTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='salary')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'"),
+      sql(s"SELECT * FROM $normalTable WHERE salary='1040.56'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'"),
+      sql(s"SELECT * FROM $normalTable WHERE salary='1040'"))
+  }
+
+  test("test bloom datamap: index column is float, dictionary") {
+    val floatCsvPath = s"$resourcesPath/datasamplefordate.csv"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $normalTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno', 'dictionary_include'='salary')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='salary')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'"),
+      sql(s"SELECT * FROM $normalTable WHERE salary='1040.56'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'"),
+      sql(s"SELECT * FROM $normalTable WHERE salary='1040'"))
+  }
+
+  // since float cannot be a sort_column, we skip that test case
+
+  test("test bloom datamap: index column is date") {
+    val dateCsvPath = s"$resourcesPath/datasamplefordate.csv"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $normalTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='doj')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'"),
+      sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-14'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'"),
+      sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-15'"))
+  }
+
+  test("test bloom datamap: index column is date, dictionary, sort column") {
+    val dateCsvPath = s"$resourcesPath/datasamplefordate.csv"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $normalTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('SORT_COLUMNS'='empno', 'dictionary_include'='doj', 'sort_columns'='doj')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='doj')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS(
+         | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'"),
+      sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-14'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'"),
+      sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-15'"))
+  }
+
+  // since date cannot be dictionary_exclude, we skip that test case
+
+  // timestamp columns are not dictionary-encoded by default
+  test("test bloom datamap: index column is timestamp") {
+    val timeStampData = s"$resourcesPath/timeStampFormatData1.csv"
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS $normalTable (
+         | ID Int, date date, starttime Timestamp, country String, name String, phonetype String, serialname String, salary Int)
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$timeStampData' into table $normalTable
+         | OPTIONS('dateformat' = 'yyyy/MM/dd','timestampformat'='yyyy-MM-dd HH:mm:ss')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS $bloomDMSampleTable (
+         | ID Int, date date, starttime Timestamp, country String, name String, phonetype String, serialname String, salary Int)
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='starttime')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$timeStampData' into table $bloomDMSampleTable
+         | OPTIONS('dateformat' = 'yyyy/MM/dd','timestampformat'='yyyy-MM-dd HH:mm:ss')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:30.0'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:31.0'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:30.0'"),
+      sql(s"SELECT * FROM $normalTable WHERE starttime='2016-07-25 01:03:30.0'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:31.0'"),
+      sql(s"SELECT * FROM $normalTable WHERE starttime='2016-07-25 01:03:31.0'"))
+  }
+
+  test("test bloom datamap: index column is timestamp, dictionary, sort_column") {
+    val timeStampData = s"$resourcesPath/timeStampFormatData1.csv"
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS $normalTable (
+         | ID Int, date date, starttime Timestamp, country String, name String, phonetype String, serialname String, salary Int)
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$timeStampData' into table $normalTable
+         | OPTIONS('dateformat' = 'yyyy/MM/dd','timestampformat'='yyyy-MM-dd HH:mm:ss')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS $bloomDMSampleTable (
+         | ID Int, date date, starttime Timestamp, country String, name String, phonetype String, serialname String, salary Int)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('dictionary_include'='starttime', 'sort_columns'='starttime')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES (
+         | 'INDEX_COLUMNS'='starttime')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$timeStampData' into table $bloomDMSampleTable
+         | OPTIONS('dateformat' = 'yyyy/MM/dd','timestampformat'='yyyy-MM-dd HH:mm:ss')
+       """.stripMargin)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime=null").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:30.0'").show(false)
+    sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:31.0'").show(false)
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:30.0'"),
+      sql(s"SELECT * FROM $normalTable WHERE starttime='2016-07-25 01:03:30.0'"))
+    checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE starttime='2016-07-25 01:03:31.0'"),
+      sql(s"SELECT * FROM $normalTable WHERE starttime='2016-07-25 01:03:31.0'"))
+  }
+
+  override def afterAll(): Unit = {
+    deleteFile(bigFile)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index c9a4097..a8e4193 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -141,7 +141,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  test("test create bloom datamap and REBUILD DATAMAP") {
+  ignore("test create bloom datamap and REBUILD DATAMAP") {
     sql(
       s"""
          | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -183,7 +183,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
+  ignore("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
     sql(
       s"""
          | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -261,7 +261,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
+  ignore("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
     sql(
       s"""
          | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapTestUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapTestUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapTestUtil.scala
new file mode 100644
index 0000000..add65d2
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapTestUtil.scala
@@ -0,0 +1,63 @@
+package org.apache.carbondata.datamap.bloom
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonSession, DataFrame}
+
+object BloomCoarseGrainDataMapTestUtil extends QueryTest {
+
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0): Unit = {
+    if (!new File(fileName).exists()) {
+      val write = new PrintWriter(new File(fileName))
+      for (i <- start until (start + line)) {
+        write.println(
+          s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+      }
+      write.close()
+    }
+  }
+
+  def deleteFile(fileName: String): Unit = {
+    val file = new File(fileName)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+
+  private def checkSqlHitDataMap(sqlText: String, dataMapName: String, shouldHit: Boolean): DataFrame = {
+    if (shouldHit) {
+      assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+    } else {
+      assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+    }
+    sql(sqlText)
+  }
+
+  def checkBasicQuery(dataMapName: String, bloomDMSampleTable: String, normalTable: String, shouldHit: Boolean = true): Unit = {
+    checkAnswer(
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 1", dataMapName, shouldHit),
+      sql(s"select * from $normalTable where id = 1"))
+    checkAnswer(
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 999", dataMapName, shouldHit),
+      sql(s"select * from $normalTable where id = 999"))
+    checkAnswer(
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_1'", dataMapName, shouldHit),
+      sql(s"select * from $normalTable where city = 'city_1'"))
+    checkAnswer(
+      checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_999'", dataMapName, shouldHit),
+      sql(s"select * from $normalTable where city = 'city_999'"))
+    checkAnswer(
+      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+          s" from $bloomDMSampleTable"),
+      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+          s" from $normalTable"))
+  }
+}

