carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: Optimize data loading
Date Fri, 02 Dec 2016 08:08:41 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f47bbc2c2 -> e7e370cac


Optimize data loading

Handled broadcast fails.

Updated as per comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/63434fac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/63434fac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/63434fac

Branch: refs/heads/master
Commit: 63434fac5f4dc2d7eb9d03819401e243744a5f48
Parents: f47bbc2
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sun Nov 27 17:39:36 2016 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Fri Dec 2 15:52:02 2016 +0800

----------------------------------------------------------------------
 .../carbondata/common/CarbonIterator.java       |  14 +++
 .../AbstractDetailQueryResultIterator.java      |   2 +-
 .../carbondata/hadoop/csv/CSVInputFormat.java   |  42 +++++---
 .../recorditerator/RecordReaderIterator.java    |  31 +++++-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  40 ++++---
 .../processing/iterator/CarbonIterator.java     |  38 -------
 .../processing/newflow/DataLoadExecutor.java    |   5 +-
 .../newflow/DataLoadProcessBuilder.java         |   8 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  90 ++++++++--------
 .../newflow/steps/InputProcessorStepImpl.java   | 105 ++++++++++++++-----
 .../sortandgroupby/sortdata/SortDataRows.java   |  40 +++++++
 11 files changed, 268 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
index 9141bcd..b1a5b5a 100644
--- a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
+++ b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
@@ -35,4 +35,18 @@ public abstract class CarbonIterator<E> implements Iterator<E> {
     throw new UnsupportedOperationException("remove");
   }
 
