carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [14/50] [abbrv] carbondata git commit: [CARBONDATA-1839] [DataLoad] Fix bugs and optimize in compressing sort temp files
Date Tue, 09 Jan 2018 04:01:42 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 5b9e091..57a19bd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
@@ -33,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -212,7 +211,7 @@ public class SortDataRows {
       File file = new File(
           locationChosen + File.separator + parameters.getTableName() +
               System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-      writeDataTofile(recordHolderList, this.entryCount, file);
+      writeDataToFile(recordHolderList, this.entryCount, file);
 
     }
 
@@ -225,42 +224,13 @@ public class SortDataRows {
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
-      throws CarbonSortKeyAndGroupByException {
-    // stream
-    if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) {
-      writeSortTempFile(recordHolderList, entryCountLocal, file);
-      return;
-    }
-    writeData(recordHolderList, entryCountLocal, file);
-  }
-
-  private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
-      throws CarbonSortKeyAndGroupByException {
-    TempSortFileWriter writer = null;
-
-    try {
-      writer = getWriter();
-      writer.initiaize(file, entryCountLocal);
-      writer.writeSortTempFile(recordHolderList);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      LOGGER.error(e, "Problem while writing the sort temp file");
-      throw e;
-    } finally {
-      if (writer != null) {
-        writer.finish();
-      }
-    }
-  }
-
-  private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
+  private void writeDataToFile(Object[][] recordHolderList, int entryCountLocal, File file)
       throws CarbonSortKeyAndGroupByException {
     DataOutputStream stream = null;
     try {
       // open stream
-      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
-          parameters.getFileWriteBufferSize()));
-
+      stream = FileFactory.getDataOutputStream(file.getPath(), FileFactory.FileType.LOCAL,
+          parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName());
       // write number of entries to the file
       stream.writeInt(entryCountLocal);
       int complexDimColCount = parameters.getComplexDimColCount();
@@ -326,24 +296,6 @@ public class SortDataRows {
     }
   }
 
-  private TempSortFileWriter getWriter() {
-    TempSortFileWriter chunkWriter = null;
-    TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
-        .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
-            parameters.getDimColCount(), parameters.getComplexDimColCount(),
-            parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
-            parameters.getFileWriteBufferSize());
-
-    if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
-      chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize());
-    } else {
-      chunkWriter =
-          new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression());
-    }
-
-    return chunkWriter;
-  }
-
   /**
    * This method will be used to delete sort temp location is it is exites
    *
@@ -423,7 +375,7 @@ public class SortDataRows {
         File sortTempFile = new File(
             locationChosen + File.separator + parameters.getTableName() + System
                 .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-        writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
+        writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
         // add sort temp filename to and arrayList. When the list size reaches 20 then
         // intermediate merging of sort temp files will be triggered
         intermediateFileMerger.addFileToMerge(sortTempFile);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 4da4c84..a2248ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -71,14 +71,7 @@ public class SortParameters implements Serializable {
    * observer
    */
   private SortObserver observer;
-  /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortFileCompressionEnabled;
+  private String sortTempCompressorName;
   /**
    * prefetch
    */
