carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [1/4] carbondata git commit: [CARBONDATA-2415] Support Refresh DataMap command for all Index datamap
Date: Sun, 06 May 2018 08:50:16 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master f2fb06806 -> 9db662a2d
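
This patch routes the REFRESH DATAMAP command through the generic DataMapProvider/IndexDataMapRefreshRDD path, so any index datamap (bloomfilter, lucene, ...) can be rebuilt for segments loaded before the datamap was created. A minimal sketch of the resulting user-facing flow, modeled on the BloomCoarseGrainDataMapSuite changes below; the `spark` session variable and the table/datamap names are placeholders, not part of this patch:

  // assumes `spark` is a CarbonSession-enabled SparkSession and carbon_bloom is an
  // existing carbondata table that already has loaded segments
  spark.sql(
    """
      | CREATE DATAMAP dm ON TABLE carbon_bloom
      | USING 'bloomfilter'
      | DMPROPERTIES('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
    """.stripMargin)
  // index datamaps are created in a disabled state; REFRESH rebuilds the index for the
  // existing segments via the provider's rebuild() and then enables the datamap
  spark.sql("REFRESH DATAMAP dm ON TABLE carbon_bloom")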


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
deleted file mode 100644
index 2bc3eaf..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene
-
-import java.text.SimpleDateFormat
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{CarbonInputMetrics, Partition, SparkContext, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.TaskMetricsMap
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
-import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
-
-object LuceneDataMapRefreshRDD {
-
-  def refreshDataMap(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      schema: DataMapSchema
-  ): Unit = {
-    val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
-    val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
-    val validSegments = validAndInvalidSegments.getValidSegments()
-    val indexedCarbonColumns =
-      LuceneDataMapFactoryBase.validateAndGetIndexedColumns(schema, carbonTable)
-
-    // loop all segments to rebuild DataMap
-    val tableInfo = carbonTable.getTableInfo()
-    validSegments.asScala.foreach { segment =>
-      val dataMapStorePath = LuceneDataMapWriter.genDataMapStorePath(
-        tableIdentifier.getTablePath(),
-        segment.getSegmentNo(),
-        schema.getDataMapName)
-      // if lucene datamap folder is exists, not require to build lucene datamap again
-      refreshOneSegment(sparkSession, tableInfo, dataMapStorePath, schema.getDataMapName,
-        indexedCarbonColumns, segment.getSegmentNo());
-    }
-  }
-
-  def refreshOneSegment(
-      sparkSession: SparkSession,
-      tableInfo: TableInfo,
-      dataMapStorePath: String,
-      dataMapName: String,
-      indexColumns: java.util.List[String],
-      segmentId: String): Unit = {
-
-    if (!FileFactory.isFileExist(dataMapStorePath)) {
-    if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
-        try {
-          val status = new LuceneDataMapRefreshRDD[String, Boolean](
-            sparkSession.sparkContext,
-            new RefreshResultImpl(),
-            tableInfo,
-            dataMapStorePath,
-            dataMapName,
-            indexColumns.asScala.toArray,
-            segmentId
-          ).collect()
-
-          status.find(_._2 == false).foreach { task =>
-            throw new Exception(
-              s"Task Failed to refresh datamap $dataMapName on segment_$segmentId")
-          }
-        } catch {
-          case ex =>
-            // process failure
-            FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
-            throw new Exception(
-              s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex)
-        }
-      }
-    }
-  }
-
-}
-
-class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
-  override def initialize(carbonColumns: Array[CarbonColumn],
-      carbonTable: CarbonTable): Unit = {
-  }
-
-  override def readRow(data: Array[Object]): Array[Object] = {
-    for (i <- 0 until dataTypes.length) {
-      if (dataTypes(i) == DataTypes.STRING) {
-        data(i) = data(i).toString
-      }
-    }
-    data
-  }
-
-  override def close(): Unit = {
-  }
-}
-
-class LuceneDataMapRefreshRDD[K, V](
-    sc: SparkContext,
-    result: RefreshResult[K, V],
-    @transient tableInfo: TableInfo,
-    dataMapStorePath: String,
-    dataMapName: String,
-    indexColumns: Array[String],
-    segmentId: String
-) extends CarbonRDDWithTableInfo[(K, V)](sc, Nil, tableInfo.serialize()) {
-
-  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new util.Date())
-  }
-
-  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    var status = false
-    val inputMetrics = new CarbonInputMetrics
-    TaskMetricsMap.getInstance().registerThreadCallback()
-    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    inputMetrics.initBytesReadCallback(context, inputSplit)
-
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(attemptContext)
-
-    val taskName = CarbonTablePath.getUniqueTaskName(inputSplit.getAllSplits.get(0).getBlockPath)
-
-    val tableInfo = getTableInfo
-    val identifier = tableInfo.getOrCreateAbsoluteTableIdentifier()
-
-    val columns = tableInfo.getFactTable.getListOfColumns.asScala
-    val dataTypes = indexColumns.map { columnName =>
-      columns.find(_.getColumnName.equals(columnName)).get.getDataType
-    }
-
-    val indexPath = LuceneDataMapWriter.genDataMapStorePathOnTaskId(identifier.getTablePath,
-      segmentId, dataMapName, taskName)
-
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    // one query id per table
-    model.setQueryId(queryId)
-    model.setVectorReader(false)
-    model.setForcedDetailRawQuery(false)
-    model.setRequiredRowId(true)
-    var reader: CarbonRecordReader[Array[Object]] = null
-    var indexBuilder: LuceneIndexRefreshBuilder = null
-    try {
-      reader = new CarbonRecordReader(model, new OriginalReadSupport(dataTypes), inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
-
-      indexBuilder = new LuceneIndexRefreshBuilder(indexPath, indexColumns, dataTypes)
-      indexBuilder.initialize()
-
-      while (reader.nextKeyValue()) {
-        indexBuilder.addDocument(reader.getCurrentValue)
-      }
-
-      indexBuilder.finish()
-
-      status = true
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close()
-        } catch {
-          case ex =>
-            LOGGER.error(ex, "Failed to close reader")
-        }
-      }
-
-      if (indexBuilder != null) {
-        try {
-          indexBuilder.close()
-        } catch {
-          case ex =>
-            LOGGER.error(ex, "Failed to close index writer")
-        }
-      }
-    }
-
-    new Iterator[(K, V)] {
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(split.index.toString, status)
-      }
-    }
-  }
-
-
-  private def createInputFormat(
-      attemptContext: TaskAttemptContextImpl) = {
-    val format = new CarbonTableInputFormat[Object]
-    val tableInfo1 = getTableInfo
-    val conf = attemptContext.getConfiguration
-    CarbonInputFormat.setTableInfo(conf, tableInfo1)
-    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
-    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
-    CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])
-
-    val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
-    CarbonInputFormat.setTablePath(
-      conf,
-      identifier.appendWithLocalPrefix(identifier.getTablePath))
-
-    CarbonInputFormat.setSegmentsToAccess(
-      conf,
-      Segment.toSegmentList(Array(segmentId), null))
-
-    CarbonInputFormat.setColumnProjection(
-      conf,
-      new CarbonProjection(indexColumns))
-    format
-  }
-
-  override protected def getPartitions = {
-    val conf = new Configuration()
-    val jobConf = new JobConf(conf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val job = Job.getInstance(jobConf)
-    job.getConfiguration.set("query.id", queryId)
-
-    val format = new CarbonTableInputFormat[Object]
-
-    CarbonInputFormat.setSegmentsToAccess(
-      job.getConfiguration,
-      Segment.toSegmentList(Array(segmentId), null))
-
-    CarbonInputFormat.setTableInfo(
-      job.getConfiguration,
-      tableInfo)
-    CarbonInputFormat.setTablePath(
-      job.getConfiguration,
-      tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
-    CarbonInputFormat.setDatabaseName(
-      job.getConfiguration,
-      tableInfo.getDatabaseName)
-    CarbonInputFormat.setTableName(
-      job.getConfiguration,
-      tableInfo.getFactTable.getTableName)
-
-    format
-      .getSplits(job)
-      .asScala
-      .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(_.taskId)
-      .map { group =>
-        new CarbonMultiBlockSplit(
-          group._2.asJava,
-          group._2.flatMap(_.getLocations).toArray)
-      }
-      .zipWithIndex
-      .map { split =>
-        new CarbonSparkPartition(id, split._2, split._1)
-      }
-      .toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 3fe2c00..34a4013 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.command.datamap
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -24,11 +25,11 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager, IndexDataMapProvider}
+import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 
 /**
  * Below command class will be used to create datamap on table
@@ -37,7 +38,7 @@ import org.apache.carbondata.datamap.DataMapManager
 case class CarbonCreateDataMapCommand(
     dataMapName: String,
     tableIdentifier: Option[TableIdentifier],
-    dmClassName: String,
+    dmProviderName: String,
     dmProperties: Map[String, String],
     queryString: Option[String],
     ifNotExistsSet: Boolean = false)
@@ -68,41 +69,39 @@ case class CarbonCreateDataMapCommand(
       }
     }
 
-    dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
-    // TODO: move this if logic inside lucene module
-    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getShortName) ||
-        dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getClassName)) {
-      val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala
-      if (datamaps.nonEmpty) {
-        datamaps.foreach(datamap => {
-          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns").trim
-            .toLowerCase.split(",").toSet
-          val existingColumns = dmProperties("text_columns").trim.toLowerCase().split(",").toSet
-          val duplicateDMColumn = dmColumns.intersect(existingColumns)
-          if (duplicateDMColumn.nonEmpty) {
-            throw new MalformedDataMapCommandException(
-              s"Create lucene datamap $dataMapName failed, datamap already exists on column(s)
" +
-              s"$duplicateDMColumn")
-          }
-        })
-      }
-    }
     if (mainTable != null &&
         mainTable.isStreamingTable &&
-        !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)
-          || dataMapSchema.getProviderName
-            .equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) {
-      throw new MalformedCarbonCommandException(s"Streaming table does not support creating
${
-        dataMapSchema.getProviderName
-      } datamap")
+        !(dmProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)
+          || dmProviderName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) {
+      throw new MalformedCarbonCommandException(s"Streaming table does not support creating
" +
+                                                s"$dmProviderName datamap")
     }
+
+    dataMapSchema = new DataMapSchema(dataMapName, dmProviderName)
     dataMapSchema.setProperties(new java.util.HashMap[String, String](
       dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
-    dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema, sparkSession)
-    dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull)
-    // TODO Currently this feature is only available for index datamaps
-    if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
-      DataMapStatusManager.disableDataMap(dataMapName)
+
+    dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
+
+    // If it is index datamap, check whether the column has datamap created already
+    dataMapProvider match {
+      case provider: IndexDataMapProvider =>
+        val datamaps = DataMapStoreManager.getInstance.getAllDataMap(mainTable).asScala
+        val existingIndexColumn = mutable.Set[String]()
+        datamaps.foreach { datamap =>
+          datamap.getDataMapSchema.getIndexColumns.foreach(existingIndexColumn.add)
+        }
+
+        provider.getIndexedColumns.asScala.foreach { column =>
+          if (existingIndexColumn.contains(column.getColName)) {
+            throw new MalformedDataMapCommandException(String.format(
+              "column '%s' already has datamap created", column.getColName))
+          }
+        }
+        dataMapProvider.initMeta(queryString.orNull)
+        DataMapStatusManager.disableDataMap(dataMapName)
+      case _ =>
+        dataMapProvider.initMeta(queryString.orNull)
     }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added")
@@ -111,11 +110,11 @@ case class CarbonCreateDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dataMapProvider != null) {
-      dataMapProvider.initData(mainTable)
-      if (mainTable != null && mainTable.isAutoRefreshDataMap) {
-        if (!DataMapClassProvider.LUCENE.getShortName.equals(dataMapSchema.getProviderName)) {
-          dataMapProvider.rebuild(mainTable, dataMapSchema)
-        }
+      dataMapProvider.initData()
+      if (mainTable != null &&
+          mainTable.isAutoRefreshDataMap &&
+          !dataMapSchema.isIndexDataMap) {
+        dataMapProvider.rebuild()
       }
     }
     Seq.empty
@@ -123,7 +122,7 @@ case class CarbonCreateDataMapCommand(
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dataMapProvider != null) {
-      dataMapProvider.freeMeta(mainTable, dataMapSchema)
+      dataMapProvider.cleanMeta()
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
index 1c39e85..4f3b7bc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
@@ -21,11 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DataCommand
 
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.datamap.lucene.LuceneDataMapRefreshRDD
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRefreshRDD}
 
 /**
  * Refresh the datamaps through sync with main table data. After sync with parent table's it enables
@@ -37,6 +35,7 @@ case class CarbonDataMapRefreshCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+
     val table = tableIdentifier match {
       case Some(identifier) =>
         CarbonEnv.getCarbonTable(identifier)(sparkSession)
@@ -46,13 +45,9 @@ case class CarbonDataMapRefreshCommand(
           schema.getRelationIdentifier.getTableName
         )(sparkSession)
     }
-    // Sync the datamap with parent table
-    if (DataMapClassProvider.LUCENE.getShortName.endsWith(schema.getProviderName)) {
-      LuceneDataMapRefreshRDD.refreshDataMap(sparkSession, table, schema)
-    } else {
-      val provider = DataMapManager.get().getDataMapProvider(schema, sparkSession)
-      provider.rebuild(table, schema)
-    }
+    val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
+    provider.rebuild()
+
     // After sync success enable the datamap.
     DataMapStatusManager.enableDataMap(dataMapName)
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 19e6500..cf5a4ae 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -140,9 +140,10 @@ case class CarbonDropDataMapCommand(
                 dbName,
                 tableName))(sparkSession)
             if (dataMapProvider == null) {
-              dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession)
+              dataMapProvider =
+                DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
             }
-            dataMapProvider.freeMeta(mainTable, dataMapSchema)
+            dataMapProvider.cleanMeta()
 
             // fires the event after dropping datamap from main table schema
             val dropDataMapPostEvent =
@@ -181,15 +182,16 @@ case class CarbonDropDataMapCommand(
       Seq.empty
   }
 
-  private def dropDataMapFromSystemFolder(sparkSession: SparkSession) = {
+  private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
     try {
       if (dataMapSchema == null) {
         dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
       }
       if (dataMapSchema != null) {
-        dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession)
+        dataMapProvider =
+          DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
         DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
-        dataMapProvider.freeMeta(mainTable, dataMapSchema)
+        dataMapProvider.cleanMeta()
       }
     } catch {
       case e: Exception =>
@@ -202,7 +204,7 @@ case class CarbonDropDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     if (dataMapProvider != null) {
-      dataMapProvider.freeData(mainTable, dataMapSchema)
+      dataMapProvider.cleanData()
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 540fdd2..62da7ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -147,7 +147,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   /**
    * The syntax of datamap creation is as follows.
-   * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapClassName'
+   * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapProviderName'
    * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
    */
   protected lazy val createDataMap: Parser[LogicalPlan] =
@@ -155,10 +155,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     opt(ontable) ~
     (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
     (AS ~> restInput).? <~ opt(";") ^^ {
-      case ifnotexists ~ dmname ~ tableIdent ~ className ~ dmprops ~ query =>
+      case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ dmprops ~ query =>
 
         val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
-        CarbonCreateDataMapCommand(dmname, tableIdent, className, map, query, ifnotexists.isDefined)
+        CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query,
+          ifnotexists.isDefined)
     }
 
   protected lazy val ontable: Parser[TableIdentifier] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index 33de06f..a8e7b6c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -25,6 +25,8 @@ import scala.util.Random
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   val inputFile = s"$resourcesPath/bloom_datamap_input.csv"
   val normalTable = "carbon_normal"
@@ -33,12 +35,38 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   val lineNum = 500000
 
   override protected def beforeAll(): Unit = {
+    new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     createFile(inputFile, line = lineNum, start = 0)
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
-  test("test bloom datamap") {
+  private def checkQuery = {
+    checkAnswer(
+      sql(s"select * from $bloomDMSampleTable where id = 1"),
+      sql(s"select * from $normalTable where id = 1"))
+    checkAnswer(
+      sql(s"select * from $bloomDMSampleTable where id = 999"),
+      sql(s"select * from $normalTable where id = 999"))
+    checkAnswer(
+      sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
+      sql(s"select * from $normalTable where city = 'city_1'"))
+    checkAnswer(
+      sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
+      sql(s"select * from $normalTable where city = 'city_999'"))
+    checkAnswer(
+      sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+          s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
+      sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+          s" count(distinct s1), count(distinct s2) from $normalTable"))
+    checkAnswer(
+      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+          s" from $bloomDMSampleTable"),
+      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+          s" from $normalTable"))
+  }
+
+  test("test create bloom datamap on table with existing data") {
     sql(
       s"""
          | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -54,44 +82,72 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
          | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
-         | USING '${classOf[BloomCoarseGrainDataMapFactory].getName}'
-         | DMProperties('BLOOM_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
       """.stripMargin)
 
+    // load two segments
+    (1 to 2).foreach { i =>
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+    }
+
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+    checkQuery
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test create bloom datamap and refresh datamap") {
     sql(
       s"""
-         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
-         | OPTIONS('header'='false')
-       """.stripMargin)
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
     sql(
       s"""
-         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
-         | OPTIONS('header'='false')
-       """.stripMargin)
-
-    sql(s"show datamap on table $bloomDMSampleTable").show(false)
-    sql(s"select * from $bloomDMSampleTable where city = 'city_5'").show(false)
-    sql(s"select * from $bloomDMSampleTable limit 5").show(false)
-
-    checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, dataMapName)
-//    checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"),
-//      Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)"))
-    checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"),
-      sql(s"select * from $normalTable where id = 1"))
-    checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 999"),
-      sql(s"select * from $normalTable where id = 999"))
-    checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
-      sql(s"select * from $normalTable where city = 'city_1'"))
-    checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
-      sql(s"select * from $normalTable where city = 'city_999'"))
-    checkAnswer(sql(s"select count(distinct id), count(distinct name), count(distinct city),"
+
-                    s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
-      sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
-          s" count(distinct s1), count(distinct s2) from $normalTable"))
-    checkAnswer(sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)"
+
-                    s" from $bloomDMSampleTable"),
-      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
-          s" from $normalTable"))
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+
+    // load two segments
+    (1 to 2).foreach { i =>
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+    }
+
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    sql(s"REFRESH DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
+    sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+    checkQuery
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
 
   // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java
new file mode 100644
index 0000000..6d062f1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datamap;
+
+public class DataMapWriterException extends RuntimeException {
+  public DataMapWriterException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index a9e30d3..9c3d5d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -44,14 +45,13 @@ public class DataMapWriterListener {
   private static final LogService LOG = LogServiceFactory.getLogService(
       DataMapWriterListener.class.getCanonicalName());
 
-  // list indexed column name -> list of data map writer
-  private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+  // list indexed column -> list of data map writer
+  private Map<List<CarbonColumn>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
 
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
-      String dataWritePath) {
-  public void registerAllWriter(CarbonTable carbonTable, String segmentId, String taskNo) {
     List<TableDataMap> tableIndices;
     try {
       tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
@@ -62,7 +62,7 @@ public class DataMapWriterListener {
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();
-        register(factory, segmentId, dataWritePath);
+        register(factory, segmentId, taskNo);
       }
     }
   }
@@ -70,7 +70,7 @@ public class DataMapWriterListener {
   /**
    * Register a DataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
+  private void register(DataMapFactory factory, String segmentId, String taskNo) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
@@ -78,9 +78,15 @@ public class DataMapWriterListener {
       // if data map does not have meta, no need to register
       return;
     }
-    List<String> columns = factory.getMeta().getIndexedColumns();
+    List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
     List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
+    DataMapWriter writer = null;
+    try {
+      writer = factory.createWriter(new Segment(segmentId), taskNo);
+    } catch (IOException e) {
+      LOG.error("Failed to create DataMapWriter: " + e.getMessage());
+      throw new DataMapWriterException(e);
+    }
     if (writers != null) {
       writers.add(writer);
     } else {
@@ -91,10 +97,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath, String taskName) throws IOException {
+  public void onBlockStart(String blockId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId, taskName);
+        writer.onBlockStart(blockId);
       }
     }
   }
@@ -107,7 +113,7 @@ public class DataMapWriterListener {
     }
   }
 
-  public void onBlockletStart(int blockletId) {
+  public void onBlockletStart(int blockletId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
         writer.onBlockletStart(blockletId);
@@ -115,7 +121,7 @@ public class DataMapWriterListener {
     }
   }
 
-  public void onBlockletEnd(int blockletId) {
+  public void onBlockletEnd(int blockletId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
         writer.onBlockletEnd(blockletId);
@@ -130,16 +136,18 @@ public class DataMapWriterListener {
    * @param tablePage  page data
    */
   public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException {
-    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
-    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
-      List<String> indexedColumns = entry.getKey();
+    Set<Map.Entry<List<CarbonColumn>, List<DataMapWriter>>> entries = registry.entrySet();
+    for (Map.Entry<List<CarbonColumn>, List<DataMapWriter>> entry : entries) {
+      List<CarbonColumn> indexedColumns = entry.getKey();
       ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
       for (int i = 0; i < indexedColumns.size(); i++) {
-        pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
+        pages[i] = tablePage.getColumnPage(indexedColumns.get(i).getColName());
       }
       List<DataMapWriter> writers = entry.getValue();
+      int pageSize = pages[0].getPageSize();
+
       for (DataMapWriter writer : writers) {
-        writer.onPageAdded(blockletId, pageId, pages);
+        writer.onPageAdded(blockletId, pageId, pageSize, pages);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ea75cd2..edf67a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -154,7 +154,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
     String[] storeLocation = getStoreLocation(tableIdentifier);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
         configuration, storeLocation, 0, iteratorIndex);
@@ -302,7 +302,12 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     }
 
     @Override public void run() {
-      doExecute(this.iterator, iteratorIndex);
+      try {
+        doExecute(this.iterator, iteratorIndex);
+      } catch (IOException e) {
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 2ec85fc..a725936 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
@@ -233,10 +232,8 @@ public class CarbonFactDataHandlerModel {
 
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(configuration.getSchemaUpdatedTimeStamp());
-    carbonFactDataHandlerModel.setDatabaseName(
-        identifier.getDatabaseName());
-    carbonFactDataHandlerModel
-        .setTableName(identifier.getTableName());
+    carbonFactDataHandlerModel.setDatabaseName(identifier.getDatabaseName());
+    carbonFactDataHandlerModel.setTableName(identifier.getTableName());
     carbonFactDataHandlerModel.setMeasureCount(measureCount);
     carbonFactDataHandlerModel.setStoreLocation(storeLocation);
     carbonFactDataHandlerModel.setDimLens(dimLens);
@@ -262,8 +259,14 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
-        configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
+    listener.registerAllWriter(
+        configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(),
+        CarbonTablePath.getShardName(
+            carbonDataFileAttributes.getTaskId(),
+            bucketId,
+            0,
+            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -328,7 +331,12 @@ public class CarbonFactDataHandlerModel {
     listener.registerAllWriter(
         loadModel.getCarbonDataLoadSchema().getCarbonTable(),
         loadModel.getSegmentId(),
-        tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
+        CarbonTablePath.getShardName(
+            CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()),
+            carbonFactDataHandlerModel.getBucketId(),
+            carbonFactDataHandlerModel.getTaskExtension(),
+            String.valueOf(loadModel.getFactTimeStamp())));
+
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index e5c90e0..6e557cd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -250,8 +250,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        String taskName = CarbonTablePath.getUniqueTaskName(carbonDataFileName);
-        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath, taskName);
+        listener.onBlockStart(carbonDataFileName);
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }

