carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: fix memory issue for dataloading
Date Fri, 17 Feb 2017 14:09:09 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b67760184 -> a6c8d2a79


fix memory issue for dataloading

fix comment

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c5aba5f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c5aba5f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c5aba5f5

Branch: refs/heads/master
Commit: c5aba5f518a8ea5d5907273c1782df4f9015746a
Parents: b677601
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Feb 9 00:22:03 2017 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Feb 17 19:37:32 2017 +0530

----------------------------------------------------------------------
 conf/carbon.properties.template                 |  4 ++-
 .../core/constants/CarbonCommonConstants.java   |  9 ++++--
 .../carbondata/core/util/CarbonProperties.java  | 21 +++++++++++++
 .../processing/csvload/CSVInputFormat.java      | 12 ++++++++
 .../newflow/AbstractDataLoadProcessorStep.java  |  7 ++---
 .../processing/newflow/row/CarbonRowBatch.java  | 31 ++++++++++++++------
 .../sort/impl/ParallelReadMergeSorterImpl.java  | 12 ++++----
 ...arallelReadMergeSorterWithBucketingImpl.java |  7 ++---
 .../impl/UnsafeParallelReadMergeSorterImpl.java |  7 ++---
 .../holder/UnsafeSortTempFileChunkHolder.java   |  4 ++-
 .../steps/DataConverterProcessorStepImpl.java   |  7 ++---
 ...ConverterProcessorWithBucketingStepImpl.java |  7 ++---
 .../steps/DataWriterProcessorStepImpl.java      |  5 ++--
 .../newflow/steps/DummyClassForTest.java        |  5 ++--
 .../newflow/steps/InputProcessorStepImpl.java   |  3 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  5 ++--
 .../sortandgroupby/sortdata/SortParameters.java | 10 +++++--
 .../sortdata/SortTempFileChunkHolder.java       |  6 +++-
 18 files changed, 111 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index d7356a6..9b85c75 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -113,4 +113,6 @@ carbon.enable.quick.filter=false
 ##The property to set the date to be considered as start date for calculating the timestamp.
 #carbon.cutOffTimestamp=2000-01-01 00:00:00
 ##The property to set the timestamp (ie milis) conversion to the SECOND, MINUTE, HOUR or
DAY level.
-#carbon.timegranularity=SECOND
\ No newline at end of file
+#carbon.timegranularity=SECOND
+##the number of prefetched rows in sort step
+#carbon.prefetch.buffersize=1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 437144c..f1faa57 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -267,7 +267,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE
    */
-  public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE = "50000";
+  public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE = "16384";
   /**
    * Number of cores to be used while loading
    */
