carbondata-commits mailing list archives

From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4046] Handled multiple partition columns for partition cache
Date Thu, 03 Dec 2020 14:20:15 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b379ebe  [CARBONDATA-4046] Handled multiple partition columns for partition cache
b379ebe is described below

commit b379ebec3cc4f251f784f704cefff345ef17c032
Author: Nihal ojha <nihalnitn97@gmail.com>
AuthorDate: Tue Nov 3 10:47:02 2020 +0530

    [CARBONDATA-4046] Handled multiple partition columns for partition cache
    
    Why is this PR needed?
    1. Currently, when the property carbon.read.partition.hive.direct is false,
    select count(*) fails on a table that contains multiple partition columns.
    2. Subtraction was performed between operands of different data types.
    3. When the final cache is empty and the invalid segment list is non-empty,
    the cache was not cleared.
    
    What changes were proposed in this PR?
    1. Handled multiple partition columns (see the sketch after this message).
    2. Handled subtraction of different data types.
    3. If the final cache is empty and the invalid segment list is non-empty,
    clear the cache (a sketch of this rule follows the diff).
    
    This closes #4002
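
    A minimal sketch (not the committed code verbatim) of the multi-column
    folding in change 1, with hypothetical input values; the committed loop
    lives in PartitionCacheManager.readPartition in the diff below:

        val uniquePartitions = Set("b=1", "c=nihal") // hypothetical "column=value" pieces
        val partitionPath = new StringBuilder()
        var partitionSpec: Map[String, String] = Map()
        uniquePartitions.foreach { uniquePartition =>
          // accumulate one combined location path, e.g. "/b=1/c=nihal"
          partitionPath.append("/").append(uniquePartition)
          val partitionSplit = uniquePartition.split("=")
          // and one combined spec covering every partition column
          partitionSpec += (partitionSplit(0) -> partitionSplit(1))
        }
        // partitionSpec == Map("b" -> "1", "c" -> "nihal")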
---
 .../apache/spark/util/PartitionCacheManager.scala  | 23 ++++++++++++++--------
 .../StandardPartitionTableLoadingTestCase.scala    | 15 ++++++++++++++
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
index 411cbe2..39f33e5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
@@ -79,9 +79,10 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
           segmentFilePath.getAbsolutePath), segmentFileModifiedTime))
       }
     }.toMap
+    val invalidSegmentMap = validInvalidSegments.getInvalidSegments.asScala
+      .map(seg => (seg.getSegmentNo, seg)).toMap
     // remove all invalid segment entries from cache
-    val finalCache = cacheablePartitionSpecs --
-                     validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo)
+    val finalCache = cacheablePartitionSpecs -- invalidSegmentMap.keySet
     val cacheObject = CacheablePartitionSpec(finalCache)
     if (finalCache.nonEmpty) {
       // remove the existing cache as new cache values may be added.
@@ -92,6 +93,8 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
         cacheObject,
         cacheObject.getMemorySize,
         identifier.expirationTime)
+    } else if (invalidSegmentMap != null && invalidSegmentMap.nonEmpty) {
+      CACHE.remove(identifier.tableId)
     }
     finalCache.values.flatMap(_._1).toList.asJava
   }
@@ -112,14 +115,18 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
 
   private def readPartition(identifier: PartitionCacheKey, segmentFilePath: String) = {
     val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
+    val partitionPath = new mutable.StringBuilder()
+    var partitionSpec: Map[String, String] = Map()
     segmentFile.getLocationMap.values().asScala
-      .flatMap(_.getPartitions.asScala).toSet.map { uniquePartition: String =>
+      .flatMap(_.getPartitions.asScala).toSet.foreach { uniquePartition: String =>
+      partitionPath.append(CarbonCommonConstants.FILE_SEPARATOR).append(uniquePartition)
       val partitionSplit = uniquePartition.split("=")
-      val storageFormat = CatalogStorageFormat(
-        Some(new URI(identifier.tablePath + "/" + uniquePartition)),
-        None, None, None, compressed = false, Map())
-      CatalogTablePartition(Map(partitionSplit(0) -> partitionSplit(1)), storageFormat)
-    }.toSeq
+      partitionSpec = partitionSpec. +(partitionSplit(0) -> partitionSplit(1))
+    }
+    Seq(CatalogTablePartition(partitionSpec,
+      CatalogStorageFormat(
+        Some(new URI(identifier.tablePath + partitionPath)),
+        None, None, None, compressed = false, Map())))
   }
 
   override def put(key: PartitionCacheKey,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 128274e..6ab5e51 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -618,6 +618,21 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
   }
 
+  test("test partition cache on multiple columns") {
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
+    sql("drop table if exists partition_cache")
+    sql("create table partition_cache(a string) partitioned by(b int, c String) stored as
carbondata")
+    sql("insert into partition_cache select 'k',1,'nihal'")
+    checkAnswer(sql("select count(*) from partition_cache where b = 1"), Seq(Row(1)))
+    sql("select * from partition_cache where b = 1").collect()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "partition_cache")
+    val partitionSpecs: util.List[CatalogTablePartition] = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 1)
+    assert(partitionSpecs.get(0).spec.size == 2)
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
+  }
+
   test("test partition caching after load") {
     CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
     sql("drop table if exists partition_cache")

