carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject [04/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
Date Sun, 01 Oct 2017 01:43:20 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
new file mode 100644
index 0000000..40fe8d5
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
+
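+  /** UnCompressedTempSortFileWriter; all arguments are passed to the super constructor.
+   * @param dimensionCount
+   * @param complexDimensionCount
+   * @param measureCount
+   * @param noDictionaryCount
+   * @param writeBufferSize
+   */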
+  public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
+      int measureCount, int noDictionaryCount, int writeBufferSize) {
+    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
+  }
+
+  public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream,
+      int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount)
+      throws IOException {
+    Object[] row;
+    for (int recordIndex = 0; recordIndex < records.length; recordIndex++) {
+      row = records[recordIndex];
+      int fieldIndex = 0;
+
+      for (int counter = 0; counter < dimensionCount; counter++) {
+        dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
+      }
+
+      // write the byte[] of the no-dictionary (high cardinality) dimensions
+      if (noDictionaryCount > 0) {
+        dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
+      }
+      fieldIndex = 0;
+      for (int counter = 0; counter < complexDimensionCount; counter++) {
+        int complexByteArrayLength = ((byte[]) row[fieldIndex]).length;
+        dataOutputStream.writeInt(complexByteArrayLength);
+        dataOutputStream.write(((byte[]) row[fieldIndex++]));
+      }
+
+      for (int counter = 0; counter < measureCount; counter++) {
+        if (null != row[fieldIndex]) {
+          dataOutputStream.write((byte) 1);
+          dataOutputStream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+        } else {
+          dataOutputStream.write((byte) 0);
+        }
+
+        fieldIndex++;
+      }
+
+    }
+  }
+
+  /**
+   * Writes one batch to the sort temp file: record count, byte length, then the serialized rows.
+   *
+   * @param records rows to serialize via writeDataOutputStream
+   */
+  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+    ByteArrayOutputStream blockDataArray = null;
+    DataOutputStream dataOutputStream = null;
+    int totalSize = 0;
+    int recordSize = 0;
+    try {
+      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
+          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      totalSize = records.length * recordSize;
+
+      blockDataArray = new ByteArrayOutputStream(totalSize);
+      dataOutputStream = new DataOutputStream(blockDataArray);
+
+      writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
+          noDictionaryCount, complexDimensionCount);
+      stream.writeInt(records.length);
+      byte[] byteArray = blockDataArray.toByteArray();
+      stream.writeInt(byteArray.length);
+      stream.write(byteArray);
+
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    } finally {
+      CarbonUtil.closeStreams(blockDataArray);
+      CarbonUtil.closeStreams(dataOutputStream);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
deleted file mode 100644
index d4e4c35..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.exception;
-
-import java.util.Locale;
-
-public class CarbonSortKeyAndGroupByException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonSortKeyAndGroupByException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonSortKeyAndGroupByException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public CarbonSortKeyAndGroupByException(Throwable t) {
-    super(t);
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
deleted file mode 100644
index bd2ccec..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public abstract class AbstractTempSortFileWriter implements TempSortFileWriter {
-
-  /**
-   * writeFileBufferSize
-   */
-  protected int writeBufferSize;
-
-  /**
-   * Measure count
-   */
-  protected int measureCount;
-
-  /**
-   * Measure count
-   */
-  protected int dimensionCount;
-
-  /**
-   * complexDimension count
-   */
-  protected int complexDimensionCount;
-
-  /**
-   * stream
-   */
-  protected DataOutputStream stream;
-
-  /**
-   * noDictionaryCount
-   */
-  protected int noDictionaryCount;
-
-  /**
-   * AbstractTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public AbstractTempSortFileWriter(int dimensionCount, int complexDimensionCount, int measureCount,
-      int noDictionaryCount, int writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * Below method will be used to initialize the stream and write the entry count
-   */
-  @Override public void initiaize(File file, int entryCount)
-      throws CarbonSortKeyAndGroupByException {
-    try {
-      stream = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), writeBufferSize));
-      stream.writeInt(entryCount);
-    } catch (FileNotFoundException e1) {
-      throw new CarbonSortKeyAndGroupByException(e1);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    }
-  }
-
-  /**
-   * Below method will be used to close the stream
-   */
-  @Override public void finish() {
-    CarbonUtil.closeStreams(stream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
deleted file mode 100644
index e4c851a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * CompressedTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public CompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
-  }
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    DataOutputStream dataOutputStream = null;
-    ByteArrayOutputStream blockDataArray = null;
-    int totalSize = 0;
-    int recordSize = 0;
-    try {
-      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
-          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      totalSize = records.length * recordSize;
-
-      blockDataArray = new ByteArrayOutputStream(totalSize);
-      dataOutputStream = new DataOutputStream(blockDataArray);
-
-      UnCompressedTempSortFileWriter
-          .writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
-              noDictionaryCount, complexDimensionCount);
-
-      stream.writeInt(records.length);
-      byte[] byteArray = CompressorFactory.getInstance().getCompressor()
-          .compressByte(blockDataArray.toByteArray());
-      stream.writeInt(byteArray.length);
-      stream.write(byteArray);
-
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(blockDataArray);
-      CarbonUtil.closeStreams(dataOutputStream);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
deleted file mode 100644
index 7c6a889..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.AbstractQueue;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class IntermediateFileMerger implements Runnable {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(IntermediateFileMerger.class.getName());
-
-  /**
-   * recordHolderHeap
-   */
-  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeap;
-
-  /**
-   * fileCounter
-   */
-  private int fileCounter;
-
-  /**
-   * stream
-   */
-  private DataOutputStream stream;
-
-  /**
-   * totalNumberOfRecords
-   */
-  private int totalNumberOfRecords;
-
-  /**
-   * records
-   */
-  private Object[][] records;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * totalSize
-   */
-  private int totalSize;
-
-  private SortParameters mergerParameters;
-
-  private File[] intermediateFiles;
-
-  private File outPutFile;
-
-  private boolean[] noDictionarycolumnMapping;
-
-  /**
-   * IntermediateFileMerger Constructor
-   */
-  public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
-      File outPutFile) {
-    this.mergerParameters = mergerParameters;
-    this.fileCounter = intermediateFiles.length;
-    this.intermediateFiles = intermediateFiles;
-    this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
-  }
-
-  @Override
-  public void run() {
-    long intermediateMergeStartTime = System.currentTimeMillis();
-    int fileConterConst = fileCounter;
-    boolean isFailed = false;
-    try {
-      startSorting();
-      initialize();
-      while (hasNext()) {
-        writeDataTofile(next());
-      }
-      if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
-        if (entryCount > 0) {
-          if (entryCount < totalSize) {
-            Object[][] temp = new Object[entryCount][];
-            System.arraycopy(records, 0, temp, 0, entryCount);
-            records = temp;
-            this.writer.writeSortTempFile(temp);
-          } else {
-            this.writer.writeSortTempFile(records);
-          }
-        }
-      }
-      double intermediateMergeCostTime =
-          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + fileConterConst +
-          " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
-    } catch (Exception e) {
-      LOGGER.error(e, "Problem while intermediate merging");
-      isFailed = true;
-    } finally {
-      records = null;
-      CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
-      if (!isFailed) {
-        try {
-          finish();
-        } catch (CarbonSortKeyAndGroupByException e) {
-          LOGGER.error(e, "Problem while deleting the merge file");
-        }
-      } else {
-        if (outPutFile.delete()) {
-          LOGGER.error("Problem while deleting the merge file");
-        }
-      }
-    }
-  }
-
-  /**
-   * This method is responsible for initializing the out stream
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
-
-      if (mergerParameters.isPrefetch()) {
-        totalSize = mergerParameters.getBufferSize();
-      } else {
-        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
-      }
-    }
-  }
-
-  /**
-   * This method will be used to get the sorted record from file
-   *
-   * @return sorted record sorted record
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-    Object[] row = null;
-
-    // poll the top object from heap
-    // heap maintains binary tree which is based on heap condition that will
-    // be based on comparator we are passing the heap
-    // when will call poll it will always delete root of the tree and then
-    // it does trickel down operation complexity is log(n)
-    SortTempFileChunkHolder poll = this.recordHolderHeap.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there no entry present
-    if (!poll.hasNext()) {
-      // if chunk is empty then close the stream
-      poll.closeStream();
-
-      // change the file counter
-      --this.fileCounter;
-
-      // reaturn row
-      return row;
-    }
-
-    // read new row
-    poll.readRow();
-
-    // add to heap
-    this.recordHolderHeap.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * Below method will be used to start storing process This method will get
-   * all the temp files present in sort temp folder then it will create the
-   * record holder heap and then it will read first record from each file and
-   * initialize the heap
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void startSorting() throws CarbonSortKeyAndGroupByException {
-    LOGGER.info("Number of temp file: " + this.fileCounter);
-
-    // create record holder heap
-    createRecordHolderQueue(intermediateFiles);
-
-    // iterate over file list and create chunk holder and add to heap
-    LOGGER.info("Started adding first record from each file");
-
-    SortTempFileChunkHolder sortTempFileChunkHolder = null;
-
-    for (File tempFile : intermediateFiles) {
-      // create chunk holder
-      sortTempFileChunkHolder =
-          new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
-              mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
-              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getMeasureDataType(),
-              mergerParameters.getNoDictionaryDimnesionColumn(),
-              mergerParameters.getNoDictionarySortColumn());
-
-      // initialize
-      sortTempFileChunkHolder.initialize();
-      sortTempFileChunkHolder.readRow();
-      this.totalNumberOfRecords += sortTempFileChunkHolder.getEntryCount();
-
-      // add to heap
-      this.recordHolderHeap.add(sortTempFileChunkHolder);
-    }
-
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
-  }
-
-  /**
-   * This method will be used to create the heap which will be used to hold
-   * the chunk of data
-   *
-   * @param listFiles list of temp files
-   */
-  private void createRecordHolderQueue(File[] listFiles) {
-    // creating record holder heap
-    this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
-  }
-
-  /**
-   * This method will be used to get the sorted row
-   *
-   * @return sorted row
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private Object[] next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromFile();
-  }
-
-  /**
-   * This method will be used to check whether any more element is present or
-   * not
-   *
-   * @return more element is present
-   */
-  private boolean hasNext() {
-    return this.fileCounter > 0;
-  }
-
-  /**
-   * Below method will be used to write data to file
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
-    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
-      if (entryCount == 0) {
-        records = new Object[totalSize][];
-        records[entryCount++] = row;
-        return;
-      }
-
-      records[entryCount++] = row;
-      if (entryCount == totalSize) {
-        this.writer.writeSortTempFile(records);
-        entryCount = 0;
-        records = new Object[totalSize][];
-      }
-      return;
-    }
-    try {
-      DataType[] aggType = mergerParameters.getMeasureDataType();
-      int[] mdkArray = (int[]) row[0];
-      byte[][] nonDictArray = (byte[][]) row[1];
-      int mdkIndex = 0;
-      int nonDictKeyIndex = 0;
-      // write dictionary and non dictionary dimensions here.
-      for (boolean nodictinary : noDictionarycolumnMapping) {
-        if (nodictinary) {
-          byte[] col = nonDictArray[nonDictKeyIndex++];
-          stream.writeShort(col.length);
-          stream.write(col);
-        } else {
-          stream.writeInt(mdkArray[mdkIndex++]);
-        }
-      }
-
-      int fieldIndex = 0;
-      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
-        if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
-          stream.write((byte) 1);
-          switch (aggType[counter]) {
-            case SHORT:
-              stream.writeShort((short)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case INT:
-              stream.writeInt((int)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case LONG:
-              stream.writeLong((long)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case DOUBLE:
-              stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case DECIMAL:
-              byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
-              stream.writeInt(bigDecimalInBytes.length);
-              stream.write(bigDecimalInBytes);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
-          }
-        } else {
-          stream.write((byte) 0);
-        }
-        fieldIndex++;
-      }
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
-    }
-  }
-
-  private void finish() throws CarbonSortKeyAndGroupByException {
-    if (recordHolderHeap != null) {
-      int size = recordHolderHeap.size();
-      for (int i = 0; i < size; i++) {
-        recordHolderHeap.poll().closeStream();
-      }
-    }
-    try {
-      CarbonUtil.deleteFiles(intermediateFiles);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
deleted file mode 100644
index 247251e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-
-public class NewRowComparator implements Comparator<Object[]> {
-
-  /**
-   * mapping of dictionary dimensions and no dictionary of sort_column.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
-  /**
-   * @param noDictionarySortColumnMaping
-   */
-  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    int index = 0;
-
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
-      if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[index];
-
-        byte[] byteArr2 = (byte[]) rowB[index];
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
-        }
-      } else {
-        int dimFieldA = (int) rowA[index];
-        int dimFieldB = (int) rowB[index];
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
-      }
-
-      index++;
-    }
-
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
deleted file mode 100644
index 241882e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-/**
- * This class is used as comparator for comparing dims which are non high cardinality dims.
- * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
- */
-public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
-  /**
-   * dimension count
-   */
-  private int numberOfSortColumns;
-
-  /**
-   * RowComparatorForNormalDims Constructor
-   *
-   * @param numberOfSortColumns
-   */
-  public NewRowComparatorForNormalDims(int numberOfSortColumns) {
-    this.numberOfSortColumns = numberOfSortColumns;
-  }
-
-  /**
-   * Below method will be used to compare two surrogate keys
-   *
-   * @see Comparator#compare(Object, Object)
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    for (int i = 0; i < numberOfSortColumns; i++) {
-
-      int dimFieldA = (int)rowA[i];
-      int dimFieldB = (int)rowB[i];
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
deleted file mode 100644
index 11c42a9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-public class RowComparator implements Comparator<Object[]> {
-  /**
-   * noDictionaryCount represent number of no dictionary cols
-   */
-  private int noDictionaryCount;
-
-  /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
-  /**
-   * @param noDictionarySortColumnMaping
-   * @param noDictionaryCount
-   */
-  public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
-    this.noDictionaryCount = noDictionaryCount;
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    int normalIndex = 0;
-    int noDictionaryindex = 0;
-
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
-      if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
-        ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
-
-        // extract a high card dims from complete byte[].
-        NonDictionaryUtil
-            .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
-
-        byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
-        ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
-
-        // extract a high card dims from complete byte[].
-        NonDictionaryUtil
-            .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
-        if (difference != 0) {
-          return difference;
-        }
-        noDictionaryindex++;
-      } else {
-        int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
-        int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
-        normalIndex++;
-      }
-
-    }
-
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
deleted file mode 100644
index be29bf8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-/**
- * This class is used as comparator for comparing dims which are non high cardinality dims.
- * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
- */
-public class RowComparatorForNormalDims implements Comparator<Object[]> {
-  /**
-   * dimension count
-   */
-  private int numberOfSortColumns;
-
-  /**
-   * RowComparatorForNormalDims Constructor
-   *
-   * @param numberOfSortColumns
-   */
-  public RowComparatorForNormalDims(int numberOfSortColumns) {
-    this.numberOfSortColumns = numberOfSortColumns;
-  }
-
-  /**
-   * Below method will be used to compare two surrogate keys
-   *
-   * @see Comparator#compare(Object, Object)
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    for (int i = 0; i < numberOfSortColumns; i++) {
-
-      int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
-      int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
-
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
deleted file mode 100644
index 71fc564..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class SortDataRows {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SortDataRows.class.getName());
-  /**
-   * entryCount
-   */
-  private int entryCount;
-  /**
-   * record holder array
-   */
-  private Object[][] recordHolderList;
-  /**
-   * threadStatusObserver
-   */
-  private ThreadStatusObserver threadStatusObserver;
-  /**
-   * executor service for data sort holder
-   */
-  private ExecutorService dataSorterAndWriterExecutorService;
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-  private Semaphore semaphore;
-
-  private SortParameters parameters;
-
-  private int sortBufferSize;
-
-  private SortIntermediateFileMerger intermediateFileMerger;
-
-  private final Object addRowsLock = new Object();
-
-  public SortDataRows(SortParameters parameters,
-      SortIntermediateFileMerger intermediateFileMerger) {
-    this.parameters = parameters;
-
-    this.intermediateFileMerger = intermediateFileMerger;
-
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-
-    this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
-    // observer of writing file in thread
-    this.threadStatusObserver = new ThreadStatusObserver();
-  }
-
-  /**
-   * This method will be used to initialize
-   */
-  public void initialize() throws CarbonSortKeyAndGroupByException {
-
-    // create holder list which will hold incoming rows
-    // size of list will be sort buffer size + 1 to avoid creation of new
-    // array in list array
-    this.recordHolderList = new Object[sortBufferSize][];
-    // Delete if any older file exists in sort temp folder
-    deleteSortLocationIfExists();
-
-    // create new sort temp directory
-    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-    this.dataSorterAndWriterExecutorService =
-        Executors.newFixedThreadPool(parameters.getNumberOfCores());
-    semaphore = new Semaphore(parameters.getNumberOfCores());
-  }
-
-  /**
-   * This method will be used to add new row
-   *
-   * @param row new row
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
-    // if record holder list size is equal to sort buffer size then it will
-    // sort the list and then write current list data to file
-    int currentSize = entryCount;
-
-    if (sortBufferSize == currentSize) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("************ Writing to temp file ********** ");
-      }
-      intermediateFileMerger.startMergingIfPossible();
-      Object[][] recordHolderListLocal = recordHolderList;
-      try {
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
-      } catch (InterruptedException e) {
-        LOGGER.error(
-            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-        throw new CarbonSortKeyAndGroupByException(e.getMessage());
-      }
-      // create the new holder Array
-      this.recordHolderList = new Object[this.sortBufferSize][];
-      this.entryCount = 0;
-    }
-    recordHolderList[entryCount++] = row;
-  }
-
-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
-    // if record holder list size is equal to sort buffer size then it will
-    // sort the list and then write current list data to file
-    synchronized (addRowsLock) {
-      int sizeLeft = 0;
-      if (entryCount + size >= sortBufferSize) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("************ Writing to temp file ********** ");
-        }
-        intermediateFileMerger.startMergingIfPossible();
-        Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount ;
-        if (sizeLeft > 0) {
-          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
-        }
-        try {
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService
-              .execute(new DataSorterAndWriter(recordHolderListLocal));
-        } catch (Exception e) {
-          LOGGER.error(
-              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-        // create the new holder Array
-        this.recordHolderList = new Object[this.sortBufferSize][];
-        this.entryCount = 0;
-        size = size - sizeLeft;
-        if (size == 0) {
-          return;
-        }
-      }
-      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
-      entryCount += size;
-    }
-  }
-
-  /**
-   * Below method will be used to start storing process This method will get
-   * all the temp files present in sort temp folder then it will create the
-   * record holder heap and then it will read first record from each file and
-   * initialize the heap
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  public void startSorting() throws CarbonSortKeyAndGroupByException {
-    LOGGER.info("File based sorting will be used");
-    if (this.entryCount > 0) {
-      Object[][] toSort;
-      toSort = new Object[entryCount][];
-      System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
-      if (parameters.getNumberOfNoDictSortColumns() > 0) {
-        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
-      } else {
-        Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
-      }
-      recordHolderList = toSort;
-
-      // create new file and choose folder randomly
-      String[] tmpLocation = parameters.getTempFileLocation();
-      String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)];
-      File file = new File(
-          locationChosen + File.separator + parameters.getTableName() +
-              System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-      writeDataTofile(recordHolderList, this.entryCount, file);
-
-    }
-
-    startFileBasedMerge();
-    this.recordHolderList = null;
-  }
-
-  /**
-   * Below method will be used to write data to file
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
-      throws CarbonSortKeyAndGroupByException {
-    // stream
-    if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) {
-      writeSortTempFile(recordHolderList, entryCountLocal, file);
-      return;
-    }
-    writeData(recordHolderList, entryCountLocal, file);
-  }
-
-  private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
-      throws CarbonSortKeyAndGroupByException {
-    TempSortFileWriter writer = null;
-
-    try {
-      writer = getWriter();
-      writer.initiaize(file, entryCountLocal);
-      writer.writeSortTempFile(recordHolderList);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      LOGGER.error(e, "Problem while writing the sort temp file");
-      throw e;
-    } finally {
-      if (writer != null) {
-        writer.finish();
-      }
-    }
-  }
-
-  private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
-      throws CarbonSortKeyAndGroupByException {
-    DataOutputStream stream = null;
-    try {
-      // open stream
-      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
-          parameters.getFileWriteBufferSize()));
-
-      // write number of entries to the file
-      stream.writeInt(entryCountLocal);
-      int complexDimColCount = parameters.getComplexDimColCount();
-      int dimColCount = parameters.getDimColCount() + complexDimColCount;
-      DataType[] type = parameters.getMeasureDataType();
-      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
-      Object[] row = null;
-      for (int i = 0; i < entryCountLocal; i++) {
-        // get row from record holder list
-        row = recordHolderList[i];
-        int dimCount = 0;
-        // write dictionary and non dictionary dimensions here.
-        for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
-          if (noDictionaryDimnesionMapping[dimCount]) {
-            byte[] col = (byte[]) row[dimCount];
-            stream.writeShort(col.length);
-            stream.write(col);
-          } else {
-            stream.writeInt((int)row[dimCount]);
-          }
-        }
-        // write complex dimensions here.
-        for (; dimCount < dimColCount; dimCount++) {
-          byte[] value = (byte[])row[dimCount];
-          stream.writeShort(value.length);
-          stream.write(value);
-        }
-        // as measures are stored in separate array.
-        for (int mesCount = 0;
-             mesCount < parameters.getMeasureColCount(); mesCount++) {
-          Object value = row[mesCount + dimColCount];
-          if (null != value) {
-            stream.write((byte) 1);
-            switch (type[mesCount]) {
-              case SHORT:
-                stream.writeShort((Short) value);
-                break;
-              case INT:
-                stream.writeInt((Integer) value);
-                break;
-              case LONG:
-                stream.writeLong((Long) value);
-                break;
-              case DOUBLE:
-                stream.writeDouble((Double) value);
-                break;
-              case DECIMAL:
-                BigDecimal val = (BigDecimal) value;
-                byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-                stream.writeInt(bigDecimalInBytes.length);
-                stream.write(bigDecimalInBytes);
-                break;
-              default:
-                throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
-            }
-          } else {
-            stream.write((byte) 0);
-          }
-        }
-      }
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
-    } finally {
-      // close streams
-      CarbonUtil.closeStreams(stream);
-    }
-  }
-
-  private TempSortFileWriter getWriter() {
-    TempSortFileWriter chunkWriter = null;
-    TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
-        .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
-            parameters.getDimColCount(), parameters.getComplexDimColCount(),
-            parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
-            parameters.getFileWriteBufferSize());
-
-    if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
-      chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize());
-    } else {
-      chunkWriter =
-          new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression());
-    }
-
-    return chunkWriter;
-  }
-
-  /**
-   * This method will be used to delete sort temp location is it is exites
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
-    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
-  }
-
-  /**
-   * Below method will be used to start file based merge
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
-    try {
-      dataSorterAndWriterExecutorService.shutdown();
-      dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
-    }
-  }
-
-  /**
-   * Observer class for thread execution
-   * In case of any failure we need stop all the running thread
-   */
-  private class ThreadStatusObserver {
-    /**
-     * Below method will be called if any thread fails during execution
-     *
-     * @param exception
-     * @throws CarbonSortKeyAndGroupByException
-     */
-    public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
-      dataSorterAndWriterExecutorService.shutdownNow();
-      intermediateFileMerger.close();
-      parameters.getObserver().setFailed(true);
-      LOGGER.error(exception);
-      throw new CarbonSortKeyAndGroupByException(exception);
-    }
-  }
-
-  /**
-   * This class is responsible for sorting and writing the object
-   * array which holds the records equal to given array size
-   */
-  private class DataSorterAndWriter implements Runnable {
-    private Object[][] recordHolderArray;
-
-    public DataSorterAndWriter(Object[][] recordHolderArray) {
-      this.recordHolderArray = recordHolderArray;
-    }
-
-    @Override
-    public void run() {
-      try {
-        long startTime = System.currentTimeMillis();
-        if (parameters.getNumberOfNoDictSortColumns() > 0) {
-          Arrays.sort(recordHolderArray,
-              new NewRowComparator(parameters.getNoDictionarySortColumn()));
-        } else {
-          Arrays.sort(recordHolderArray,
-              new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
-        }
-
-        // create a new file and choose folder randomly every time
-        String[] tmpFileLocation = parameters.getTempFileLocation();
-        String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
-        File sortTempFile = new File(
-            locationChosen + File.separator + parameters.getTableName() + System
-                .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-        writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
-        // add sort temp filename to and arrayList. When the list size reaches 20 then
-        // intermediate merging of sort temp files will be triggered
-        intermediateFileMerger.addFileToMerge(sortTempFile);
-        LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
-            System.currentTimeMillis() - startTime));
-      } catch (Throwable e) {
-        try {
-          threadStatusObserver.notifyFailed(e);
-        } catch (CarbonSortKeyAndGroupByException ex) {
-          LOGGER.error(ex);
-        }
-      } finally {
-        semaphore.release();
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
deleted file mode 100644
index 6bda88a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * It does mergesort intermediate files to big file.
- */
-public class SortIntermediateFileMerger {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
-
-  /**
-   * executorService
-   */
-  private ExecutorService executorService;
-  /**
-   * procFiles
-   */
-  private List<File> procFiles;
-
-  private SortParameters parameters;
-
-  private final Object lockObject = new Object();
-
-  public SortIntermediateFileMerger(SortParameters parameters) {
-    this.parameters = parameters;
-    // processed file list
-    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
-  }
-
-  public void addFileToMerge(File sortTempFile) {
-    // add sort temp filename to and arrayList. When the list size reaches 20 then
-    // intermediate merging of sort temp files will be triggered
-    synchronized (lockObject) {
-      procFiles.add(sortTempFile);
-    }
-  }
-
-  public void startMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
-        fileList = procFiles.toArray(new File[procFiles.size()]);
-        this.procFiles = new ArrayList<File>();
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-      }
-      startIntermediateMerging(fileList);
-    }
-  }
-
-  /**
-   * Below method will be used to start the intermediate file merging
-   *
-   * @param intermediateFiles
-   */
-  private void startIntermediateMerging(File[] intermediateFiles) {
-    int index = new Random().nextInt(parameters.getTempFileLocation().length);
-    String chosenTempDir = parameters.getTempFileLocation()[index];
-    File file = new File(
-        chosenTempDir + File.separator + parameters.getTableName() + System
-            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
-    IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.execute(merger);
-  }
-
-  public void finish() throws CarbonSortKeyAndGroupByException {
-    try {
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
-    }
-    procFiles.clear();
-    procFiles = null;
-  }
-
-  public void close() {
-    if (executorService.isShutdown()) {
-      executorService.shutdownNow();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
deleted file mode 100644
index fb2977e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.schema.metadata.SortObserver;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-public class SortParameters implements Serializable {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SortParameters.class.getName());
-  /**
-   * tempFileLocation
-   */
-  private String[] tempFileLocation;
-  /**
-   * sortBufferSize
-   */
-  private int sortBufferSize;
-  /**
-   * measure count
-   */
-  private int measureColCount;
-  /**
-   * measure count
-   */
-  private int dimColCount;
-  /**
-   * measure count
-   */
-  private int complexDimColCount;
-  /**
-   * fileBufferSize
-   */
-  private int fileBufferSize;
-  /**
-   * numberOfIntermediateFileToBeMerged
-   */
-  private int numberOfIntermediateFileToBeMerged;
-  /**
-   * fileWriteBufferSize
-   */
-  private int fileWriteBufferSize;
-  /**
-   * observer
-   */
-  private SortObserver observer;
-  /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortFileCompressionEnabled;
-  /**
-   * prefetch
-   */
-  private boolean prefetch;
-  /**
-   * bufferSize
-   */
-  private int bufferSize;
-
-  private String databaseName;
-
-  private String tableName;
-
-  private DataType[] measureDataType;
-
-  /**
-   * To know how many columns are of high cardinality.
-   */
-  private int noDictionaryCount;
-  /**
-   * partitionID
-   */
-  private String partitionID;
-  /**
-   * Id of the load folder
-   */
-  private String segmentId;
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-
-  /**
-   * This will tell whether dimension is dictionary or not.
-   */
-  private boolean[] noDictionaryDimnesionColumn;
-
-  private boolean[] noDictionarySortColumn;
-
-  private int numberOfSortColumns;
-
-  private int numberOfNoDictSortColumns;
-
-  private int numberOfCores;
-
-  private int batchSortSizeinMb;
-
-  public SortParameters getCopy() {
-    SortParameters parameters = new SortParameters();
-    parameters.tempFileLocation = tempFileLocation;
-    parameters.sortBufferSize = sortBufferSize;
-    parameters.measureColCount = measureColCount;
-    parameters.dimColCount = dimColCount;
-    parameters.complexDimColCount = complexDimColCount;
-    parameters.fileBufferSize = fileBufferSize;
-    parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
-    parameters.fileWriteBufferSize = fileWriteBufferSize;
-    parameters.observer = observer;
-    parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
-    parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled;
-    parameters.prefetch = prefetch;
-    parameters.bufferSize = bufferSize;
-    parameters.databaseName = databaseName;
-    parameters.tableName = tableName;
-    parameters.measureDataType = measureDataType;
-    parameters.noDictionaryCount = noDictionaryCount;
-    parameters.partitionID = partitionID;
-    parameters.segmentId = segmentId;
-    parameters.taskNo = taskNo;
-    parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
-    parameters.noDictionarySortColumn = noDictionarySortColumn;
-    parameters.numberOfSortColumns = numberOfSortColumns;
-    parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
-    parameters.numberOfCores = numberOfCores;
-    parameters.batchSortSizeinMb = batchSortSizeinMb;
-    return parameters;
-  }
-
-  public String[] getTempFileLocation() {
-    return tempFileLocation;
-  }
-
-  public void setTempFileLocation(String[] tempFileLocation) {
-    this.tempFileLocation = tempFileLocation;
-  }
-
-  public int getSortBufferSize() {
-    return sortBufferSize;
-  }
-
-  public void setSortBufferSize(int sortBufferSize) {
-    this.sortBufferSize = sortBufferSize;
-  }
-
-  public int getMeasureColCount() {
-    return measureColCount;
-  }
-
-  public void setMeasureColCount(int measureColCount) {
-    this.measureColCount = measureColCount;
-  }
-
-  public int getDimColCount() {
-    return dimColCount;
-  }
-
-  public void setDimColCount(int dimColCount) {
-    this.dimColCount = dimColCount;
-  }
-
-  public int getComplexDimColCount() {
-    return complexDimColCount;
-  }
-
-  public void setComplexDimColCount(int complexDimColCount) {
-    this.complexDimColCount = complexDimColCount;
-  }
-
-  public int getFileBufferSize() {
-    return fileBufferSize;
-  }
-
-  public void setFileBufferSize(int fileBufferSize) {
-    this.fileBufferSize = fileBufferSize;
-  }
-
-  public int getNumberOfIntermediateFileToBeMerged() {
-    return numberOfIntermediateFileToBeMerged;
-  }
-
-  public void setNumberOfIntermediateFileToBeMerged(int numberOfIntermediateFileToBeMerged) {
-    this.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
-  }
-
-  public int getFileWriteBufferSize() {
-    return fileWriteBufferSize;
-  }
-
-  public void setFileWriteBufferSize(int fileWriteBufferSize) {
-    this.fileWriteBufferSize = fileWriteBufferSize;
-  }
-
-  public SortObserver getObserver() {
-    return observer;
-  }
-
-  public void setObserver(SortObserver observer) {
-    this.observer = observer;
-  }
-
-  public int getSortTempFileNoOFRecordsInCompression() {
-    return sortTempFileNoOFRecordsInCompression;
-  }
-
-  public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
-    this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
-  }
-
-  public boolean isSortFileCompressionEnabled() {
-    return isSortFileCompressionEnabled;
-  }
-
-  public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
-    isSortFileCompressionEnabled = sortFileCompressionEnabled;
-  }
-
-  public boolean isPrefetch() {
-    return prefetch;
-  }
-
-  public void setPrefetch(boolean prefetch) {
-    this.prefetch = prefetch;
-  }
-
-  public int getBufferSize() {
-    return bufferSize;
-  }
-
-  public void setBufferSize(int bufferSize) {
-    this.bufferSize = bufferSize;
-  }
-
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public DataType[] getMeasureDataType() {
-    return measureDataType;
-  }
-
-  public void setMeasureDataType(DataType[] measureDataType) {
-    this.measureDataType = measureDataType;
-  }
-
-  public int getNoDictionaryCount() {
-    return noDictionaryCount;
-  }
-
-  public void setNoDictionaryCount(int noDictionaryCount) {
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  public String getPartitionID() {
-    return partitionID;
-  }
-
-  public void setPartitionID(String partitionID) {
-    this.partitionID = partitionID;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  public boolean[] getNoDictionaryDimnesionColumn() {
-    return noDictionaryDimnesionColumn;
-  }
-
-  public void setNoDictionaryDimnesionColumn(boolean[] noDictionaryDimnesionColumn) {
-    this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
-  }
-
-  public int getNumberOfCores() {
-    return numberOfCores;
-  }
-
-  public void setNumberOfCores(int numberOfCores) {
-    this.numberOfCores = numberOfCores;
-  }
-
-  public int getNumberOfSortColumns() {
-    return numberOfSortColumns;
-  }
-
-  public void setNumberOfSortColumns(int numberOfSortColumns) {
-    this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
-  }
-
-  public boolean[] getNoDictionarySortColumn() {
-    return noDictionarySortColumn;
-  }
-
-  public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
-    this.noDictionarySortColumn = noDictionarySortColumn;
-  }
-
-  public int getNumberOfNoDictSortColumns() {
-    return numberOfNoDictSortColumns;
-  }
-
-  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
-    this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount);
-  }
-
-  public int getBatchSortSizeinMb() {
-    return batchSortSizeinMb;
-  }
-
-  public void setBatchSortSizeinMb(int batchSortSizeinMb) {
-    this.batchSortSizeinMb = batchSortSizeinMb;
-  }
-
-  public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
-    SortParameters parameters = new SortParameters();
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonProperties carbonProperties = CarbonProperties.getInstance();
-    parameters.setDatabaseName(tableIdentifier.getDatabaseName());
-    parameters.setTableName(tableIdentifier.getTableName());
-    parameters.setPartitionID(configuration.getPartitionId());
-    parameters.setSegmentId(configuration.getSegmentId());
-    parameters.setTaskNo(configuration.getTaskNo());
-    parameters.setMeasureColCount(configuration.getMeasureCount());
-    parameters.setDimColCount(
-        configuration.getDimensionCount() - configuration.getComplexColumnCount());
-    parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
-    parameters.setComplexDimColCount(configuration.getComplexColumnCount());
-    parameters.setNoDictionaryDimnesionColumn(
-        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
-    parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
-
-    parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
-    parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    setNoDictionarySortColumnMapping(parameters);
-    parameters.setObserver(new SortObserver());
-    // get sort buffer size
-    parameters.setSortBufferSize(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.SORT_SIZE,
-            CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
-    LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
-    // set number of intermedaite file to merge
-    parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
-            CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
-
-    LOGGER.info("Number of intermediate file to be merged: " + parameters
-        .getNumberOfIntermediateFileToBeMerged());
-
-    // get file buffer size
-    parameters.setFileBufferSize(CarbonDataProcessorUtil
-        .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
-            CarbonCommonConstants.CONSTANT_SIZE_TEN));
-
-    LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
-
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), configuration.getTaskNo(),
-            configuration.getPartitionId(), configuration.getSegmentId(), false, false);
-    String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
-        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-
-    parameters.setTempFileLocation(sortTempDirs);
-    LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
-    int numberOfCores;
-    try {
-      numberOfCores = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      numberOfCores = numberOfCores / 2;
-    } catch (NumberFormatException exc) {
-      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
-    parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
-
-    parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
-            CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
-
-    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
-    int sortTempFileNoOFRecordsInCompression;
-    try {
-      sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
-            + "be used");
-
-        sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed. Default value will be used");
-
-      sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-    parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
-    if (parameters.isSortFileCompressionEnabled()) {
-      LOGGER.info("Compression will be used for writing the sort temp File");
-    }
-
-    parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
-    parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
-        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
-        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
-
-    DataType[] measureDataType = configuration.getMeasureDataType();
-    parameters.setMeasureDataType(measureDataType);
-    return parameters;
-  }
-
-  /**
-   * this method will set the boolean mapping for no dictionary sort columns
-   *
-   * @param parameters
-   */
-  private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
-    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
-      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
-    } else {
-      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
-      System
-          .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp, 0,
-              parameters.getNumberOfSortColumns());
-      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
-    }
-  }
-
-  public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
-      String tableName, int dimColCount, int complexDimColCount, int measureColCount,
-      int noDictionaryCount, String partitionID, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
-    SortParameters parameters = new SortParameters();
-    CarbonProperties carbonProperties = CarbonProperties.getInstance();
-    parameters.setDatabaseName(databaseName);
-    parameters.setTableName(tableName);
-    parameters.setPartitionID(partitionID);
-    parameters.setSegmentId(segmentId);
-    parameters.setTaskNo(taskNo);
-    parameters.setMeasureColCount(measureColCount);
-    parameters.setDimColCount(dimColCount - complexDimColCount);
-    parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
-    parameters.setNoDictionaryCount(noDictionaryCount);
-    parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
-    parameters.setComplexDimColCount(complexDimColCount);
-    parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
-    parameters.setObserver(new SortObserver());
-    // get sort buffer size
-    parameters.setSortBufferSize(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.SORT_SIZE,
-            CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
-    LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
-    // set number of intermedaite file to merge
-    parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
-            CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
-
-    LOGGER.info("Number of intermediate file to be merged: " + parameters
-        .getNumberOfIntermediateFileToBeMerged());
-
-    // get file buffer size
-    parameters.setFileBufferSize(CarbonDataProcessorUtil
-        .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
-            CarbonCommonConstants.CONSTANT_SIZE_TEN));
-
-    LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
-
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
-            isCompactionFlow, false);
-    String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
-        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(sortTempDirs);
-    LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
-    int numberOfCores;
-    try {
-      numberOfCores = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      numberOfCores = numberOfCores / 2;
-    } catch (NumberFormatException exc) {
-      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
-    parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
-
-    parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
-            CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
-
-    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
-    int sortTempFileNoOFRecordsInCompression;
-    try {
-      sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
-            + "be used");
-
-        sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed. Default value will be used");
-
-      sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-    parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
-    if (parameters.isSortFileCompressionEnabled()) {
-      LOGGER.info("Compression will be used for writing the sort temp File");
-    }
-
-    parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);
-    parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
-        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
-        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
-
-    DataType[] type = CarbonDataProcessorUtil
-        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
-            parameters.getTableName());
-    parameters.setMeasureDataType(type);
-    setNoDictionarySortColumnMapping(parameters);
-    return parameters;
-  }
-
-}


Mime
View raw message