carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2685][DataMap] Parallelize datamap rebuild processing for segments
Date Tue, 10 Jul 2018 11:24:10 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 1dc86d291 -> 0112ed096


[CARBONDATA-2685][DataMap] Parallelize datamap rebuild processing for segments

Currently in CarbonData, while rebuilding a datamap, one Spark job is
started for each segment and all the jobs are executed serially. If there
are many historical segments, the rebuild takes a lot of time.

Here we optimize the datamap rebuild procedure by starting one task per
segment, so all the tasks can run in parallel within a single Spark job.

This closes #2443
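
As a rough illustration of the change in job layout (plain Scala with hypothetical names; not CarbonData API): the rebuild moves from one Spark job per segment, run serially, to a single batch in which every task reports its segment's status.

----------------------------------------------------------------------
// Hypothetical sketch of the job layout described above; not CarbonData code.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object RebuildLayoutSketch {
  // Stand-in for the per-segment rebuild work; returns the rebuild status.
  def rebuildSegment(segmentNo: String): Boolean = true

  def main(args: Array[String]): Unit = {
    val segments = Seq("0", "1", "2")

    // Before: one rebuild submitted per segment, executed one after another.
    val serial = segments.map(s => (s, rebuildSegment(s)))

    // After: all segments submitted as one batch; every task reports
    // (segmentNo, status) so the driver can clean up only the failures.
    val batched = Await.result(
      Future.sequence(segments.map(s => Future((s, rebuildSegment(s))))),
      Duration.Inf)

    println(serial)
    println(batched)
  }
}
----------------------------------------------------------------------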


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0112ed09
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0112ed09
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0112ed09

Branch: refs/heads/master
Commit: 0112ed0964a8367485d273d630e64efb38004136
Parents: 1dc86d2
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Mon Jul 9 19:11:42 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Jul 10 19:23:53 2018 +0800

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |   2 +-
 .../carbondata/hadoop/CarbonRecordReader.java   |  18 +++-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   1 -
 .../testsuite/datamap/FGDataMapTestCase.scala   |   2 +-
 .../org/apache/carbondata/spark/KeyVal.scala    |  11 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 105 ++++++++++---------
 6 files changed, 79 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index dd8af27..574b4c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -434,7 +434,7 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * Clear the datamap/datamaps of a table from memory
+   * Clear the datamap/datamaps of a table from memory and disk
    *
    * @param identifier Table identifier
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 6b56382..a54e7a4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -50,6 +50,12 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
   protected QueryExecutor queryExecutor;
   private InputMetricsStats inputMetricsStats;
 
+  /**
+   * Whether to skip clearing the datamap cache when the reader is closed. In some scenarios,
+   * such as datamap rebuild, we set it to true and clear the datamap after the rebuild finishes.
+   */
+  private boolean skipClearDataMapAtClose = false;
+
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
       InputMetricsStats inputMetricsStats) {
     this(queryModel, readSupport);
@@ -122,9 +128,11 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
         CarbonUtil.clearDictionaryCache(entry.getValue());
       }
     }
-    // Clear the datamap cache
-    DataMapStoreManager.getInstance()
-        .clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
+    if (!skipClearDataMapAtClose) {
+      // Clear the datamap cache
+      DataMapStoreManager.getInstance().clearDataMaps(
+          queryModel.getTable().getAbsoluteTableIdentifier());
+    }
     // close read support
     readSupport.close();
     try {
@@ -133,4 +141,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
       throw new IOException(e);
     }
   }
+
+  public void setSkipClearDataMapAtClose(boolean skipClearDataMapAtClose) {
+    this.skipClearDataMapAtClose = skipClearDataMapAtClose;
+  }
 }
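
The skipClearDataMapAtClose flag above defers the shared datamap-cache cleanup from each reader's close() to a single point after the whole batch finishes. A generic, self-contained sketch of that pattern in plain Scala (hypothetical names, not CarbonData code):

----------------------------------------------------------------------
import java.io.Closeable
import scala.collection.mutable

// Hypothetical sketch: defer a shared-cache cleanup from each reader's
// close() to one pass after the whole batch has finished.
class CachingReader(cache: mutable.Set[String], key: String) extends Closeable {
  private var skipClearCacheAtClose = false

  def setSkipClearCacheAtClose(skip: Boolean): Unit = skipClearCacheAtClose = skip

  // Reading populates the shared cache for this reader's key.
  def read(): String = { cache += key; s"row-from-$key" }

  // By default close() evicts this reader's entry; a batch caller can skip it.
  override def close(): Unit = if (!skipClearCacheAtClose) cache -= key
}

object SkipClearSketch {
  def main(args: Array[String]): Unit = {
    val cache = mutable.Set.empty[String]
    val readers = Seq("segment_0", "segment_1").map(new CachingReader(cache, _))
    readers.foreach { r => r.setSkipClearCacheAtClose(true); r.read(); r.close() }
    // One clearing pass after the batch, mirroring clearDataMaps() after rebuild.
    cache.clear()
  }
}
----------------------------------------------------------------------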

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index bdbd43b..d4d78c9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -74,7 +74,6 @@ class C2DataMapFactory(
    * delete datamap data if any
    */
   override def deleteDatamapData(): Unit = {
-    ???
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 6f8310c..02ada2e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -132,7 +132,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
    * delete datamap data if any
    */
   override def deleteDatamapData(): Unit = {
-    ???
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index bd28bed..c4b1144 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -141,9 +141,14 @@ class RestructureResultImpl extends RestructureResult[Int, Boolean] {
 }
 
 trait RefreshResult[K, V] extends Serializable {
-  def getKey(key: String, value: Boolean): (K, V)
+  /**
+   * Previously the index datamap refresh was performed per segment; with CARBONDATA-2685 it
+   * refreshes all segments in one batch. The result structure is taskNo -> (segmentNo, status).
+   */
+  def getKey(key: String, value: (String, Boolean)): (K, V)
 }
 
-class RefreshResultImpl extends RefreshResult[String, Boolean] {
-  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
+class RefreshResultImpl extends RefreshResult[String, (String, Boolean)] {
+  override def getKey(key: String,
+      value: (String, Boolean)): (String, (String, Boolean)) = (key, value)
 }
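
A small usage sketch of the new result shape (assuming only the classes shown in this file): the key is the task number and the value carries (segmentNo, rebuildStatus), so failed segments can be identified after the RDD is collected.

----------------------------------------------------------------------
import org.apache.carbondata.spark.RefreshResultImpl

// Sketch only: shows the (taskNo, (segmentNo, status)) shape introduced above.
object RefreshResultSketch {
  def main(args: Array[String]): Unit = {
    val result = new RefreshResultImpl()
    val entry: (String, (String, Boolean)) = result.getKey("3", ("12", true))
    // entry._1 is the task number, entry._2._1 the segment, entry._2._2 the status.
    println(entry)
  }
}
----------------------------------------------------------------------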

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0112ed09/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 05f472f..688656d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -59,6 +59,7 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
  * Helper object to rebuild the index DataMap
  */
 object IndexDataMapRebuildRDD {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   /**
    * Rebuild the datamap for all existing data in the table
@@ -78,56 +79,56 @@ object IndexDataMapRebuildRDD {
       tableIdentifier,
       mutable.Seq[String](schema.getDataMapName))
     OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext)
-    // loop all segments to rebuild DataMap
-    validSegments.asScala.foreach { segment =>
-      // if lucene datamap folder is exists, not require to build lucene datamap again
-      refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
-        indexedCarbonColumns, segment);
-    }
-    val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
-      tableIdentifier)
-    OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
-  }
 
-  private def refreshOneSegment(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      dataMapName: String,
-      indexColumns: java.util.List[CarbonColumn],
-      segment: Segment): Unit = {
+    val segments2DmStorePath = validSegments.asScala.map { segment =>
+      val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath,
+        segment.getSegmentNo, schema.getDataMapName)
+      segment -> dataMapStorePath
+    }.filter(p => !FileFactory.isFileExist(p._2)).toMap
 
-    val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath,
-      segment.getSegmentNo, dataMapName)
+    segments2DmStorePath.foreach { case (_, dmPath) =>
+      if (!FileFactory.mkdirs(dmPath, FileFactory.getFileType(dmPath))) {
+        throw new IOException(
+          s"Failed to create directory $dmPath for rebuilding datamap ${ schema.getDataMapName }")
+      }
+    }
 
-    if (!FileFactory.isFileExist(dataMapStorePath)) {
-      if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
-        try {
-          val status = new IndexDataMapRebuildRDD[String, Boolean](
-            sparkSession,
-            new RefreshResultImpl(),
-            carbonTable.getTableInfo,
-            dataMapName,
-            indexColumns.asScala.toArray,
-            segment
-          ).collect()
-
-          status.find(_._2 == false).foreach { task =>
-            throw new Exception(
-              s"Task Failed to rebuild datamap $dataMapName on segment_${segment.getSegmentNo}")
-          }
-        } catch {
-          case ex: Throwable =>
-            // process failure
-            FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
-            throw new Exception(
-              s"Failed to refresh datamap $dataMapName on segment_${segment.getSegmentNo}", ex)
+    val status = new IndexDataMapRebuildRDD[String, (String, Boolean)](
+      sparkSession,
+      new RefreshResultImpl(),
+      carbonTable.getTableInfo,
+      schema.getDataMapName,
+      indexedCarbonColumns.asScala.toArray,
+      segments2DmStorePath.keySet
+    ).collect
+
+    // for failed segments, clean up the datamap store that was built for them
+    val failedSegments = status
+      .find { case (taskId, (segmentId, rebuildStatus)) =>
+        !rebuildStatus
+      }
+      .map { task =>
+        val segmentId = task._2._1
+        val dmPath = segments2DmStorePath.filter(p => p._1.getSegmentNo.equals(segmentId)).values
+        val cleanResult = dmPath.map(p =>
+          FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(p)))
+        if (cleanResult.exists(!_)) {
+          LOGGER.error(s"Failed to clean up datamap store for segment_$segmentId")
+          false
+        } else {
+          true
         }
-      } else {
-        throw new IOException(s"Failed to create directory $dataMapStorePath")
       }
+
+    if (failedSegments.nonEmpty) {
+      throw new Exception(s"Failed to refresh datamap ${ schema.getDataMapName }")
     }
-  }
+    DataMapStoreManager.getInstance().clearDataMaps(tableIdentifier)
 
+    val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
+      tableIdentifier)
+    OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
+  }
 }
 
 class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
@@ -254,7 +255,7 @@ class IndexDataMapRebuildRDD[K, V](
     @transient tableInfo: TableInfo,
     dataMapName: String,
     indexColumns: Array[CarbonColumn],
-    segment: Segment
+    segments: Set[Segment]
 ) extends CarbonRDDWithTableInfo[(K, V)](
   session.sparkContext, Nil, tableInfo.serialize()) {
 
@@ -274,11 +275,12 @@ class IndexDataMapRebuildRDD[K, V](
     val inputMetrics = new CarbonInputMetrics
     TaskMetricsMap.getInstance().registerThreadCallback()
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val segment = inputSplit.getAllSplits.get(0).getSegment
     inputMetrics.initBytesReadCallback(context, inputSplit)
 
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(attemptContext)
+    val format = createInputFormat(segment, attemptContext)
 
     val model = format.createQueryModel(inputSplit, attemptContext)
     // one query id per table
@@ -290,8 +292,7 @@ class IndexDataMapRebuildRDD[K, V](
     var refresher: DataMapBuilder = null
     try {
       val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
-        BlockletDataMapFactory.DATA_MAP_SCHEMA)
-        .getDataMapFactory
+        BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
         .asInstanceOf[SegmentPropertiesFetcher]
       val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment)
 
@@ -308,6 +309,8 @@ class IndexDataMapRebuildRDD[K, V](
       }
       reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
       reader.initialize(inputSplit, attemptContext)
+      // skip clearing the datamap here; we will do it after the rebuild
+      reader.setSkipClearDataMapAtClose(true)
 
       var blockletId = 0
       var firstRow = true
@@ -360,13 +363,13 @@ class IndexDataMapRebuildRDD[K, V](
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(split.index.toString, status)
+        result.getKey(split.index.toString, (segment.getSegmentNo, status))
       }
     }
   }
 
 
-  private def createInputFormat(
+  private def createInputFormat(segment: Segment,
       attemptContext: TaskAttemptContextImpl) = {
     val format = new CarbonTableInputFormat[Object]
     val tableInfo1 = getTableInfo
@@ -405,7 +408,7 @@ class IndexDataMapRebuildRDD[K, V](
 
     CarbonInputFormat.setSegmentsToAccess(
       job.getConfiguration,
-      Segment.toSegmentList(Array(segment.getSegmentNo), null))
+      Segment.toSegmentList(segments.map(_.getSegmentNo).toArray, null))
 
     CarbonInputFormat.setTableInfo(
       job.getConfiguration,
@@ -424,7 +427,7 @@ class IndexDataMapRebuildRDD[K, V](
       .getSplits(job)
       .asScala
       .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(_.taskId)
+      .groupBy(p => (p.getSegmentId, p.taskId))
       .map { group =>
         new CarbonMultiBlockSplit(
           group._2.asJava,
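
Taken together, the new flow collects one (taskNo, (segmentNo, status)) entry per task, cleans up the datamap store of any segment whose rebuild failed, and only then clears the cached datamaps once; grouping splits by (segmentId, taskId) instead of taskId alone is what keeps the per-segment tasks separate inside the single job. A simplified, self-contained sketch of the driver-side handling (hypothetical data, no CarbonData or Spark API):

----------------------------------------------------------------------
// Sketch only: mirrors the shape of the collected result, not the real RDD output.
object DriverSideHandlingSketch {
  def main(args: Array[String]): Unit = {
    // taskNo -> (segmentNo, rebuildSucceeded)
    val status: Array[(String, (String, Boolean))] =
      Array(("0", ("1", true)), ("1", ("2", false)))

    // Pick out the segments whose rebuild failed.
    val failedSegments = status.collect { case (_, (segmentNo, ok)) if !ok => segmentNo }

    // In the real code, the datamap store path of each failed segment is deleted here.
    failedSegments.foreach(seg => println(s"would delete datamap store for segment_$seg"))

    if (failedSegments.nonEmpty) {
      throw new Exception("Failed to refresh datamap")
    }
    // Only after every segment succeeded is the cached datamap cleared, once.
  }
}
----------------------------------------------------------------------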

