carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [20/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
Date Thu, 08 Mar 2018 16:55:21 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
new file mode 100644
index 0000000..0ae0b93
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+public class RowComparator implements Comparator<Object[]> {
+  /**
+   * Total number of no-dictionary columns packed in the row's no-dictionary byte[].
+   */
+  private final int noDictionaryCount;
+
+  /**
+   * Per sort column: true for a no-dictionary column, false for a dictionary column.
+   */
+  private final boolean[] noDictionarySortColumnMaping;
+
+  /**
+   * @param noDictionarySortColumnMaping per sort column, true if it is a no-dictionary column
+   * @param noDictionaryCount number of no-dictionary columns packed in the row's byte[]
+   */
+  public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
+    this.noDictionaryCount = noDictionaryCount;
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  }
+
+  /**
+   * Compares two rows sort column by sort column; the first non-zero result decides.
+   */
+  @Override public int compare(Object[] rowA, Object[] rowB) {
+    // separate cursors for dictionary (int) and no-dictionary (byte[]) sort columns
+    int normalIndex = 0;
+    int noDictionaryindex = 0;
+
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+
+      if (isNoDictionary) {
+        byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+
+        ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
+
+        // extract this high card dim from the complete byte[] of rowA.
+        NonDictionaryUtil
+            .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
+
+        byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+
+        ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
+
+        // extract the same high card dim from the complete byte[] of rowB.
+        NonDictionaryUtil
+            .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
+        if (difference != 0) {
+          return difference;
+        }
+        noDictionaryindex++;
+      } else {
+        int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
+        int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
+        // Integer.compare instead of subtraction, which can overflow and flip the sign
+        int diff = Integer.compare(dimFieldA, dimFieldB);
+        if (diff != 0) {
+          return diff;
+        }
+        normalIndex++;
+      }
+
+    }
+
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
new file mode 100644
index 0000000..0883ae1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+/**
+ * Comparator for rows whose sort columns are all dictionary (non high cardinality) dims.
+ * Those dims are int surrogates, so they are compared numerically, column by column.
+ */
+public class RowComparatorForNormalDims implements Comparator<Object[]> {
+  /**
+   * number of leading dimensions that take part in the comparison
+   */
+  private final int numberOfSortColumns;
+
+  /**
+   * RowComparatorForNormalDims Constructor
+   *
+   * @param numberOfSortColumns number of sort columns to compare
+   */
+  public RowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  /**
+   * Compares the surrogate keys of the first {@code numberOfSortColumns} dimensions.
+   * The first unequal column decides; rows equal in all of them compare as 0.
+   *
+   * @see Comparator#compare(Object, Object)
+   */
+  @Override public int compare(Object[] rowA, Object[] rowB) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
+
+      int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
+      int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
+
+      // Integer.compare instead of subtraction, which can overflow and flip the sign
+      int diff = Integer.compare(dimFieldA, dimFieldB);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index a4ac0ea..88695b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -37,8 +37,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -73,12 +71,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
    * tableName
    */
   private String tableName;
-  private SortParameters sortParameters;
-  private SortStepRowHandler sortStepRowHandler;
+
   /**
    * tempFileLocation
    */
   private String[] tempFileLocation;
+  private SortParameters sortParameters;
 
   private int maxThreadForSorting;
 
@@ -91,7 +89,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.sortParameters = sortParameters;
-    this.sortStepRowHandler = new SortStepRowHandler(sortParameters);
     try {
       maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
@@ -110,7 +107,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
    */
   public void startFinalMerge() throws CarbonDataWriterException {
     List<File> filesToMerge = getFilesToMergeSort();
-    if (filesToMerge.size() == 0) {
+    if (filesToMerge.size() == 0)
+    {
       LOGGER.info("No file to merge in final merge stage");
       return;
     }
@@ -127,9 +125,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
 
     // get all the merged files
     List<File> files = new ArrayList<File>(tempFileLocation.length);
-    for (String tempLoc : tempFileLocation) {
+    for (String tempLoc : tempFileLocation)
+    {
       File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0) {
+      if (null != subFiles && subFiles.length > 0)
+      {
         files.addAll(Arrays.asList(subFiles));
       }
     }
@@ -226,14 +226,13 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
   }
 
   /**
-   * This method will be used to get the sorted sort temp row from the sort temp files
+   * Returns the next row in sorted order, obtained via getSortedRecordFromFile().
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
   public Object[] next() {
-    IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
-    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    return getSortedRecordFromFile();
   }
 
   /**
@@ -242,8 +241,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
{
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException
{
-    IntermediateSortTempRow row = null;
+  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+    Object[] row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index c7efbd9..57a19bd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -20,7 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -32,10 +32,12 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -67,8 +69,7 @@ public class SortDataRows {
   private Semaphore semaphore;
 
   private SortParameters parameters;
-  private SortStepRowHandler sortStepRowHandler;
-  private ThreadLocal<ByteBuffer> rowBuffer;
+
   private int sortBufferSize;
 
   private SortIntermediateFileMerger intermediateFileMerger;
@@ -78,7 +79,7 @@ public class SortDataRows {
   public SortDataRows(SortParameters parameters,
       SortIntermediateFileMerger intermediateFileMerger) {
     this.parameters = parameters;
-    this.sortStepRowHandler = new SortStepRowHandler(parameters);
+
     this.intermediateFileMerger = intermediateFileMerger;
 
     int batchSize = CarbonProperties.getInstance().getBatchSize();
@@ -86,12 +87,6 @@ public class SortDataRows {
     this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
-    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
-      @Override protected ByteBuffer initialValue() {
-        byte[] backedArray = new byte[2 * 1024 * 1024];
-        return ByteBuffer.wrap(backedArray);
-      }
-    };
   }
 
   /**
@@ -135,7 +130,8 @@ public class SortDataRows {
         semaphore.acquire();
         dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
       } catch (InterruptedException e) {
-        LOGGER.error(e, "exception occurred while trying to acquire a semaphore lock: ");
+        LOGGER.error(e,
+            "exception occurred while trying to acquire a semaphore lock: ");
         throw new CarbonSortKeyAndGroupByException(e);
       }
       // create the new holder Array
@@ -162,7 +158,7 @@ public class SortDataRows {
         }
         intermediateFileMerger.startMergingIfPossible();
         Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount;
+        sizeLeft = sortBufferSize - entryCount ;
         if (sizeLeft > 0) {
           System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
         }
@@ -216,6 +212,7 @@ public class SortDataRows {
           locationChosen + File.separator + parameters.getTableName() +
               System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
       writeDataToFile(recordHolderList, this.entryCount, file);
+
     }
 
     startFileBasedMerge();
@@ -223,7 +220,7 @@ public class SortDataRows {
   }
 
   /**
-   * Below method will be used to write data to sort temp file
+   * Below method will be used to write data to file
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
@@ -236,9 +233,60 @@ public class SortDataRows {
           parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName());
       // write number of entries to the file
       stream.writeInt(entryCountLocal);
+      int complexDimColCount = parameters.getComplexDimColCount();
+      int dimColCount = parameters.getDimColCount() + complexDimColCount;
+      DataType[] type = parameters.getMeasureDataType();
+      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
+      Object[] row = null;
       for (int i = 0; i < entryCountLocal; i++) {
-        sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream(
-            recordHolderList[i], stream, rowBuffer.get());
+        // get row from record holder list
+        row = recordHolderList[i];
+        int dimCount = 0;
+        // write dictionary and non dictionary dimensions here.
+        for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
+          if (noDictionaryDimnesionMapping[dimCount]) {
+            byte[] col = (byte[]) row[dimCount];
+            stream.writeShort(col.length);
+            stream.write(col);
+          } else {
+            stream.writeInt((int)row[dimCount]);
+          }
+        }
+        // write complex dimensions here.
+        for (; dimCount < dimColCount; dimCount++) {
+          byte[] value = (byte[])row[dimCount];
+          stream.writeShort(value.length);
+          stream.write(value);
+        }
+        // as measures are stored in separate array.
+        for (int mesCount = 0;
+             mesCount < parameters.getMeasureColCount(); mesCount++) {
+          Object value = row[mesCount + dimColCount];
+          if (null != value) {
+            stream.write((byte) 1);
+            DataType dataType = type[mesCount];
+            if (dataType == DataTypes.BOOLEAN) {
+              stream.writeBoolean((boolean) value);
+            } else if (dataType == DataTypes.SHORT) {
+              stream.writeShort((Short) value);
+            } else if (dataType == DataTypes.INT) {
+              stream.writeInt((Integer) value);
+            } else if (dataType == DataTypes.LONG) {
+              stream.writeLong((Long) value);
+            } else if (dataType == DataTypes.DOUBLE) {
+              stream.writeDouble((Double) value);
+            } else if (DataTypes.isDecimal(dataType)) {
+              BigDecimal val = (BigDecimal) value;
+              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+              stream.writeInt(bigDecimalInBytes.length);
+              stream.write(bigDecimalInBytes);
+            } else {
+              throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
+            }
+          } else {
+            stream.write((byte) 0);
+          }
+        }
       }
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
@@ -253,7 +301,7 @@ public class SortDataRows {
    *
    * @throws CarbonSortKeyAndGroupByException
    */
-  private void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+  public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
     CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
   }
 
@@ -332,8 +380,7 @@ public class SortDataRows {
         // intermediate merging of sort temp files will be triggered
         intermediateFileMerger.addFileToMerge(sortTempFile);
         LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is:
" + (
-            System.currentTimeMillis() - startTime) + ", sort temp file size in MB is "
-            + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+            System.currentTimeMillis() - startTime));
       } catch (Throwable e) {
         try {
           threadStatusObserver.notifyFailed(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 7e221a7..d726539 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,11 +30,14 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder>
{
@@ -69,13 +71,20 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   /**
    * return row
    */
-  private IntermediateSortTempRow returnRow;
+  private Object[] returnRow;
+  private int dimCnt;
+  private int noDictDimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private boolean[] isNoDictionarySortColumn;
+  private DataType[] measureDataTypes;
   private int readBufferSize;
   private String compressorName;
 
-  private IntermediateSortTempRow[] currentBuffer;
+  private Object[][] currentBuffer;
 
-  private IntermediateSortTempRow[] backupBuffer;
+  private Object[][] backupBuffer;
 
   private boolean isBackupFilled;
 
@@ -95,9 +104,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * totalRecordFetch
    */
   private int totalRecordFetch;
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
-  private Comparator<IntermediateSortTempRow> comparator;
+
   /**
    * Constructor to initialize
    *
@@ -108,12 +115,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName)
{
     // set temp file
     this.tempFile = tempFile;
+    this.dimCnt = sortParameters.getDimColCount();
+    this.noDictDimCnt = sortParameters.getNoDictionaryCount();
+    this.complexCnt = sortParameters.getComplexDimColCount();
+    this.measureCnt = sortParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn();
+    this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn();
+    this.measureDataTypes = sortParameters.getMeasureDataType();
     this.readBufferSize = sortParameters.getBufferSize();
     this.compressorName = sortParameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(sortParameters);
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
-    this.comparator = new IntermediateSortTempRowComparator(
-        tableFieldStat.getIsSortColNoDictFlags());
+
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
   }
@@ -167,12 +178,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     if (prefetch) {
       fillDataForPrefetch();
     } else {
-      try {
-        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-        this.numberOfObjectRead++;
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e);
-      }
+      this.returnRow = getRowFromStream();
     }
   }
 
@@ -206,28 +212,86 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   /**
-   * Read a batch of row from stream
-   *
+   * Reads row from file
    * @return Object[]
-   * @throws IOException if error occurs while reading from stream
+   * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException
{
-    IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
-    for (int i = 0; i < expected; i++) {
-      IntermediateSortTempRow holder
-          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-      holders[i] = holder;
+  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+    // create new row of size 3 (1 for dims, 1 for high card dims, 1 for measures)
+
+    Object[] holder = new Object[3];
+    int index = 0;
+    int nonDicIndex = 0;
+    int[] dim = new int[dimCnt - noDictDimCnt];
+    byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][];
+    Object[] measures = new Object[measureCnt];
+    try {
+      // read dimension values
+      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+        if (isNoDictionaryDimensionColumn[i]) {
+          int len = stream.readUnsignedShort(); // written via writeShort: 0..65535 bytes
+          byte[] array = new byte[len];
+          stream.readFully(array);
+          nonDicArray[nonDicIndex++] = array;
+        } else {
+          dim[index++] = stream.readInt();
+        }
+      }
+
+      for (int i = 0; i < complexCnt; i++) {
+        int len = stream.readUnsignedShort();
+        byte[] array = new byte[len];
+        stream.readFully(array);
+        nonDicArray[nonDicIndex++] = array;
+      }
+
+      index = 0;
+      // read measure values
+      for (int i = 0; i < measureCnt; i++) {
+        if (stream.readByte() == 1) {
+          DataType dataType = measureDataTypes[i];
+          if (dataType == DataTypes.BOOLEAN) {
+            measures[index++] = stream.readBoolean();
+          } else if (dataType == DataTypes.SHORT) {
+            measures[index++] = stream.readShort();
+          } else if (dataType == DataTypes.INT) {
+            measures[index++] = stream.readInt();
+          } else if (dataType == DataTypes.LONG) {
+            measures[index++] = stream.readLong();
+          } else if (dataType == DataTypes.DOUBLE) {
+            measures[index++] = stream.readDouble();
+          } else if (DataTypes.isDecimal(dataType)) {
+            int len = stream.readInt();
+            byte[] buff = new byte[len];
+            stream.readFully(buff);
+            measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
+          } else {
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
+          }
+        } else {
+          measures[index++] = null;
+        }
+      }
+
+      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+      // increment number of records read
+      this.numberOfObjectRead++;
+    } catch (IOException e) {
+      LOGGER.error("Problem while reading the mdkey from sort temp file");
+      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file
", e);
     }
-    this.numberOfObjectRead += expected;
-    return holders;
+
+    // return the assembled row
+    return holder;
   }
 
   /**
-   * below method will be used to get the sort temp row
+   * Returns the row currently held by this chunk holder.
    *
    * @return row
    */
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return this.returnRow;
   }
 
@@ -266,7 +330,31 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   @Override public int compareTo(SortTempFileChunkHolder other) {
-    return comparator.compare(returnRow, other.getRow());
+    // returnRow[0] holds the int[] dictionary dims, returnRow[1] the byte[][] no-dict dims
+    int index = 0;
+    int noDictionaryIndex = 0;
+    int[] leftMdkArray = (int[]) returnRow[0];
+    int[] rightMdkArray = (int[]) other.returnRow[0];
+    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
+    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
+    for (boolean isNoDictionary : isNoDictionarySortColumn) {
+      if (isNoDictionary) {
+        int diff = UnsafeComparer.INSTANCE
+            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
+        if (diff != 0) {
+          return diff;
+        }
+        noDictionaryIndex++;
+      } else {
+        int diff = Integer.compare(leftMdkArray[index], rightMdkArray[index]);
+        if (diff != 0) {
+          return diff;
+        }
+        index++;
+      }
+
+    }
+    return 0;
   }
 
   @Override public boolean equals(Object obj) {
@@ -284,7 +372,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += tableFieldStat.hashCode();
+    hash += 31 * measureCnt; // NOTE(review): terms are summed, not chained as 31 * hash + x
+    hash += 31 * dimCnt;
+    hash += 31 * complexCnt;
     hash += tempFile.hashCode();
     return hash;
   }
@@ -324,12 +414,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   /**
    * This method will read the records from sort temp file and keep it in a buffer
    *
-   * @param numberOfRecords number of records to be read
-   * @return batch of intermediate sort temp row
-   * @throws IOException if error occurs while reading reading records
+   * @param numberOfRecords number of rows to read from the stream
+   * @return the rows read, in stream order
+   * @throws CarbonSortKeyAndGroupByException if reading any row fails
    */
-  private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
-      throws IOException {
-    return readBatchedRowFromStream(numberOfRecords);
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] records = new Object[numberOfRecords][];
+    for (int i = 0; i < numberOfRecords; i++) {
+      records[i] = getRowFromStream();
+    }
+    return records;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
deleted file mode 100644
index 0d1303a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * This class is used to hold field information for a table during data loading. These information
- * will be used to convert/construct/destruct row in sort process step. Because complex field is
- * processed the same as no-dict-no-sort-simple-dimension, so we treat them as the same and use
- * `no-dict-no-sort-dim` related variable to represent them in this class.
- */
-public class TableFieldStat implements Serializable {
-  private static final long serialVersionUID = 201712070950L;
-  private int dictSortDimCnt = 0;
-  private int dictNoSortDimCnt = 0;
-  private int noDictSortDimCnt = 0;
-  private int noDictNoSortDimCnt = 0;
-  // whether sort column is of dictionary type or not
-  private boolean[] isSortColNoDictFlags;
-  private int measureCnt;
-  private DataType[] measureDataType;
-
-  // indices for dict & sort dimension columns
-  private int[] dictSortDimIdx;
-  // indices for dict & no-sort dimension columns
-  private int[] dictNoSortDimIdx;
-  // indices for no-dict & sort dimension columns
-  private int[] noDictSortDimIdx;
-  // indices for no-dict & no-sort dimension columns, including complex columns
-  private int[] noDictNoSortDimIdx;
-  // indices for measure columns
-  private int[] measureIdx;
-
-  public TableFieldStat(SortParameters sortParameters) {
-    int noDictDimCnt = sortParameters.getNoDictionaryCount();
-    int complexDimCnt = sortParameters.getComplexDimColCount();
-    int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
-    this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
-    int sortColCnt = isSortColNoDictFlags.length;
-    for (boolean flag : isSortColNoDictFlags) {
-      if (flag) {
-        noDictSortDimCnt++;
-      } else {
-        dictSortDimCnt++;
-      }
-    }
-    this.measureCnt = sortParameters.getMeasureColCount();
-    this.measureDataType = sortParameters.getMeasureDataType();
-
-    // be careful that the default value is 0
-    this.dictSortDimIdx = new int[dictSortDimCnt];
-    this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
-    this.noDictSortDimIdx = new int[noDictSortDimCnt];
-    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
-    this.measureIdx = new int[measureCnt];
-
-    int tmpNoDictSortCnt = 0;
-    int tmpNoDictNoSortCnt = 0;
-    int tmpDictSortCnt = 0;
-    int tmpDictNoSortCnt = 0;
-    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
-
-    for (int i = 0; i < isDimNoDictFlags.length; i++) {
-      if (isDimNoDictFlags[i]) {
-        if (i < sortColCnt && isSortColNoDictFlags[i]) {
-          noDictSortDimIdx[tmpNoDictSortCnt++] = i;
-        } else {
-          noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
-        }
-      } else {
-        if (i < sortColCnt && !isSortColNoDictFlags[i]) {
-          dictSortDimIdx[tmpDictSortCnt++] = i;
-        } else {
-          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
-        }
-      }
-    }
-    dictNoSortDimCnt = tmpDictNoSortCnt;
-
-    int base = isDimNoDictFlags.length;
-    // adding complex dimension columns
-    for (int i = 0; i < complexDimCnt; i++) {
-      noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i;
-    }
-    noDictNoSortDimCnt = tmpNoDictNoSortCnt;
-
-    base += complexDimCnt;
-    // indices for measure columns
-    for (int i = 0; i < measureCnt; i++) {
-      measureIdx[i] = base + i;
-    }
-  }
-
-  public int getDictSortDimCnt() {
-    return dictSortDimCnt;
-  }
-
-  public int getDictNoSortDimCnt() {
-    return dictNoSortDimCnt;
-  }
-
-  public int getNoDictSortDimCnt() {
-    return noDictSortDimCnt;
-  }
-
-  public int getNoDictNoSortDimCnt() {
-    return noDictNoSortDimCnt;
-  }
-
-  public boolean[] getIsSortColNoDictFlags() {
-    return isSortColNoDictFlags;
-  }
-
-  public int getMeasureCnt() {
-    return measureCnt;
-  }
-
-  public DataType[] getMeasureDataType() {
-    return measureDataType;
-  }
-
-  public int[] getDictSortDimIdx() {
-    return dictSortDimIdx;
-  }
-
-  public int[] getDictNoSortDimIdx() {
-    return dictNoSortDimIdx;
-  }
-
-  public int[] getNoDictSortDimIdx() {
-    return noDictSortDimIdx;
-  }
-
-  public int[] getNoDictNoSortDimIdx() {
-    return noDictNoSortDimIdx;
-  }
-
-  public int[] getMeasureIdx() {
-    return measureIdx;
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof TableFieldStat)) return false;
-    TableFieldStat that = (TableFieldStat) o;
-    return dictSortDimCnt == that.dictSortDimCnt
-        && dictNoSortDimCnt == that.dictNoSortDimCnt
-        && noDictSortDimCnt == that.noDictSortDimCnt
-        && noDictNoSortDimCnt == that.noDictNoSortDimCnt
-        && measureCnt == that.measureCnt;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
-        noDictNoSortDimCnt, measureCnt);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 64e50b0..efd715c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -393,8 +392,7 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String createCarbonStoreLocation(String factStoreLocation,
-      String databaseName, String tableName, String segmentId) {
+  public static String createCarbonStoreLocation(String databaseName, String tableName, String
segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3302094..70c26ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -433,15 +433,6 @@ public final class CarbonLoaderUtil {
 
   }
 
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
-  }
-
   public static boolean isValidEscapeSequence(String escapeChar) {
     return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
         escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||


Mime
View raw message