@@ -137,8 +130,7 @@ public class SortParameters implements Serializable {
     parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
     parameters.fileWriteBufferSize = fileWriteBufferSize;
     parameters.observer = observer;
-    parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
-    parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled;
+    parameters.sortTempCompressorName = sortTempCompressorName;
     parameters.prefetch = prefetch;
     parameters.bufferSize = bufferSize;
     parameters.databaseName = databaseName;
@@ -229,20 +221,12 @@ public class SortParameters implements Serializable {
     this.observer = observer;
   }
 
-  public int getSortTempFileNoOFRecordsInCompression() {
-    return sortTempFileNoOFRecordsInCompression;
-  }
-
-  public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
-    this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+  public String getSortTempCompressorName() {
+    return sortTempCompressorName;
   }
 
-  public boolean isSortFileCompressionEnabled() {
-    return isSortFileCompressionEnabled;
-  }
-
-  public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
-    isSortFileCompressionEnabled = sortFileCompressionEnabled;
+  public void setSortTempCompressorName(String sortTempCompressorName) {
+    this.sortTempCompressorName = sortTempCompressorName;
   }
 
   public boolean isPrefetch() {
@@ -425,36 +409,10 @@ public class SortParameters implements Serializable {
         .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
             CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
 
-    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
-    int sortTempFileNoOFRecordsInCompression;
-    try {
-      sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
-            + "be used");
-
-        sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed. Default value will be used");
-
-      sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-    parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
-    if (parameters.isSortFileCompressionEnabled()) {
-      LOGGER.info("Compression will be used for writing the sort temp File");
+    parameters.setSortTempCompressorName(CarbonProperties.getInstance().getSortTempCompressor());
+    if (!parameters.sortTempCompressorName.isEmpty()) {
+      LOGGER.info(" Compression " + parameters.sortTempCompressorName
+          + " will be used for writing the sort temp File");
     }
 
     parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
@@ -538,36 +496,10 @@ public class SortParameters implements Serializable {
         .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
             CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
 
-    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
-    int sortTempFileNoOFRecordsInCompression;
-    try {
-      sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
-            + "be used");
-
-        sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed. Default value will be used");
-
-      sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-    parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
-    if (parameters.isSortFileCompressionEnabled()) {
-      LOGGER.info("Compression will be used for writing the sort temp File");
+    parameters.setSortTempCompressorName(CarbonProperties.getInstance().getSortTempCompressor());
+    if (!parameters.sortTempCompressorName.isEmpty()) {
+      LOGGER.info(" Compression " + parameters.sortTempCompressorName
+          + " will be used for writing the sort temp File");
     }
 
     parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 2f87cf7..d726539 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -31,6 +29,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
@@ -73,26 +72,15 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * return row
    */
   private Object[] returnRow;
-
-  /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
-   * fileBufferSize for file reader stream size
-   */
-  private int fileBufferSize;
+  private int dimCnt;
+  private int noDictDimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private boolean[] isNoDictionarySortColumn;
+  private DataType[] measureDataTypes;
+  private int readBufferSize;
+  private String compressorName;
 
   private Object[][] currentBuffer;
 
@@ -113,67 +101,32 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   private int prefetchRecordsProceesed;
 
   /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortTempFileCompressionEnabled;
-
-  /**
    * totalRecordFetch
    */
   private int totalRecordFetch;
 
-  private int noDictionaryCount;
-
-  private DataType[] measureDataTypes;
-
-  /**
-   * to store whether dimension is of dictionary type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
-   * to store whether sort column is of dictionary type or not
-   */
-  private boolean[] isNoDictionarySortColumn;
-
   /**
    * Constructor to initialize
    *
    * @param tempFile
-   * @param dimensionCount
-   * @param complexDimensionCount
-   * @param measureCount
-   * @param fileBufferSize
-   * @param noDictionaryCount
-   * @param measureDataTypes
-   * @param isNoDictionaryDimensionColumn
+   * @param sortParameters
+   * @param tableName
    */
-  public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
-      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] measureDataTypes,
-      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn,
-      String tableName) {
+  public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) {
     // set temp file
     this.tempFile = tempFile;
+    this.dimCnt = sortParameters.getDimColCount();
+    this.noDictDimCnt = sortParameters.getNoDictionaryCount();
+    this.complexCnt = sortParameters.getComplexDimColCount();
+    this.measureCnt = sortParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn();
+    this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn();
+    this.measureDataTypes = sortParameters.getMeasureDataType();
+    this.readBufferSize = sortParameters.getBufferSize();
+    this.compressorName = sortParameters.getSortTempCompressorName();
 
-    // set measure and dimension count
-    this.measureCount = measureCount;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-
-    this.noDictionaryCount = noDictionaryCount;
-    // set mdkey length
-    this.fileBufferSize = fileBufferSize;
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
-    this.measureDataTypes = measureDataTypes;
-
-    this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
-    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
   }
 
   /**
@@ -188,44 +141,14 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     bufferSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
             CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
-    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-    if (this.isSortTempFileCompressionEnabled) {
-      LOGGER.info("Compression was used while writing the sortTempFile");
-    }
-
-    try {
-      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (this.sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
-            + " be used");
-
-        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed.Default value will be used");
-      this.sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
 
     initialise();
   }
 
   private void initialise() throws CarbonSortKeyAndGroupByException {
     try {
-      if (isSortTempFileCompressionEnabled) {
-        this.bufferSize = sortTempFileNoOFRecordsInCompression;
-      }
-      stream = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+      stream = FileFactory.getDataInputStream(tempFile.getPath(), FileFactory.FileType.LOCAL,
+          readBufferSize, compressorName);
       this.entryCount = stream.readInt();
       if (prefetch) {
         new DataFetcher(false).call();
@@ -233,12 +156,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
         if (totalRecordFetch < this.entryCount) {
           submit = executorService.submit(new DataFetcher(true));
         }
-      } else {
-        if (isSortTempFileCompressionEnabled) {
-          new DataFetcher(false).call();
-        }
       }
-
     } catch (FileNotFoundException e) {
       LOGGER.error(e);
       throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
@@ -259,19 +177,6 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   public void readRow() throws CarbonSortKeyAndGroupByException {
     if (prefetch) {
       fillDataForPrefetch();
-    } else if (isSortTempFileCompressionEnabled) {
-      if (bufferRowCounter >= bufferSize) {
-        try {
-          new DataFetcher(false).call();
-          bufferRowCounter = 0;
-        } catch (Exception e) {
-          LOGGER.error(e);
-          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-        }
-
-      }
-      prefetchRecordsProceesed++;
-      returnRow = currentBuffer[bufferRowCounter++];
     } else {
       this.returnRow = getRowFromStream();
     }
@@ -317,9 +222,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     Object[] holder = new Object[3];
     int index = 0;
     int nonDicIndex = 0;
-    int[] dim = new int[this.dimensionCount - this.noDictionaryCount];
-    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
-    Object[] measures = new Object[this.measureCount];
+    int[] dim = new int[dimCnt - noDictDimCnt];
+    byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][];
+    Object[] measures = new Object[measureCnt];
     try {
       // read dimension values
       for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
@@ -333,7 +238,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
         }
       }
 
-      for (int i = 0; i < complexDimensionCount; i++) {
+      for (int i = 0; i < complexCnt; i++) {
         short len = stream.readShort();
         byte[] array = new byte[len];
         stream.readFully(array);
@@ -342,7 +247,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
       index = 0;
       // read measure values
-      for (int i = 0; i < this.measureCount; i++) {
+      for (int i = 0; i < measureCnt; i++) {
         if (stream.readByte() == 1) {
           DataType dataType = measureDataTypes[i];
           if (dataType == DataTypes.BOOLEAN) {
@@ -361,7 +266,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
             stream.readFully(buff);
             measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
           } else {
-            throw new IllegalArgumentException("unsupported data type:" + measureDataTypes[i]);
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
           }
         } else {
           measures[index++] = null;
@@ -397,7 +302,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * @return more row present in file
    */
   public boolean hasNext() {
-    if (prefetch || isSortTempFileCompressionEnabled) {
+    if (prefetch) {
       return this.prefetchRecordsProceesed < this.entryCount;
     }
     return this.numberOfObjectRead < this.entryCount;
@@ -467,10 +372,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += 31 * measureCount;
-    hash += 31 * dimensionCount;
-    hash += 31 * complexDimensionCount;
-    hash += 31 * noDictionaryCount;
+    hash += 31 * measureCnt;
+    hash += 31 * dimCnt;
+    hash += 31 * complexCnt;
     hash += tempFile.hashCode();
     return hash;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
deleted file mode 100644
index 025aef8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public class SortTempFileChunkWriter implements TempSortFileWriter {
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * recordPerLeaf
-   */
-  private int recordPerLeaf;
-
-  /**
-   * CarbonCompressedSortTempFileChunkWriter
-   *
-   * @param writer
-   */
-  public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) {
-    this.writer = writer;
-    this.recordPerLeaf = recordPerLeaf;
-  }
-
-  /**
-   * initialize
-   */
-  public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException {
-    this.writer.initiaize(file, entryCount);
-  }
-
-  /**
-   * finish
-   */
-  public void finish() {
-    this.writer.finish();
-  }
-
-  /**
-   * Below method will be used to write the sort temp file chunk by chunk
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    int recordCount = 0;
-    Object[][] tempRecords;
-    while (recordCount < records.length) {
-      if (records.length - recordCount < recordPerLeaf) {
-        recordPerLeaf = records.length - recordCount;
-      }
-      tempRecords = new Object[recordPerLeaf][];
-      System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf);
-      recordCount += recordPerLeaf;
-      this.writer.writeSortTempFile(tempRecords);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
deleted file mode 100644
index 0de9af7..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-public interface TempSortFileReader {
-  /**
-   * below method will be used to close the file holder
-   */
-  void finish();
-
-  /**
-   * Below method will be used to get the row
-   */
-  Object[][] getRow();
-
-  /**
-   * Below method will be used to get the total row count in temp file
-   *
-   * @return
-   */
-  int getEntryCount();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
deleted file mode 100644
index 4e4a8e7..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public interface TempSortFileWriter {
-  /**
-   * Method will be used to initialize
-   *
-   * @param file
-   * @param entryCount
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException;
-
-  /**
-   * Method will be used to finish
-   */
-  void finish();
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
deleted file mode 100644
index 259ab9f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-public final class TempSortFileWriterFactory {
-  private static final TempSortFileWriterFactory WRITERFACTORY = new TempSortFileWriterFactory();
-
-  private TempSortFileWriterFactory() {
-
-  }
-
-  public static TempSortFileWriterFactory getInstance() {
-    return WRITERFACTORY;
-  }
-
-  public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount,
-      int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) {
-    if (isCompressionEnabled) {
-      return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
-          noDictionaryCount, writeBufferSize);
-    } else {
-      return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
-          noDictionaryCount, writeBufferSize);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
deleted file mode 100644
index 40fe8d5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * UnCompressedTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
-  }
-
-  public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream,
-      int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount)
-      throws IOException {
-    Object[] row;
-    for (int recordIndex = 0; recordIndex < records.length; recordIndex++) {
-      row = records[recordIndex];
-      int fieldIndex = 0;
-
-      for (int counter = 0; counter < dimensionCount; counter++) {
-        dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
-      }
-
-      //write byte[] of high card dims
-      if (noDictionaryCount > 0) {
-        dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
-      }
-      fieldIndex = 0;
-      for (int counter = 0; counter < complexDimensionCount; counter++) {
-        int complexByteArrayLength = ((byte[]) row[fieldIndex]).length;
-        dataOutputStream.writeInt(complexByteArrayLength);
-        dataOutputStream.write(((byte[]) row[fieldIndex++]));
-      }
-
-      for (int counter = 0; counter < measureCount; counter++) {
-        if (null != row[fieldIndex]) {
-          dataOutputStream.write((byte) 1);
-          dataOutputStream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-        } else {
-          dataOutputStream.write((byte) 0);
-        }
-
-        fieldIndex++;
-      }
-
-    }
-  }
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    ByteArrayOutputStream blockDataArray = null;
-    DataOutputStream dataOutputStream = null;
-    int totalSize = 0;
-    int recordSize = 0;
-    try {
-      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
-          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      totalSize = records.length * recordSize;
-
-      blockDataArray = new ByteArrayOutputStream(totalSize);
-      dataOutputStream = new DataOutputStream(blockDataArray);
-
-      writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
-          noDictionaryCount, complexDimensionCount);
-      stream.writeInt(records.length);
-      byte[] byteArray = blockDataArray.toByteArray();
-      stream.writeInt(byteArray.length);
-      stream.write(byteArray);
-
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(blockDataArray);
-      CarbonUtil.closeStreams(dataOutputStream);
-    }
-  }
-}


Mime
View raw message