+  /**
+   * Initialize the iterator
+   */
+  public void initialize() {
+    // sub classes can overwrite to provide initialize logic to this method
+  }
+
+  /**
+   * Close the resources
+   */
+  public void close() {
+    // sub classes can overwrite to provide close logic to this method
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index c8c61b0..07ccab4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -114,7 +114,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
       DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
       DataRefNode startDataBlock = finder
           .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
-      while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) {
+      while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
         startDataBlock = startDataBlock.getNextDataRefNode();
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
index 3ea96ac..ca27673 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
@@ -66,6 +66,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
   public static final String ESCAPE_DEFAULT = "\\";
   public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
   public static final boolean HEADER_PRESENT_DEFAULT = false;
+  public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
 
   @Override
   public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
@@ -85,10 +87,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
 
   /**
    * Sets the comment char to configuration. Default it is #.
-   * @param commentChar
    * @param configuration
+   * @param commentChar
    */
-  public static void setCommentCharacter(String commentChar, Configuration configuration) {
+  public static void setCommentCharacter(Configuration configuration, String commentChar) {
     if (commentChar != null && !commentChar.isEmpty()) {
       configuration.set(COMMENT, commentChar);
     }
@@ -96,10 +98,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
 
   /**
    * Sets the delimiter to configuration. Default it is ','
-   * @param delimiter
    * @param configuration
+   * @param delimiter
    */
-  public static void setCSVDelimiter(String delimiter, Configuration configuration) {
+  public static void setCSVDelimiter(Configuration configuration, String delimiter) {
     if (delimiter != null && !delimiter.isEmpty()) {
       configuration.set(DELIMITER, delimiter);
     }
@@ -107,10 +109,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
 
   /**
    * Sets the escape character to configuration. Default it is \
-   * @param escapeCharacter
    * @param configuration
+   * @param escapeCharacter
    */
-  public static void setEscapeCharacter(String escapeCharacter, Configuration configuration) {
+  public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
     if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
       configuration.set(ESCAPE, escapeCharacter);
     }
@@ -118,26 +120,37 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
 
   /**
    * Whether header needs to read from csv or not. By default it is false.
-   * @param headerExtractEnable
    * @param configuration
+   * @param headerExtractEnable
    */
-  public static void setHeaderExtractionEnabled(boolean headerExtractEnable,
-      Configuration configuration) {
+  public static void setHeaderExtractionEnabled(Configuration configuration,
+      boolean headerExtractEnable) {
     configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
   }
 
   /**
    * Sets the quote character to configuration. Default it is "
-   * @param quoteCharacter
    * @param configuration
+   * @param quoteCharacter
    */
-  public static void setQuoteCharacter(String quoteCharacter, Configuration configuration) {
+  public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
     if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
       configuration.set(QUOTE, quoteCharacter);
     }
   }
 
   /**
+   * Sets the read buffer size to configuration.
+   * @param configuration
+   * @param bufferSize
+   */
+  public static void setReadBufferSize(Configuration configuration, String bufferSize) {
+    if (bufferSize != null && !bufferSize.isEmpty()) {
+      configuration.set(READ_BUFFER_SIZE, bufferSize);
+    }
+  }
+
+  /**
    * Treats value as line in file. Key is null.
    */
   public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
@@ -163,8 +176,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
       Configuration job = context.getConfiguration();
       CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
       FileSystem fs = file.getFileSystem(job);
-      FSDataInputStream fileIn = fs.open(file);
-      InputStream inputStream = null;
+      int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+      FSDataInputStream fileIn = fs.open(file, bufferSize);
+      InputStream inputStream;
       if (codec != null) {
         isCompressedInput = true;
         decompressor = CodecPool.getDecompressor(codec);
@@ -209,6 +223,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
       parserSettings.setIgnoreLeadingWhitespaces(false);
       parserSettings.setIgnoreTrailingWhitespaces(false);
       parserSettings.setSkipEmptyLines(false);
+      // TODO get from csv file.
+      parserSettings.setMaxColumns(1000);
       parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
       parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
       if (start == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
index 478af0a..a1bc1e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
@@ -16,12 +16,16 @@
  */
 package org.apache.carbondata.hadoop.csv.recorditerator;
 
+import java.io.IOException;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.hadoop.io.StringArrayWritable;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * It is wrapper iterator around @{@link RecordReader}.
@@ -38,8 +42,15 @@ public class RecordReaderIterator extends CarbonIterator<Object []> {
    */
   private boolean isConsumed;
 
-  public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader) {
+  private InputSplit split;
+
+  private TaskAttemptContext context;
+
+  public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
+      InputSplit split, TaskAttemptContext context) {
     this.recordReader = recordReader;
+    this.split = split;
+    this.context = context;
   }
 
   @Override
@@ -65,4 +76,22 @@ public class RecordReaderIterator extends CarbonIterator<Object []> {
       throw new CarbonDataLoadingException(e);
     }
   }
+
+  @Override
+  public void initialize() {
+    try {
+      recordReader.initialize(split, context);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      recordReader.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 67d1ce0..44a2416 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -29,18 +29,18 @@ import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableCon
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -133,9 +133,13 @@ class NewCarbonDataLoadRDD[K, V](
           throw e
       }
 
-      def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
+      def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
         val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
-        val configuration: Configuration = confBroadcast.value.value
+        var configuration: Configuration = confBroadcast.value.value
+        // Broadcast fails in some cases WTF??
+        if (configuration == null) {
+          configuration = new Configuration()
+        }
         configureCSVInputFormat(configuration)
         val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
         val format = new CSVInputFormat
@@ -160,10 +164,11 @@ class NewCarbonDataLoadRDD[K, V](
               partitionID, split.partitionBlocksDetail.length)
           val readers =
           split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.foreach { case (reader, index) =>
-            reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader,
+              split.partitionBlocksDetail(index),
+              hadoopAttemptContext)
           }
-          readers.map(new RecordReaderIterator(_))
         } else {
           // for node partition
           val split = theSplit.asInstanceOf[CarbonNodePartition]
@@ -185,21 +190,22 @@ class NewCarbonDataLoadRDD[K, V](
           StandardLogService.setThreadName(blocksID, null)
           val readers =
             split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.foreach { case (reader, index) =>
-            reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
           }
-          readers.map(new RecordReaderIterator(_))
         }
       }
 
       def configureCSVInputFormat(configuration: Configuration): Unit = {
-        CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
-        CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
-        CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
-        CSVInputFormat.setHeaderExtractionEnabled(
-          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
-          configuration)
-        CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
+        CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+        CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+        CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+        CSVInputFormat.setHeaderExtractionEnabled(configuration,
+          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+        CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+        CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
       }
 
       /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java b/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
deleted file mode 100644
index 35bba6f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.iterator;
-
-public interface CarbonIterator<E> {
-  /**
-   * Returns <tt>true</tt> if the iteration has more elements. (In other
-   * words, returns <tt>true</tt> if <tt>next</tt> would return an element
-   * rather than throwing an exception.)
-   *
-   * @return <tt>true</tt> if the iterator has more elements.
-   */
-  boolean hasNext();
-
-  /**
-   * Returns the next element in the iteration.
-   *
-   * @return the next element in the iteration.
-   */
-  E next();
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index 746e0f2..fc24aa8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -16,8 +16,7 @@
  */
 package org.apache.carbondata.processing.newflow;
 
-import java.util.Iterator;
-
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
@@ -35,7 +34,7 @@ public class DataLoadExecutor {
       LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
 
   public void execute(CarbonLoadModel loadModel, String storeLocation,
-      Iterator<Object[]>[] inputIterators) throws Exception {
+      CarbonIterator<Object[]>[] inputIterators) throws Exception {
     AbstractDataLoadProcessorStep loadProcessorStep = null;
     try {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 92c677c..a5388d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -18,13 +18,14 @@ package org.apache.carbondata.processing.newflow;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
@@ -50,7 +51,7 @@ public final class DataLoadProcessBuilder {
       LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
-      Iterator[] inputIterators) throws Exception {
+      CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
     // 1. Reads the data input iterators and parses the data.
@@ -133,6 +134,9 @@ public final class DataLoadProcessBuilder {
         loadModel.getBadRecordsAction().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
         loadModel.getFactFilePath());
+    if(CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) == null) {
+      CarbonMetadata.getInstance().addCarbonTable(carbonTable);
+    }
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
     List<CarbonMeasure> measures =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index cd487ec..e2e995c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -90,25 +91,23 @@ public class ParallelReadMergeSorterImpl implements Sorter {
   @Override
   public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
       throws CarbonDataLoadingException {
-    SortDataRows[] sortDataRows = new SortDataRows[iterators.length];
+    SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
-      for (int i = 0; i < iterators.length; i++) {
-        sortDataRows[i] = new SortDataRows(sortParameters, intermediateFileMerger);
-        // initialize sort
-        sortDataRows[i].initialize();
-      }
+      sortDataRow.initialize();
     } catch (CarbonSortKeyAndGroupByException e) {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
 
     try {
-      for (int i = 0; i < sortDataRows.length; i++) {
+      for (int i = 0; i < iterators.length; i++) {
         executorService.submit(
-            new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
+            new SortIteratorThread(iterators[i], sortDataRow, sortParameters, batchSize));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRow, sortParameters);
     } catch (Exception e) {
       throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
     }
@@ -121,9 +120,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
       throw new CarbonDataLoadingException(e);
     }
 
-    //TODO get the batch size from CarbonProperties
-    final int batchSize = 1000;
-
     // Creates the iterator to read from merge sorter.
     Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
 
@@ -151,6 +147,36 @@ public class ParallelReadMergeSorterImpl implements Sorter {
   }
 
   /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      // start sorting
+      sortDataRows.startSorting();
+
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+              System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
    * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
    */
   private static class SortIteratorThread implements Callable<Void> {
@@ -161,11 +187,14 @@ public class ParallelReadMergeSorterImpl implements Sorter {
 
     private SortParameters parameters;
 
+    private Object[][] buffer;
+
     public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
-        SortParameters parameters) {
+        SortParameters parameters, int batchSize) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.parameters = parameters;
+      this.buffer = new Object[batchSize][];
     }
 
     @Override
@@ -174,15 +203,17 @@ public class ParallelReadMergeSorterImpl implements Sorter {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
           Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+          int i = 0;
           while (batchIterator.hasNext()) {
             CarbonRow row = batchIterator.next();
             if (row != null) {
-              sortDataRows.addRow(row.getData());
+              buffer[i++] = row.getData();
             }
           }
+          if (i > 0) {
+            sortDataRows.addRowBatch(buffer, i);
+          }
         }
-
-        processRowToNextStep(sortDataRows);
       } catch (Exception e) {
         LOGGER.error(e);
         throw new CarbonDataLoadingException(e);
@@ -190,34 +221,5 @@ public class ParallelReadMergeSorterImpl implements Sorter {
       return null;
     }
 
-    /**
-     * Below method will be used to process data to next step
-     */
-    private boolean processRowToNextStep(SortDataRows sortDataRows)
-        throws CarbonDataLoadingException {
-      if (null == sortDataRows) {
-        LOGGER.info("Record Processed For table: " + parameters.getTableName());
-        LOGGER.info("Number of Records was Zero");
-        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-        LOGGER.info(logMessage);
-        return false;
-      }
-
-      try {
-        // start sorting
-        sortDataRows.startSorting();
-
-        // check any more rows are present
-        LOGGER.info("Record Processed For table: " + parameters.getTableName());
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-            .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-            .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
-                System.currentTimeMillis());
-        return false;
-      } catch (CarbonSortKeyAndGroupByException e) {
-        throw new CarbonDataLoadingException(e);
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 69bd84a..b979af6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -3,6 +3,11 @@ package org.apache.carbondata.processing.newflow.steps;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -27,33 +32,35 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private RowParser rowParser;
 
-  private Iterator<Object[]>[] inputIterators;
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  /**
+   * executor service to execute the query
+   */
+  public ExecutorService executorService;
 
   public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      Iterator<Object[]>[] inputIterators) {
+      CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
   }
 
-  @Override
-  public DataField[] getOutput() {
+  @Override public DataField[] getOutput() {
     return configuration.getDataFields();
   }
 
-  @Override
-  public void initialize() throws CarbonDataLoadingException {
+  @Override public void initialize() throws CarbonDataLoadingException {
     rowParser = new RowParserImpl(getOutput(), configuration);
+    executorService = Executors.newCachedThreadPool();
   }
 
-
-
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() {
+  @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize);
+      outIterators[i] =
+          new InputProcessorIterator(readerIterators[i], rowParser, batchSize, executorService);
     }
     return outIterators;
   }
@@ -62,14 +69,14 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    * Partition input iterators equally as per the number of threads.
    * @return
    */
-  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
     // Get the number of cores configured in property.
     int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
     // Get the minimum of number of cores and iterators size to get the number of parallel threads
     // to be launched.
     int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
 
-    List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
     for (int i = 0; i < parallelThreadNumber; i++) {
       iterators[i] = new ArrayList<>();
     }
@@ -80,20 +87,26 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return iterators;
   }
 
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
+  @Override protected CarbonRow processRow(CarbonRow row) {
     return null;
   }
 
+  @Override public void close() {
+    executorService.shutdown();
+    for (CarbonIterator inputIterator : inputIterators) {
+      inputIterator.close();
+    }
+  }
+
   /**
    * This iterator wraps the list of iterators and it starts iterating the each
    * iterator of the list one by one. It also parse the data while iterating it.
    */
   private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
 
-    private List<Iterator<Object[]>> inputIterators;
+    private List<CarbonIterator<Object[]>> inputIterators;
 
-    private Iterator<Object[]> currentIterator;
+    private CarbonIterator<Object[]> currentIterator;
 
     private int counter;
 
@@ -101,19 +114,28 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
     private RowParser rowParser;
 
-    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
-        RowParser rowParser, int batchSize) {
+    private Future<CarbonRowBatch> future;
+
+    private ExecutorService executorService;
+
+    private boolean nextBatch;
+
+    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
+        RowParser rowParser, int batchSize, ExecutorService executorService) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.rowParser = rowParser;
       this.counter = 0;
       // Get the first iterator from the list.
       currentIterator = inputIterators.get(counter++);
+      currentIterator.initialize();
+      this.executorService = executorService;
+      this.nextBatch = false;
     }
 
     @Override
     public boolean hasNext() {
-      return internalHasNext();
+      return nextBatch || internalHasNext();
     }
 
     private boolean internalHasNext() {
@@ -124,6 +146,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
         if (counter < inputIterators.size()) {
           // Get the next iterator from the list.
           currentIterator = inputIterators.get(counter++);
+          // Initialize the new iterator
+          currentIterator.initialize();
           hasNext = internalHasNext();
         }
       }
@@ -132,14 +156,39 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
     @Override
     public CarbonRowBatch next() {
-      // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-      int count = 0;
-      while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-        count++;
+      CarbonRowBatch result = null;
+      if (future == null) {
+        future = getCarbonRowBatch();
+      }
+      try {
+        result = future.get();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+      nextBatch = false;
+      if (hasNext()) {
+        nextBatch = true;
+        future = getCarbonRowBatch();
       }
-      return carbonRowBatch;
+
+      return result;
+    }
+
+    private Future<CarbonRowBatch> getCarbonRowBatch() {
+      return executorService.submit(new Callable<CarbonRowBatch>() {
+        @Override public CarbonRowBatch call() throws Exception {
+          // Create batch and fill it.
+          CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+          int count = 0;
+          while (internalHasNext() && count < batchSize) {
+            carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+            count++;
+          }
+          return carbonRowBatch;
+        }
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 3a6afc7..7231775 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -74,6 +74,8 @@ public class SortDataRows {
 
   private SortIntermediateFileMerger intermediateFileMerger;
 
+  private final Object addRowsLock = new Object();
+
   public SortDataRows(SortParameters parameters,
       SortIntermediateFileMerger intermediateFileMerger) {
     this.parameters = parameters;
@@ -137,6 +139,44 @@ public class SortDataRows {
   }
 
   /**
+   * This method will be used to add new row
+   *
+   * @param rowBatch new rowBatch
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    synchronized (addRowsLock) {
+      if (entryCount + size >= sortBufferSize) {
+        LOGGER.debug("************ Writing to temp file ********** ");
+        intermediateFileMerger.startMergingIfPossible();
+        Object[][] recordHolderListLocal = recordHolderList;
+        int sizeLeft = sortBufferSize - entryCount ;
+        if (sizeLeft > 0) {
+          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+        }
+        try {
+          dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal));
+        } catch (Exception e) {
+          LOGGER.error(
+              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+        // create the new holder Array
+        this.recordHolderList = new Object[this.sortBufferSize][];
+        this.entryCount = 0;
+        size = size - sizeLeft;
+        if (size == 0) {
+          return;
+        }
+      }
+      System.arraycopy(rowBatch, 0, recordHolderList, entryCount, size);
+      entryCount += size;
+    }
+  }
+
+  /**
    * Below method will be used to start storing process This method will get
    * all the temp files present in sort temp folder then it will create the
    * record holder heap and then it will read first record from each file and



Mime
View raw message