carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2633][BloomDataMap] Fix bugs in bloomfilter for dictionary/sort/date/TimeStamp column
Date Wed, 04 Jul 2018 06:14:11 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 133ec17e5 -> cd7c2102c


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index bfa498e..24fcf61 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -51,7 +52,8 @@ public class DataMapWriterListener {
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(CarbonTable carbonTable, String segmentId, String taskNo) {
+  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
+      String taskNo, SegmentProperties segmentProperties) {
     List<TableDataMap> tableIndices;
     try {
       tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
@@ -65,7 +67,7 @@ public class DataMapWriterListener {
         // will rebuild the datamap manually
         if (!tableDataMap.getDataMapSchema().isLazy()) {
           DataMapFactory factory = tableDataMap.getDataMapFactory();
-          register(factory, segmentId, taskNo);
+          register(factory, segmentId, taskNo, segmentProperties);
         }
       }
     }
@@ -74,7 +76,8 @@ public class DataMapWriterListener {
   /**
    * Register a DataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId, String taskNo) {
+  private void register(DataMapFactory factory, String segmentId,
+      String taskNo, SegmentProperties segmentProperties) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
@@ -86,7 +89,7 @@ public class DataMapWriterListener {
     List<DataMapWriter> writers = registry.get(columns);
     DataMapWriter writer = null;
     try {
-      writer = factory.createWriter(new Segment(segmentId), taskNo);
+      writer = factory.createWriter(new Segment(segmentId), taskNo, segmentProperties);
     } catch (IOException e) {
       LOG.error("Failed to create DataMapWriter: " + e.getMessage());
       throw new DataMapWriterException(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index fcabef5..2d96f6c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -25,12 +25,9 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 
 /**
  * This base abstract class for data loading.
@@ -159,20 +156,8 @@ public abstract class AbstractDataLoadProcessorStep {
    * @return
    */
   protected DataMapWriterListener getDataMapWriterListener(int bucketId) {
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
-            (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
-    DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(
-        configuration.getTableSpec().getCarbonTable(),
-        configuration.getSegmentId(),
-        CarbonTablePath.getShardName(
-            carbonDataFileAttributes.getTaskId(),
-            bucketId,
-            0,
-            String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
-            configuration.getSegmentId()));
-    return listener;
+    // todo: this method is useless, will remove it later
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
index 1ce8f9a..06f2ffc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
@@ -35,6 +35,11 @@ public interface FieldConverter {
   void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
 
   /**
+   * It convert the literal value to carbon internal value
+   */
+  Object convert(Object value, BadRecordLogHolder logHolder) throws RuntimeException;
+
+  /**
    * This method clears all the dictionary caches being acquired.
    */
   void clear();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index 4e46f9f..6fedb72 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -40,15 +40,20 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI
   @Override
   public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     Object object = row.getObject(index);
+    row.update(convert(object, logHolder), index);
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder) throws RuntimeException {
     // TODO Its temporary, needs refactor here.
     ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
     try {
-      genericDataType.writeByteArray(object, dataOutputStream, logHolder);
+      genericDataType.writeByteArray(value, dataOutputStream, logHolder);
       dataOutputStream.close();
-      row.update(byteArray.toByteArray(), index);
+      return byteArray.toByteArray();
     } catch (Exception e) {
-      throw new CarbonDataLoadingException(object + "", e);
+      throw new CarbonDataLoadingException(value + "", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
index 0757f8a..167c19b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -35,11 +33,10 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
@@ -48,9 +45,6 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
 
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
-
   private BiDictionary<Integer, Object> dictionaryGenerator;
 
   private int index;
@@ -65,12 +59,13 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
   private boolean isEmptyBadRecord;
 
-  public DictionaryFieldConverterImpl(DataField dataField,
-      AbsoluteTableIdentifier absoluteTableIdentifier, String nullFormat, int index,
+  public DictionaryFieldConverterImpl(CarbonColumn carbonColumn,
+      String tableId, String nullFormat, int index,
       DictionaryClient client, boolean useOnePass, Map<Object, Integer> localCache,
       boolean isEmptyBadRecord, DictionaryColumnUniqueIdentifier identifier) throws IOException {
     this.index = index;
-    this.carbonDimension = (CarbonDimension) dataField.getColumn();
+    assert carbonColumn instanceof CarbonDimension;
+    this.carbonDimension = (CarbonDimension) carbonColumn;
     this.nullFormat = nullFormat;
     this.isEmptyBadRecord = isEmptyBadRecord;
 
@@ -84,10 +79,9 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
         dictionary = cache.get(identifier);
       }
       dictionaryMessage = new DictionaryMessage();
-      dictionaryMessage.setColumnName(dataField.getColumn().getColName());
+      dictionaryMessage.setColumnName(carbonColumn.getColName());
       // for table initialization
-      dictionaryMessage
-          .setTableUniqueId(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
+      dictionaryMessage.setTableUniqueId(tableId);
       dictionaryMessage.setData("0");
       // for generate dictionary
       dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
@@ -102,8 +96,18 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
   @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
       throws CarbonDataLoadingException {
     try {
+      row.update(convert(row.getString(index), logHolder), index);
+    } catch (RuntimeException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder)
+      throws RuntimeException {
+    try {
       String parsedValue = null;
-      String dimensionValue = row.getString(index);
+      String dimensionValue = (String) value;
       if (dimensionValue == null || dimensionValue.equals(nullFormat)) {
         parsedValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
       } else {
@@ -113,17 +117,18 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
         if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
           String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
           if (null == message) {
-            message = CarbonDataProcessorUtil.prepareFailureReason(
-                carbonDimension.getColName(), carbonDimension.getDataType());
+            message = CarbonDataProcessorUtil
+                .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
             logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
-          } logHolder.setReason(message);
+          }
+          logHolder.setReason(message);
         }
-        row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
       } else {
-        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
+        return dictionaryGenerator.getOrGenerateKey(parsedValue);
       }
     } catch (DictionaryGenerationException e) {
-      throw new CarbonDataLoadingException(e);
+      throw new RuntimeException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
index 64ddf27..2d0ff47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.converter.impl;
 
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -65,16 +66,23 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
   @Override
   public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String value = row.getString(index);
-    if (value == null) {
+    row.update(convert(value, logHolder), index);
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder)
+      throws RuntimeException {
+    String literalValue = (String) value;
+    if (literalValue == null) {
       logHolder.setReason(
           CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
-      row.update(1, index);
-    } else if (value.equals(nullFormat)) {
-      row.update(1, index);
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
+    } else if (literalValue.equals(nullFormat)) {
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
     } else {
-      int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
-      if (key == 1) {
-        if ((value.length() > 0) || (value.length() == 0 && isEmptyBadRecord)) {
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(literalValue);
+      if (key == CarbonCommonConstants.DIRECT_DICT_VALUE_NULL) {
+        if ((literalValue.length() > 0) || (literalValue.length() == 0 && isEmptyBadRecord)) {
           String message = logHolder.getColumnMessageMap().get(column.getColName());
           if (null == message) {
             message = CarbonDataProcessorUtil.prepareFailureReason(
@@ -84,7 +92,7 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
           logHolder.setReason(message);
         }
       }
-      row.update(key, index);
+      return key;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index c6cea65..470c092 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -83,7 +83,8 @@ public class FieldEncoderFactory {
             || dataField.getColumn().getColumnSchema().getParentColumnTableRelations().isEmpty()) {
           identifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
               dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
-          return new DictionaryFieldConverterImpl(dataField, absoluteTableIdentifier,
+          return new DictionaryFieldConverterImpl(dataField.getColumn(),
+              absoluteTableIdentifier.getCarbonTableIdentifier().getTableId(),
               nullFormat, index, client, useOnePass, localCache, isEmptyBadRecord,
               identifier);
         } else {
@@ -105,7 +106,8 @@ public class FieldEncoderFactory {
                   parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
-          return new DictionaryFieldConverterImpl(dataField, parentAbsoluteTableIdentifier,
+          return new DictionaryFieldConverterImpl(dataField.getColumn(),
+              parentAbsoluteTableIdentifier.getCarbonTableIdentifier().getTableId(),
               nullFormat, index, null, false, null, isEmptyBadRecord, identifier);
         }
       } else if (dataField.getColumn().isComplex()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 724a312..9cbd607 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -63,17 +63,24 @@ public class MeasureFieldConverterImpl implements FieldConverter {
   public void convert(CarbonRow row, BadRecordLogHolder logHolder)
       throws CarbonDataLoadingException {
     String value = row.getString(index);
+    row.update(convert(value, logHolder), index);
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder)
+      throws RuntimeException {
+    String literalValue = (String) (value);
     Object output;
-    boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value);
-    if (value == null || isNull) {
+    boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(literalValue);
+    if (literalValue == null || isNull) {
       String message = logHolder.getColumnMessageMap().get(measure.getColName());
       if (null == message) {
         message = CarbonDataProcessorUtil
             .prepareFailureReason(measure.getColName(), measure.getDataType());
         logHolder.getColumnMessageMap().put(measure.getColName(), message);
       }
-      row.update(null, index);
-    } else if (value.length() == 0) {
+      return null;
+    } else if (literalValue.length() == 0) {
       if (isEmptyBadRecord) {
         String message = logHolder.getColumnMessageMap().get(measure.getColName());
         if (null == message) {
@@ -83,30 +90,30 @@ public class MeasureFieldConverterImpl implements FieldConverter {
         }
         logHolder.setReason(message);
       }
-      row.update(null, index);
-    } else if (value.equals(nullformat)) {
-      row.update(null, index);
+      return null;
+    } else if (literalValue.equals(nullformat)) {
+      return null;
     } else {
       try {
         if (dataField.isUseActualData()) {
-          output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure, true);
+          output =
+              DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure, true);
         } else {
-          output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+          output = DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure);
         }
-        row.update(output, index);
+        return output;
       } catch (NumberFormatException e) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
         }
         logHolder.setReason(
             CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
-        output = null;
-        row.update(output, index);
+        return null;
       }
     }
-
   }
 
+
   /**
    * Method to clean the dictionary cache. As in this MeasureFieldConverterImpl convert no
    * dictionary caches are acquired so nothing to clear. s

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 3018e49..1a2afd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -54,12 +54,19 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
 
   @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
+    row.update(convert(dimensionValue, logHolder), index);
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder)
+      throws RuntimeException {
+    String dimensionValue = (String) value;
     if (null == dimensionValue && column.getDataType() != DataTypes.STRING) {
       logHolder.setReason(
           CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
-      updateWithNullValue(row);
+      return getNullValue();
     } else if (dimensionValue == null || dimensionValue.equals(nullformat)) {
-      updateWithNullValue(row);
+      return getNullValue();
     } else {
       String dateFormat = null;
       if (dataType == DataTypes.DATE) {
@@ -69,27 +76,27 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
       }
       try {
         if (!dataField.isUseActualData()) {
-          byte[] value = DataTypeUtil
+          byte[] parsedValue = DataTypeUtil
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
-              && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+              && parsedValue.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
             throw new CarbonDataLoadingException(String.format(
                 "Dataload failed, String size cannot exceed %d bytes,"
                     + " please consider long string data type",
                 CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
-          row.update(value, index);
+          return parsedValue;
         } else {
-          Object value = DataTypeUtil
+          Object parsedValue = DataTypeUtil
               .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
-          if (dataType == DataTypes.STRING
-              && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+          if (dataType == DataTypes.STRING && parsedValue.toString().length()
+              > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
             throw new CarbonDataLoadingException(String.format(
                 "Dataload failed, String size cannot exceed %d bytes,"
                     + " please consider long string data type",
                 CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
-          row.update(value, index);
+          return parsedValue;
         }
       } catch (CarbonDataLoadingException e) {
         throw e;
@@ -102,24 +109,22 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
             logHolder.getColumnMessageMap().put(column.getColName(), message);
           }
           logHolder.setReason(message);
-          updateWithNullValue(row);
-        } else {
-          updateWithNullValue(row);
         }
       }
     }
+    return getNullValue();
   }
 
   @Override public void clear() {
   }
 
-  private void updateWithNullValue(CarbonRow row) {
+  private byte[] getNullValue() {
     if (dataField.isUseActualData()) {
-      row.update(null, index);
+      return null;
     } else if (dataType == DataTypes.STRING) {
-      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+      return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
     } else {
-      row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);
+      return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd7c2102/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 6f8f987..266c75d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -282,7 +282,8 @@ public class CarbonFactDataHandlerModel {
               bucketId,
               0,
               String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
-              configuration.getSegmentId()));
+              configuration.getSegmentId()),
+          segmentProperties);
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -353,7 +354,8 @@ public class CarbonFactDataHandlerModel {
             carbonFactDataHandlerModel.getBucketId(),
             carbonFactDataHandlerModel.getTaskExtension(),
             String.valueOf(loadModel.getFactTimeStamp()),
-            loadModel.getSegmentId()));
+            loadModel.getSegmentId()),
+        segmentProperties);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     setNumberOfCores(carbonFactDataHandlerModel);
     carbonFactDataHandlerModel


Mime
View raw message