@@ -638,10 +638,15 @@ public final class CarbonCommonConstants {
    * for dimensions , one of ignore dictionary dimensions , one for measures.
    */
   public static final int ARRAYSIZE = 3;
+
   /**
    * CARBON_PREFETCH_BUFFERSIZE
    */
-  public static final int CARBON_PREFETCH_BUFFERSIZE = 20000;
+  public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";
+  /**
+   * CARBON_PREFETCH_BUFFERSIZE DEFAULT VALUE
+   */
+  public static final String CARBON_PREFETCH_BUFFERSIZE_DEFAULT = "1000";
   /**
    * CARBON_PREFETCH_IN_MERGE
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 8f5ad25..962d352 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -84,6 +84,27 @@ public final class CarbonProperties {
     validateHighCardinalityInRowCountPercentage();
     validateCarbonDataFileVersion();
     validateExecutorStartUpTime();
+    validatePrefetchBufferSize();
+  }
+
+  private void validatePrefetchBufferSize() {
+    String prefetchBufferSizeStr =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+
+    if (null == prefetchBufferSizeStr || prefetchBufferSizeStr.length() == 0) {
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+          CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT);
+    } else {
+      try {
+        Integer.parseInt(prefetchBufferSizeStr);
+      } catch (NumberFormatException e) {
+        LOGGER.info("The prefetch buffer size value \"" + prefetchBufferSizeStr
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT + "\"");
+        carbonProperties.setProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT);
+      }
+    }
   }
 
   private void validateBadRecordsLocation() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index f38175d..269a127 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -231,6 +231,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (csvParser == null) {
+        return false;
+      }
       columns = csvParser.parseNext();
       if (columns == null) {
         value = null;
@@ -275,9 +278,18 @@ public class CSVInputFormat extends FileInputFormat<NullWritable,
StringArrayWri
         if (reader != null) {
           reader.close();
         }
+        if (boundedInputStream != null) {
+          boundedInputStream.close();
+        }
       } finally {
+        reader = null;
+        boundedInputStream = null;
+        csvParser = null;
+        filePosition = null;
+        value = null;
         if (decompressor != null) {
           CodecPool.returnDecompressor(decompressor);
+          decompressor = null;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
index 18d6aeb..21030cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
@@ -128,10 +128,9 @@ public abstract class AbstractDataLoadProcessorStep {
    * @return processed row.
    */
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
-    CarbonRowBatch newBatch = new CarbonRowBatch();
-    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
-    while (batchIterator.hasNext()) {
-      newBatch.addRow(processRow(batchIterator.next()));
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      newBatch.addRow(processRow(rowBatch.next()));
     }
     return newBatch;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
index 941b51d..3c6a7d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -17,27 +17,40 @@
 
 package org.apache.carbondata.processing.newflow.row;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 /**
  * Batch of rows.
  */
-public class CarbonRowBatch {
+public class CarbonRowBatch implements Iterator<CarbonRow> {
 
-  private List<CarbonRow> rowBatch = new ArrayList<>();
+  private CarbonRow[] rowBatch;
 
-  public void addRow(CarbonRow carbonRow) {
-    rowBatch.add(carbonRow);
+  private int size = 0;
+
+  private int index = 0;
+
+  public CarbonRowBatch(int batchSize) {
+    this.rowBatch = new CarbonRow[batchSize];
   }
 
-  public Iterator<CarbonRow> getBatchIterator() {
-    return rowBatch.iterator();
+  public void addRow(CarbonRow carbonRow) {
+    rowBatch[size++] = carbonRow;
   }
 
   public int getSize() {
-    return rowBatch.size();
+    return size;
+  }
+
+  @Override public boolean hasNext() {
+    return index < size;
+  }
+
+  @Override public CarbonRow next() {
+    return rowBatch[index++];
   }
 
+  @Override public void remove() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 16c5122..8c04f8f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -111,6 +111,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
     }
     try {
       intermediateFileMerger.finish();
+      intermediateFileMerger = null;
       finalMerger.startFinalMerge();
     } catch (CarbonDataWriterException e) {
       throw new CarbonDataLoadingException(e);
@@ -129,7 +130,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
       @Override
       public CarbonRowBatch next() {
         int counter = 0;
-        CarbonRowBatch rowBatch = new CarbonRowBatch();
+        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
         while (finalMerger.hasNext() && counter < batchSize) {
           rowBatch.addRow(new CarbonRow(finalMerger.next()));
           counter++;
@@ -141,7 +142,9 @@ public class ParallelReadMergeSorterImpl implements Sorter {
   }
 
   @Override public void close() {
-    intermediateFileMerger.close();
+    if (intermediateFileMerger != null) {
+      intermediateFileMerger.close();
+    }
   }
 
   /**
@@ -200,10 +203,9 @@ public class ParallelReadMergeSorterImpl implements Sorter {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
-          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
           int i = 0;
-          while (batchIterator.hasNext()) {
-            CarbonRow row = batchIterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
             if (row != null) {
               buffer[i++] = row.getData();
             }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 245302f..813d83d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -209,10 +209,9 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter
{
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
-          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
           int i = 0;
-          while (batchIterator.hasNext()) {
-            CarbonRow row = batchIterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
             if (row != null) {
               SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
               synchronized (sortDataRow) {
@@ -257,7 +256,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter
{
 
     @Override public CarbonRowBatch next() {
       int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch();
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
       while (finalMerger.hasNext() && counter < batchSize) {
         rowBatch.addRow(new CarbonRow(finalMerger.next()));
         counter++;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index d40b763..18cf314 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -124,7 +124,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
 
       @Override public CarbonRowBatch next() {
         int counter = 0;
-        CarbonRowBatch rowBatch = new CarbonRowBatch();
+        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
         while (finalMerger.hasNext() && counter < batchSize) {
           rowBatch.addRow(new CarbonRow(finalMerger.next()));
           counter++;
@@ -194,10 +194,9 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
-          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
           int i = 0;
-          while (batchIterator.hasNext()) {
-            CarbonRow row = batchIterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
             if (row != null) {
               buffer[i++] = row.getData();
             }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index a9a63d3..0c89821 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -167,7 +167,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder
{
     prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
             CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
-    bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+    bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
     this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
             CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 1a6535f..275f017 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -89,10 +89,9 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
    * @return processed row.
    */
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter)
{
-    CarbonRowBatch newBatch = new CarbonRowBatch();
-    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
-    while (batchIterator.hasNext()) {
-      newBatch.addRow(localConverter.convert(batchIterator.next()));
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      newBatch.addRow(localConverter.convert(rowBatch.next()));
     }
     rowCounter.getAndAdd(newBatch.getSize());
     return newBatch;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 0223b04..fef4aaa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -113,10 +113,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
    * @return processed row.
    */
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter)
{
-    CarbonRowBatch newBatch = new CarbonRowBatch();
-    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
-    while (batchIterator.hasNext()) {
-      CarbonRow next = batchIterator.next();
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      CarbonRow next = rowBatch.next();
       CarbonRow convertRow = localConverter.convert(next);
       convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
       newBatch.addRow(convertRow);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 710cc4f..dfc03b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -179,10 +179,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep
{
 
   private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
       throws CarbonDataLoadingException {
-    Iterator<CarbonRow> iterator = batch.getBatchIterator();
     try {
-      while (iterator.hasNext()) {
-        CarbonRow row = iterator.next();
+      while (batch.hasNext()) {
+        CarbonRow row = batch.next();
         readCounter++;
         Object[] outputRow;
         // adding one for the high cardinality dims byte array.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
index a7d8e7f..8b8000b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
@@ -89,9 +89,8 @@ class DummyThread implements Callable<Void> {
     try {
       while (iterator.hasNext()) {
         CarbonRowBatch batch = iterator.next();
-        Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
-        while (batchIterator.hasNext()) {
-          CarbonRow row = batchIterator.next();
+        while (batch.hasNext()) {
+          CarbonRow row = batch.next();
           // do nothing
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 0097690..7c50a10 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -175,6 +175,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
       boolean hasNext = currentIterator.hasNext();
       // If iterator is finished then check for next iterator.
       if (!hasNext) {
+        currentIterator.close();
         // Check next iterator is available in the list.
         if (counter < inputIterators.size()) {
           // Get the next iterator from the list.
@@ -228,7 +229,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
 
     private CarbonRowBatch getBatch() {
       // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
       int count = 0;
       while (internalHasNext() && count < batchSize) {
         carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 1384ec9..a29b426 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -149,11 +149,12 @@ public class SortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     synchronized (addRowsLock) {
+      int sizeLeft = 0;
       if (entryCount + size >= sortBufferSize) {
         LOGGER.debug("************ Writing to temp file ********** ");
         intermediateFileMerger.startMergingIfPossible();
         Object[][] recordHolderListLocal = recordHolderList;
-        int sizeLeft = sortBufferSize - entryCount ;
+        sizeLeft = sortBufferSize - entryCount ;
         if (sizeLeft > 0) {
           System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
         }
@@ -172,7 +173,7 @@ public class SortDataRows {
           return;
         }
       }
-      System.arraycopy(rowBatch, 0, recordHolderList, entryCount, size);
+      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
       entryCount += size;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 39a9bc3..dc40efe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -425,7 +425,9 @@ public class SortParameters {
     }
 
     parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
-    parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+    parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
     char[] aggType = CarbonDataProcessorUtil
         .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
@@ -525,8 +527,10 @@ public class SortParameters {
       LOGGER.info("Compression will be used for writing the sort temp File");
     }
 
-    parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
-    parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+    parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);
+    parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
     char[] aggType = CarbonDataProcessorUtil
         .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index f088e75..416a445 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -178,7 +178,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
             CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
-    bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+    bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
     this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
             CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
@@ -464,6 +466,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   public void closeStream() {
     CarbonUtil.closeStreams(stream);
     executorService.shutdown();
+    this.backupBuffer = null;
+    this.currentBuffer = null;
   }
 
   /**


Mime
View raw message