carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3843] Support merging index for streaming table
Date Tue, 21 Jul 2020 07:19:30 GMT
This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a0ed65  [CARBONDATA-3843] Support merging index for streaming table
1a0ed65 is described below

commit 1a0ed65270acc6694ed52b13aaddf55bfcfe0422
Author: ajantha-bhat <ajanthabhat@gmail.com>
AuthorDate: Tue Jun 2 20:00:38 2020 +0530

    [CARBONDATA-3843] Support merging index for streaming table
    
    Why is this PR needed?
    Merge index is not created for normal segment (created by load, insert, compaction or
handoff) on streaming table.
    
    What changes were proposed in this PR?
    For a streaming table, allow merge index creation for all kinds of segments except
streaming segments (ROW_V1).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3785
---
 docs/ddl-of-carbondata.md                          |   3 +-
 .../spark/sql/events/MergeIndexEventListener.scala | 134 +++++++++++----------
 .../CarbonAlterTableCompactionCommand.scala        |   5 -
 .../CarbonAlterTableAddHivePartitionCommand.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala             |  64 ++++++++--
 5 files changed, 134 insertions(+), 83 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3165f4e..e7cfb0c 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -751,8 +751,9 @@ Users can specify which columns to include and exclude for local dictionary
gene
      ```
 
      **NOTE:**
+     * Merge index is supported on streaming table from carbondata 2.0.1 version.
+     But streaming segments (ROW_V1) cannot create merge index.
 
-     * Merge index is not supported on streaming table.
 
    - #### SET and UNSET
    
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 2995edc..4e06ff0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -30,9 +30,8 @@ import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
 import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
@@ -62,7 +61,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging
{
             .asInstanceOf[util.List[String]]
         }
         val tempPath = operationContext.getProperty("tempPath")
-        if(!carbonTable.isStreamingSink) {
+        val loadMetaDetails = loadModel.getCurrentLoadMetadataDetail
+        if (loadMetaDetails != null && !loadMetaDetails.getFileFormat.equals(FileFormat.ROW_V1))
{
           if (null != compactedSegments && !compactedSegments.isEmpty) {
             MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,
               carbonTable,
@@ -104,73 +104,77 @@ class MergeIndexEventListener extends OperationEventListener with Logging
{
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName
}")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName
}")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName}")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1))
{
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                 }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check
for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store
so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filterNot(streamingSegment.contains(_))
+              }
+            // in case of merge index file creation using Alter DDL command
+            // readFileFooterFromCarbonDataFile flag should be true. This flag is check for
legacy
+            // store (store <= 1.1 version) and create merge Index file as per new store
so that
+            // old store is also upgraded to new store
+            val startTime = System.currentTimeMillis()
+            CarbonMergeFilesRDD.mergeIndexFiles(
+              sparkSession = sparkSession,
+              segmentIds = segmentsToMerge,
+              segmentFileNameToSegmentIdMap = segmentFileNameMap,
+              tablePath = carbonMainTable.getTablePath,
+              carbonTable = carbonMainTable,
+              mergeIndexProperty = true,
+              readFileFooterFromCarbonDataFile = true)
+            LOGGER.info("Total time taken for merge index "
+                        + (System.currentTimeMillis() - startTime) + "ms")
+            // clear Block index Cache
+            MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
+            val requestMessage = "Compaction request completed for table " +
+              s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+            LOGGER.info(requestMessage)
+          } else {
+            val lockMessage = "Not able to acquire the compaction lock for table " +
+                              s"${ carbonMainTable.getDatabaseName }." +
+                              s"${ carbonMainTable.getTableName}"
+            LOGGER.error(lockMessage)
+            CarbonException.analysisException(
+              "Table is already locked for compaction. Please try after some time.")
           }
+        } finally {
+          lock.unlock()
         }
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 2224943..dc50cf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -131,11 +131,6 @@ case class CarbonAlterTableCompactionCommand(
       }
       Seq.empty
     } else if (compactionType == CompactionType.SEGMENT_INDEX) {
-      if (table.isStreamingSink) {
-        throw new MalformedCarbonCommandException(
-          "Unsupported alter operation on carbon table: Merge index is not supported on streaming"
+
-          " table")
-      }
       val version = CarbonUtil.getFormatVersion(table)
       val isOlderVersion = version == ColumnarFormatVersion.V1 ||
                            version == ColumnarFormatVersion.V2
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a080db6..09614a8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus,
PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
@@ -160,6 +160,13 @@ case class CarbonAlterTableAddHivePartitionCommand(
         // carbon index files, and it is not good for query performance since all index files
         // need to be read to spark driver.
         // So, here trigger to merge the index files by sending an event
+        val customSegmentIds = if (loadModel.getCurrentLoadMetadataDetail
+          .getFileFormat
+          .equals(FileFormat.ROW_V1)) {
+          Some(Seq("").toList)
+        } else {
+          Some(Seq(loadModel.getSegmentId).toList)
+        }
         val alterTableModel = AlterTableModel(
           dbName = Some(table.getDatabaseName),
           tableName = table.getTableName,
@@ -167,7 +174,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
           compactionType = "", // to trigger index merge, this is not required
           factTimeStamp = Some(System.currentTimeMillis()),
           alterSql = null,
-          customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+          customSegmentIds = customSegmentIds)
         val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
         OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index bb2c63f..6a079b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -426,14 +426,58 @@ class CarbonIndexFileMergeTestCase
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
-    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
-    val exceptionMessage = intercept[Exception] {
-      sql("alter table streamingTable compact 'segment_index'")
-    }.getMessage
-    assert(exceptionMessage.contains("Unsupported alter operation on carbon table: Merge
index is not supported on streaming table"))
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT)
== 1)
     sql("DROP TABLE IF EXISTS streamingTable")
   }
 
+  test("Verify alter table index merge for streaming table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT)
== 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT)
== 1)
+    sql("alter table streamingTable compact 'segment_index'")
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT)
== 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
+  test("Verify alter table index merge for streaming table with custom segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT)
== 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT)
== 1)
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT)
== 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
   test("verify driver cache gets updated after creating merge Index file") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
@@ -471,7 +515,9 @@ class CarbonIndexFileMergeTestCase
     identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
+  private def getIndexFileCount(tableName: String,
+      segment: String,
+      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
@@ -479,15 +525,13 @@ class CarbonIndexFileMergeTestCase
       FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
         .listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(CarbonTablePath
-              .INDEX_FILE_EXT)
+            file.getName.endsWith(extension)
           }
         })
     } else {
       FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
-          file.getName.endsWith(CarbonTablePath
-            .INDEX_FILE_EXT)
+          file.getName.endsWith(extension)
         }
       })
     }


Mime
View raw message