carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [43/50] carbondata git commit: [CARBONDATA-2777] Fixed: NonTransactional tables, Select count(*) is not giving latest results for incremental load with same segment ID (UUID)
Date Mon, 30 Jul 2018 18:43:09 GMT
[CARBONDATA-2777] Fixed: for NonTransactional tables, select count(*) does not return the latest
results for an incremental load with the same segment ID (UUID)

problem:
[CARBONDATA-2777] For NonTransactional tables, select count(*) does not return the latest
results for an incremental load with the same segment ID (UUID).

solution:
For NonTransactional tables, segments must be refreshed in the count(*) flow as well when an
incremental load changes a segment's latest timestamp. This refresh was already done in the
select * flow but was missing here.

This closes #2547
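
To make the scenario concrete, here is a minimal sketch of an incremental SDK load that
reuses the same segment ID. It assumes the 1.4-era SDK writer API (CarbonWriter.builder
with uniqueIdentifier); writerPath and the two-column schema are illustrative, mirroring
the test added below:

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    val writerPath = "./target/sdk_output"  // illustrative output location
    val schema = Schema.parseJson("""[ {"name":"string"}, {"age":"int"} ]""")

    def writeBatch(rows: Int): Unit = {
      val writer = CarbonWriter.builder()
        .outputPath(writerPath)        // same output location on every call
        .isTransactionalTable(false)   // NonTransactional (SDK) table
        .uniqueIdentifier(123L)        // same UUID => incremental load, same segment ID
        .buildWriterForCSVInput(schema)
      (1 to rows).foreach(i => writer.write(Array("name" + i, i.toString)))
      writer.close()
    }

    writeBatch(3) // select count(*) over the external table gives 3
    writeBatch(3) // after the segment refresh, count(*) must give 6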


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e23aea77
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e23aea77
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e23aea77

Branch: refs/heads/branch-1.4
Commit: e23aea776acac18e300e6d4d71612a6f8158d2a4
Parents: e698628
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Tue Jul 24 15:49:02 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      | 21 +++++++++++++++++++-
 .../TestNonTransactionalCarbonTable.scala       | 18 +++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e23aea77/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bd6b775..f53a1d7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -590,7 +590,26 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T>
{
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment =
         getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
-
+    /* In the select * flow, getSplits() clears the segmentMap when a segment
+    needs refreshing; the same is needed in the select count(*) flow.
+    For a NonTransactional table, one scenario that requires a segment refresh:
+    the SDK writes a set of files with a UUID and can write again with the same
+    UUID, so the segment must be refreshed to reflect the latest files' count. */
+    List<Segment> toBeCleanedSegments = new ArrayList<>();
+    for (Segment eachSegment : filteredSegment) {
+      boolean refreshNeeded = DataMapStoreManager.getInstance()
+          .getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
+          .isRefreshNeeded(eachSegment,
+              updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo()));
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(eachSegment);
+      }
+    }
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
     List<ExtendedBlocklet> blocklets =
         blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
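
For readers outside the codebase: a conceptual sketch of the timestamp-based refresh decision
the hunk above relies on. The names below (SegmentRefresher, loadedAt) are illustrative only,
not the real DataMapStoreManager/TableSegmentRefresher API:

    import scala.collection.mutable

    class SegmentRefresher {
      // timestamp at which each segment's index (datamap) was last loaded
      private val loadedAt = mutable.Map[String, Long]()

      def isRefreshNeeded(segmentNo: String, latestFileTimestamp: Long): Boolean =
        loadedAt.get(segmentNo) match {
          case Some(ts) if ts >= latestFileTimestamp => false // cached index still current
          case _ =>
            loadedAt(segmentNo) = latestFileTimestamp // remember the new load time
            true // segment files are newer than the cached index: refresh it
        }
    }

An SDK rewrite with the same UUID produces files with a newer timestamp for the same segment
number, so the refresh check fires and the stale entry is cleared before pruning.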

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e23aea77/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a96c258..c7d9caa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -350,6 +350,24 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll
{
     cleanTestData()
   }
 
+  test("test count star with multiple loads files with same schema and UUID") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(3)))
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+    // should reflect new count
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(6)))
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
   test("test create external table with sort columns") {
     buildTestDataWithSortColumns(List("age","name"))
     assert(new File(writerPath).exists())

