carbondata-commits mailing list archives

From kumarvisha...@apache.org
Subject carbondata git commit: [CARBONDATA-2895] Fix Query result count is more than actual csv rows with Batch-sort in save to disk (sort temp files) scenario
Date Wed, 05 Sep 2018 15:01:24 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 94d2089b2 -> 50248f51b


[CARBONDATA-2895] Fix Query result count is more than actual csv rows with Batch-sort in save
to disk (sort temp files) scenario

problem: Query results mismatch with batch sort in the save-to-disk (sort
temp files) scenario.

scenario:
a) Configure batch sort, but set the batch size larger than
UnsafeMemoryManager.INSTANCE.getUsableMemory().
b) Load more data than the batch size. Observe that
UnsafeMemoryManager saves to disk, because one batch does not fit in
usable memory.
c) The load therefore happens in 2 batches.
d) Query the results: the number of result rows is greater than the
number of input rows.

root cause:

createSortDataRows() is called once for each batch.
Files saved to disk while sorting the previous batch were also picked up
for the current batch, so their rows were written again.
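The root cause can be illustrated with a small, hypothetical model (not CarbonData code): a sort temp file is reduced to its row count, all batches share one temp location, and the merge step (standing in for getFilesToMergeSort() plus the final merge) takes every file it finds. With 150 input rows split across two batches, 250 rows end up written, which matches the symptom of more query results than CSV rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bug, NOT CarbonData code: each sort temp file is
// reduced to its row count, and every batch shares one temp location.
public class SharedTempLocationBug {

    private final List<Integer> tempLocation = new ArrayList<>();

    // A batch spills a sorted chunk of `rows` rows to the shared location.
    void spill(int rows) {
        tempLocation.add(rows);
    }

    // Buggy selection: merges every file present, including files that an
    // earlier batch left behind.
    int mergeSortRows() {
        int total = 0;
        for (int rows : tempLocation) {
            total += rows;
        }
        return total;
    }

    // Loads two batches and returns the number of rows written to the store.
    static int loadTwoBatches(int batch1Rows, int batch2Rows) {
        SharedTempLocationBug sorter = new SharedTempLocationBug();
        int written = 0;
        sorter.spill(batch1Rows);          // batch 1 does not fit in memory
        written += sorter.mergeSortRows(); // merges batch 1 only
        sorter.spill(batch2Rows);          // batch 2 spills as well
        written += sorter.mergeSortRows(); // merges batch 1 AND batch 2
        return written;
    }

    public static void main(String[] args) {
        // 150 input rows, but 250 rows are written.
        System.out.println(loadTwoBatches(100, 50)); // prints 250
    }
}
```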

solution:
Files saved to disk while sorting a previous batch should not be
considered for the current batch.
Hence, use the batch ID as the rangeID field of the sort temp files,
so that getFilesToMergeSort() selects only the files of the current batch.
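The fix can be sketched with the same hypothetical model, assuming each spilled file records the range ID it was written with. In the real patch the ID is set through sortParameters.setRangeId(batchId.incrementAndGet()) and, per the patch comment, becomes part of the sort temp file name; the TempFile class and the filtering loop below are illustrative stand-ins, not CarbonData's actual selection logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the fix, NOT CarbonData code. Each spilled file
// remembers the rangeId (batch ID) it was written with.
public class RangeIdFilteredMerge {

    // One sort temp file: the batch that wrote it and its row count.
    static final class TempFile {
        final int rangeId;
        final int rows;

        TempFile(int rangeId, int rows) {
            this.rangeId = rangeId;
            this.rows = rows;
        }
    }

    private final List<TempFile> tempLocation = new ArrayList<>();

    // A batch spills a sorted chunk tagged with its own rangeId.
    void spill(int rangeId, int rows) {
        tempLocation.add(new TempFile(rangeId, rows));
    }

    // Only files written with the current batch's rangeId are merged.
    int mergeSortRows(int rangeId) {
        int total = 0;
        for (TempFile f : tempLocation) {
            if (f.rangeId == rangeId) {
                total += f.rows;
            }
        }
        return total;
    }

    // Loads two batches (range IDs 1 and 2) and returns rows written.
    static int loadTwoBatches(int batch1Rows, int batch2Rows) {
        RangeIdFilteredMerge sorter = new RangeIdFilteredMerge();
        int written = 0;
        sorter.spill(1, batch1Rows);
        written += sorter.mergeSortRows(1);
        sorter.spill(2, batch2Rows);
        written += sorter.mergeSortRows(2); // batch 1's file is ignored
        return written;
    }

    public static void main(String[] args) {
        System.out.println(loadTwoBatches(100, 50)); // prints 150
    }
}
```

Because the stale files from batch 1 stay on disk but no longer match the filter, the written row count equals the input row count.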

This closes #2664


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/50248f51
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/50248f51
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/50248f51

Branch: refs/heads/master
Commit: 50248f51bcaf44f37429d2420c6ecf5c815c3770
Parents: 94d2089
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Mon Aug 27 20:55:03 2018 +0530
Committer: kumarvishal09 <kumarvishal1802@gmail.com>
Committed: Wed Sep 5 20:30:59 2018 +0530

----------------------------------------------------------------------
 .../impl/UnsafeBatchParallelReadMergeSorterImpl.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/50248f51/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 5cb099e..1b1d383 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -62,12 +62,17 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
   private AtomicLong rowCounter;
 
+  /* will be incremented for each batch. This ID is used in sort temp files name,
+   to identify files of that batch */
+  private AtomicInteger batchId;
+
   public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
     this.rowCounter = rowCounter;
   }
 
   @Override public void initialize(SortParameters sortParameters) {
     this.sortParameters = sortParameters;
+    batchId = new AtomicInteger(0);
 
   }
 
@@ -172,7 +177,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
   }
 
-  private static class SortBatchHolder
+  private class SortBatchHolder
       extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
 
     private SortParameters sortParameters;
@@ -193,7 +198,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private final Object lock = new Object();
 
-    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+    SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
         ThreadStatusObserver threadStatusObserver) {
       this.sortParameters = sortParameters.getCopy();
       this.iteratorCount = new AtomicInteger(numberOfThreads);
@@ -203,6 +208,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
     }
 
     private void createSortDataRows() {
+      // For each batch, createSortDataRows() will be called.
+      // Files saved to disk during sorting of previous batch, should not be considered
+      // for this batch.
+      // Hence use batchID as rangeID field of sorttempfiles.
+      // so getFilesToMergeSort() will select only this batch files.
+      this.sortParameters.setRangeId(batchId.incrementAndGet());
       int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
       setTempLocation(sortParameters);
       this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,

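The diff also changes SortBatchHolder from a static nested class to an inner class. This appears to be needed because createSortDataRows() now reads batchId, a field of the enclosing UnsafeBatchParallelReadMergeSorterImpl instance, which a static nested class cannot reach without an explicit reference. The simplified sketch below assumes one holder whose createSortDataRows() runs once per batch; the class BatchIdSketch and the helper runBatches() are hypothetical and only mirror the shape of the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the patched classes, NOT the real CarbonData code.
public class BatchIdSketch {

    // Owned by the outer sorter; shared by the batch holder it creates.
    private AtomicInteger batchId;

    // Mirrors initialize(): the counter restarts at 0 for each load.
    void initialize() {
        batchId = new AtomicInteger(0);
    }

    // A non-static inner class can read the outer instance's batchId field;
    // a static nested class could not without an explicit reference.
    class SortBatchHolder {
        private int rangeId;

        // Mirrors createSortDataRows(): each new batch takes the next ID.
        void createSortDataRows() {
            rangeId = batchId.incrementAndGet();
        }

        int getRangeId() {
            return rangeId;
        }
    }

    // Hypothetical helper: starts `batches` batches, returns their range IDs.
    List<Integer> runBatches(int batches) {
        SortBatchHolder holder = new SortBatchHolder();
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            holder.createSortDataRows();
            ids.add(holder.getRangeId());
        }
        return ids;
    }

    public static void main(String[] args) {
        BatchIdSketch sorter = new BatchIdSketch();
        sorter.initialize();
        System.out.println(sorter.runBatches(3)); // prints [1, 2, 3]
    }
}
```

Using AtomicInteger.incrementAndGet() keeps the IDs unique even if several threads reach createSortDataRows(), and the first batch gets ID 1 because the counter starts at 0.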
