carbondata-commits mailing list archives

From: akash...@apache.org
Subject: [carbondata] branch master updated: [CARBONDATA-3620][CARBONDATA-3622] Update does not load cache in memory, behavior inconsistent with scenario when index server is not running
Date: Sat, 11 Jan 2020 09:22:43 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c1ab32  [CARBONDATA-3620][CARBONDATA-3622] Update does not load cache in memory, behavior inconsistent with scenario when index server is not running
6c1ab32 is described below

commit 6c1ab32c0038a702cbe88aa0db873ea3011de2ca
Author: Vikram Ahuja <vikramahuja8803@gmail.com>
AuthorDate: Tue Dec 17 12:11:07 2019 +0530

    [CARBONDATA-3620][CARBONDATA-3622] Update does not load cache in memory, behavior inconsistent with scenario when index server is not running
    
    Problems
    1: Segments for pre-priming were never sent to the index server after a successful update.
    2: While dropping a table, the table is dropped first and clearDataMaps is called
    afterwards. If the table is still being accessed somewhere while the index server is
    clearing its datamaps, the clear call fails with an exception and the cache entry is
    never removed.
    
    Solutions
    1: Send the updated segments to the index server for pre-priming after the update
    succeeds (a simplified sketch follows the commit message below).
    2: While dropping a table, clear the cache first and then drop the table (see the
    sketch after the CarbonFileMetastore diff below).
    
    This closes #3511
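
For a quick view of fix 1 without reading the full diff, here is a minimal, self-contained Scala sketch of the corrected update flow. The helpers updateTableStatus and triggerPrepriming are hypothetical stand-ins for CarbonUpdateUtil.updateTableMetadataStatus and DistributedRDDUtils.triggerPrepriming in the patch; only the control flow mirrors the change.

object UpdateFlowSketch {

  // Hypothetical stand-in for CarbonUpdateUtil.updateTableMetadataStatus:
  // returns true when the table status file was written successfully.
  def updateTableStatus(updatedSegments: Seq[String]): Boolean = updatedSegments.nonEmpty

  // Hypothetical stand-in for DistributedRDDUtils.triggerPrepriming.
  def triggerPrepriming(segments: Seq[String]): Unit =
    println(s"pre-priming index server cache for segments: ${segments.mkString(", ")}")

  def finishUpdate(updatedSegments: Seq[String]): Unit = {
    if (!updateTableStatus(updatedSegments)) {
      // Status update failed: report the error and do not pre-prime.
      System.err.println("Data update failed due to failure in table status updation.")
      return
    }
    // Status update succeeded: send the updated segments for pre-priming so the
    // index server cache is loaded, consistent with the non-index-server path.
    if (updatedSegments.nonEmpty) {
      triggerPrepriming(updatedSegments)
    }
  }

  def main(args: Array[String]): Unit = finishUpdate(Seq("0", "1"))
}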
---
 .../apache/carbondata/indexserver/DistributedCountRDD.scala    | 10 +++++-----
 .../org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala | 10 ++++++++--
 .../scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala  |  3 ++-
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 930586f..c8b80af 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -64,6 +64,11 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
       .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
     implicit val ec: ExecutionContextExecutor = ExecutionContext
       .fromExecutor(service)
+    if (dataMapFormat.ifAsyncCall()) {
+      // to clear cache of invalid segments during pre-priming in index server
+      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+        dataMapFormat.getInvalidSegments)
+    }
     val futures = if (inputSplits.length <= numOfThreads) {
       inputSplits.map {
         split => generateFuture(Seq(split))
@@ -84,11 +89,6 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
     } else {
       0L
     }
-    if (dataMapFormat.ifAsyncCall()) {
-      // to clear cache of invalid segments during pre-priming in index server
-      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
-        dataMapFormat.getInvalidSegments)
-    }
     Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2b80985..5759042 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -455,20 +455,26 @@ object CarbonDataRDDFactory {
         if (resultSize == 0) {
           return null
         }
-        if (CarbonUpdateUtil.updateTableMetadataStatus(
+        if (!CarbonUpdateUtil.updateTableMetadataStatus(
           segmentDetails,
           carbonTable,
           updateModel.get.updatedTimeStamp + "",
           true,
           new util.ArrayList[Segment](0),
           new util.ArrayList[Segment](segmentFiles), "")) {
-        } else {
           LOGGER.error("Data update failed due to failure in table status updation.")
           updateModel.get.executorErrors.errorMsg = errorMessage
           updateModel.get.executorErrors.failureCauses = FailureCauses
             .STATUS_FILE_UPDATION_FAILURE
           return null
         }
+        // code to handle Pre-Priming cache for update command
+        if (!segmentFiles.isEmpty) {
+          val segmentsToPrePrime = segmentFiles.asScala.map(iterator => iterator.getSegmentNo).toSeq
+          DistributedRDDUtils
+            .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
+              operationContext, hadoopConf, segmentsToPrePrime.toList)
+        }
       }
       return null
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b2ba7f4..c00061a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -454,11 +454,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
   def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    // Clear both driver and executor cache.
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
     removeTableFromMetadata(dbName, tableName)
   }
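
A matching sketch for fix 2 (again plain Scala with hypothetical helpers, not CarbonData's actual API): the datamap cache is cleared before the table is dropped, so a failure while clearing can no longer leave a stale cache entry for a table that has already been removed from the metastore.

object DropTableOrderingSketch {

  // Hypothetical stand-in for DataMapStoreManager.clearDataMaps.
  def clearDataMaps(table: String): Unit =
    println(s"clearing driver and executor cache for $table")

  // Hypothetical stand-in for CarbonHiveMetadataUtil.invalidateAndDropTable.
  def dropFromMetastore(table: String): Unit =
    println(s"dropping $table from the metastore")

  def dropTable(table: String): Unit = {
    // Clear the cache first; previously the table was dropped first, so a failure
    // while clearing (for example when the index server was still accessing the
    // table) left a cache entry behind for a table that no longer existed.
    clearDataMaps(table)
    dropFromMetastore(table)
  }

  def main(args: Array[String]): Unit = dropTable("default.t1")
}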

