carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [21/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
Date Thu, 08 Mar 2018 16:55:22 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..e5583c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,20 +19,35 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.util.Arrays;
 
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
  */
 public class UnsafeCarbonRowPage {
+
+  private boolean[] noDictionaryDimensionMapping;
+
+  private boolean[] noDictionarySortColumnMapping;
+
+  private int dimensionSize;
+
+  private int measureSize;
+
+  private DataType[] measureDataType;
+
+  private long[] nullSetWords;
+
   private IntPointerBuffer buffer;
 
   private int lastSize;
@@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage {
 
   private long taskId;
 
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
-
-  public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
-      boolean saveToDisk, long taskId) {
-    this.tableFieldStat = tableFieldStat;
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
+      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+    this.dimensionSize = dimensionSize;
+    this.measureSize = measureSize;
+    this.measureDataType = type;
     this.saveToDisk = saveToDisk;
+    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
     this.taskId = taskId;
     buffer = new IntPointerBuffer(this.taskId);
     this.dataBlock = memoryBlock;
@@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage {
     this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
-  public int addRow(Object[] row, ByteBuffer rowBuffer) {
-    int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+  public int addRow(Object[] row) {
+    int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
     buffer.set(lastSize);
     lastSize = lastSize + size;
     return size;
   }
 
-  /**
-   * add raw row as intermidiate sort temp row to page
-   *
-   * @param row
-   * @param address
-   * @return
-   */
-  private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
-    return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
-        dataBlock.getBaseObject(), address, rowBuffer);
+  private int addRow(Object[] row, long address) {
+    if (row == null) {
+      throw new RuntimeException("Row is null ??");
+    }
+    int dimCount = 0;
+    int size = 0;
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        byte[] col = (byte[]) row[dimCount];
+        CarbonUnsafe.getUnsafe()
+            .putShort(baseObject, address + size, (short) col.length);
+        size += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+            address + size, col.length);
+        size += col.length;
+      } else {
+        int value = (int) row[dimCount];
+        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      byte[] col = (byte[]) row[dimCount];
+      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
+      size += 2;
+      CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+          address + size, col.length);
+      size += col.length;
+    }
+    Arrays.fill(nullSetWords, 0);
+    int nullSetSize = nullSetWords.length * 8;
+    int nullWordLoc = size;
+    size += nullSetSize;
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null != value) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = (Boolean) value;
+          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
+          size += 1;
+        } else if (dataType == DataTypes.SHORT) {
+          Short sval = (Short) value;
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+          size += 2;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = (Integer) value;
+          CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+          size += 4;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = (Long) value;
+          CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+          size += 8;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = (Double) value;
+          CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+          size += 8;
+        } else if (DataTypes.isDecimal(dataType)) {
+          BigDecimal decimalVal = (BigDecimal) value;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+          CarbonUnsafe.getUnsafe()
+              .putShort(baseObject, address + size, (short) bigDecimalInBytes.length);
+          size += 2;
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+                  address + size, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+        }
+        set(nullSetWords, mesCount);
+      } else {
+        unset(nullSetWords, mesCount);
+      }
+    }
+    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+        address + nullWordLoc, nullSetSize);
+    return size;
   }
 
-  /**
-   * get one row from memory address
-   * @param address address
-   * @return one row
-   */
-  public IntermediateSortTempRow getRow(long address) {
-    return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
-        dataBlock.getBaseObject(), address);
+  public Object[] getRow(long address, Object[] rowToFill) {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        rowToFill[dimCount] = col;
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+        size += 4;
+        rowToFill[dimCount] = anInt;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      rowToFill[dimCount] = col;
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+          size += 1;
+          rowToFill[dimensionSize + mesCount] = bval;
+        } else if (dataType == DataTypes.SHORT) {
+          Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          size += 2;
+          rowToFill[dimensionSize + mesCount] = sval;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+          size += 4;
+          rowToFill[dimensionSize + mesCount] = ival;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = val;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = doubleVal;
+        } else if (DataTypes.isDecimal(dataType)) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+        }
+      } else {
+        rowToFill[dimensionSize + mesCount] = null;
+      }
+    }
+    return rowToFill;
   }
 
-  /**
-   * write a row to stream
-   * @param address address of a row
-   * @param stream stream
-   * @throws IOException
-   */
-  public void writeRow(long address, DataOutputStream stream) throws IOException {
-    sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
-        dataBlock.getBaseObject(), address, stream);
+  public void fillRow(long address, DataOutputStream stream) throws IOException {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        stream.writeShort(aShort);
+        stream.write(col);
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+        size += 4;
+        stream.writeInt(anInt);
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      stream.writeShort(aShort);
+      stream.write(col);
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+    for (int i = 0; i < nullSetWords.length; i++) {
+      stream.writeLong(nullSetWords[i]);
+    }
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          size += 2;
+          stream.writeShort(sval);
+        } else if (dataType == DataTypes.INT) {
+          int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+          size += 4;
+          stream.writeInt(ival);
+        } else if (dataType == DataTypes.LONG) {
+          long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+          size += 8;
+          stream.writeLong(val);
+        } else if (dataType == DataTypes.DOUBLE) {
+          double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+          size += 8;
+          stream.writeDouble(doubleVal);
+        } else if (DataTypes.isDecimal(dataType)) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          stream.writeShort(aShort);
+          stream.write(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+        }
+      }
+    }
   }
 
   public void freeMemory() {
@@ -134,8 +362,27 @@ public class UnsafeCarbonRowPage {
     return dataBlock;
   }
 
-  public TableFieldStat getTableFieldStat() {
-    return tableFieldStat;
+  public static void set(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] |= (1L << index);
+  }
+
+  public static void unset(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] &= ~(1L << index);
+  }
+
+  public static boolean isSet(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    return ((words[wordOffset] & (1L << index)) != 0);
+  }
+
+  public boolean[] getNoDictionaryDimensionMapping() {
+    return noDictionaryDimensionMapping;
+  }
+
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
   }
 
   public void setNewDataBlock(MemoryBlock newMemoryBlock) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..4dd5e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -42,14 +41,13 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
 import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
 import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class UnsafeSortDataRows {
@@ -71,8 +69,7 @@ public class UnsafeSortDataRows {
    */
 
   private SortParameters parameters;
-  private TableFieldStat tableFieldStat;
-  private ThreadLocal<ByteBuffer> rowBuffer;
+
   private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
 
   private UnsafeCarbonRowPage rowPage;
@@ -97,13 +94,7 @@ public class UnsafeSortDataRows {
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
     this.parameters = parameters;
-    this.tableFieldStat = new TableFieldStat(parameters);
-    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
-      @Override protected ByteBuffer initialValue() {
-        byte[] backedArray = new byte[2 * 1024 * 1024];
-        return ByteBuffer.wrap(backedArray);
-      }
-    };
+
     this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
 
     // observer of writing file in thread
@@ -136,7 +127,11 @@ public class UnsafeSortDataRows {
     if (isMemoryAvailable) {
       UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
     }
-    this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+    this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
+        parameters.getDimColCount() + parameters.getComplexDimColCount(),
+        parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
+        !isMemoryAvailable, taskId);
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -183,7 +178,7 @@ public class UnsafeSortDataRows {
   private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
     for (int i = 0; i < size; i++) {
       if (rowPage.canAdd()) {
-        bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+        bytesAdded += rowPage.addRow(rowBatch[i]);
       } else {
         try {
           if (enableInMemoryIntermediateMerge) {
@@ -199,8 +194,15 @@ public class UnsafeSortDataRows {
           if (!saveToDisk) {
             UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
           }
-          rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
-          bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+          rowPage = new UnsafeCarbonRowPage(
+                  parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(),
+                  parameters.getDimColCount() + parameters.getComplexDimColCount(),
+                  parameters.getMeasureColCount(),
+                  parameters.getMeasureDataType(),
+                  memoryBlock,
+                  saveToDisk, taskId);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
         } catch (Exception e) {
           LOGGER.error(
                   "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -218,7 +220,7 @@ public class UnsafeSortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     if (rowPage.canAdd()) {
-      rowPage.addRow(row, rowBuffer.get());
+      rowPage.addRow(row);
     } else {
       try {
         if (enableInMemoryIntermediateMerge) {
@@ -233,8 +235,13 @@ public class UnsafeSortDataRows {
         if (!saveToDisk) {
           UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
         }
-        rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
-        rowPage.addRow(row, rowBuffer.get());
+        rowPage = new UnsafeCarbonRowPage(
+            parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionarySortColumn(),
+            parameters.getDimColCount(), parameters.getMeasureColCount(),
+            parameters.getMeasureDataType(), memoryBlock,
+            saveToDisk, taskId);
+        rowPage.addRow(row);
       } catch (Exception e) {
         LOGGER.error(
             "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -262,7 +269,7 @@ public class UnsafeSortDataRows {
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDims(rowPage));
+            new UnsafeRowComparatorForNormalDIms(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -288,9 +295,10 @@ public class UnsafeSortDataRows {
       // write number of entries to the file
       stream.writeInt(actualSize);
       for (int i = 0; i < actualSize; i++) {
-        rowPage.writeRow(
-            rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream);
+        rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
+            stream);
       }
+
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
     } finally {
@@ -359,7 +367,7 @@ public class UnsafeSortDataRows {
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDims(page));
+              new UnsafeRowComparatorForNormalDIms(page));
         }
         if (page.isSaveToDisk()) {
           // create a new file every time
@@ -372,8 +380,7 @@ public class UnsafeSortDataRows {
           writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
-              + sortTempFile + ", sort temp file size in MB is "
-              + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+              + sortTempFile);
           page.freeMemory();
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 33342dc..d02be9b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -23,25 +23,63 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+  /**
+   * mapping of dictionary and no dictionary of sort_columns.
+   */
+  private boolean[] noDictionarySortColumnMaping;
+
   private Object baseObject;
-  private TableFieldStat tableFieldStat;
-  private int dictSizeInMemory;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.tableFieldStat = rowPage.getTableFieldStat();
-    this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
-        + tableFieldStat.getDictNoSortDimCnt()) * 4;
   }
 
   /**
    * Below method will be used to compare two mdkey
    */
   public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    return compare(rowL, baseObject, rowR, baseObject);
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      if (isNoDictionary) {
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
+        sizeA += aShort1;
+
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
+        sizeB += aShort2;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+        sizeB += 4;
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+
+    return diff;
   }
 
   /**
@@ -52,40 +90,35 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     int diff = 0;
     long rowA = rowL.address;
     long rowB = rowR.address;
-    int sizeInDictPartA = 0;
-
-    int sizeInNonDictPartA = 0;
-    int sizeInDictPartB = 0;
-    int sizeInNonDictPartB = 0;
-    for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) {
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
-        short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
-            rowA + dictSizeInMemory + sizeInNonDictPartA);
-        byte[] byteArr1 = new byte[lengthA];
-        sizeInNonDictPartA += 2;
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
-                byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
-        sizeInNonDictPartA += lengthA;
+            .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort1);
+        sizeA += aShort1;
 
-        short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
-            rowB + dictSizeInMemory + sizeInNonDictPartB);
-        byte[] byteArr2 = new byte[lengthB];
-        sizeInNonDictPartB += 2;
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
-                byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
-        sizeInNonDictPartB += lengthB;
+            .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort2);
+        sizeB += aShort2;
 
         int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
         if (difference != 0) {
           return difference;
         }
       } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);
-        sizeInDictPartA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeInDictPartB);
-        sizeInDictPartB += 4;
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
+        sizeB += 4;
         diff = dimFieldA - dimFieldB;
         if (diff != 0) {
           return diff;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+  private Object baseObject;
+
+  private int numberOfSortColumns;
+
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+      sizeA += 4;
+      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+      sizeB += 4;
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
deleted file mode 100644
index e9cfb1c..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-
-public class UnsafeRowComparatorForNormalDims implements Comparator<UnsafeCarbonRow> {
-
-  private Object baseObject;
-
-  private int numberOfSortColumns;
-
-  public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) {
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.numberOfSortColumns = rowPage.getTableFieldStat().getIsSortColNoDictFlags().length;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    int diff = 0;
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    int sizeA = 0;
-    int sizeB = 0;
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
-      sizeA += 4;
-      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
-      sizeB += 4;
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-
-    return diff;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
index d790c41..686e855 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 /**
@@ -29,7 +28,7 @@ public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
 
   void readRow()  throws CarbonSortKeyAndGroupByException;
 
-  IntermediateSortTempRow getRow();
+  Object[] getRow();
 
   int numberOfRows();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index a776db1..6b0cfa6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,10 +19,9 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
 
 public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
@@ -39,18 +38,21 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
   private UnsafeCarbonRowPage[] rowPages;
 
-  private IntermediateSortTempRowComparator comparator;
+  private NewRowComparator comparator;
 
-  private IntermediateSortTempRow currentRow;
+  private Object[] currentRow;
+
+  private int columnSize;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictSortColumnMapping) {
+      boolean[] noDictSortColumnMapping, int columnSize) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
+    this.columnSize = columnSize;
   }
 
   public boolean hasNext() {
@@ -61,11 +63,12 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
   }
 
   public void readRow() {
-    currentRow = rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]);
+    currentRow = new Object[columnSize];
+    rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
     counter++;
   }
 
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return currentRow;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index cbcbbae..6f05088 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -19,9 +19,8 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
 
 public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
@@ -34,18 +33,21 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
   private UnsafeCarbonRowPage rowPage;
 
-  private IntermediateSortTempRow currentRow;
+  private Object[] currentRow;
 
   private long address;
 
-  private IntermediateSortTempRowComparator comparator;
+  private NewRowComparator comparator;
 
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
+  private int columnSize;
+
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(
-        rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+    this.columnSize = columnSize;
   }
 
   public boolean hasNext() {
@@ -56,12 +58,13 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
   }
 
   public void readRow() {
+    currentRow = new Object[columnSize];
     address = rowPage.getBuffer().get(counter);
-    currentRow = rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset());
+    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
     counter++;
   }
 
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return currentRow;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 527452a..11b3d43 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -31,14 +31,15 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
@@ -62,15 +63,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    * entry count
    */
   private int entryCount;
+
   /**
    * return row
    */
-  private IntermediateSortTempRow returnRow;
+  private Object[] returnRow;
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
   private int readBufferSize;
   private String compressorName;
-  private IntermediateSortTempRow[] currentBuffer;
+  private Object[][] currentBuffer;
 
-  private IntermediateSortTempRow[] backupBuffer;
+  private Object[][] backupBuffer;
 
   private boolean isBackupFilled;
 
@@ -93,21 +100,27 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   private int numberOfObjectRead;
 
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
-  private Comparator<IntermediateSortTempRow> comparator;
+  private int nullSetWordsLength;
+
+  private Comparator<Object[]> comparator;
+
   /**
    * Constructor to initialize
    */
   public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
     // set temp file
     this.tempFile = tempFile;
+    this.dimCnt = parameters.getDimColCount();
+    this.complexCnt = parameters.getComplexDimColCount();
+    this.measureCnt = parameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = parameters.getMeasureDataType();
     this.readBufferSize = parameters.getBufferSize();
     this.compressorName = parameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(parameters);
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+
     this.executorService = Executors.newFixedThreadPool(1);
-    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+    this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
+    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
 
@@ -156,17 +169,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    *
    * @throws CarbonSortKeyAndGroupByException problem while reading
    */
-  @Override
   public void readRow() throws CarbonSortKeyAndGroupByException {
     if (prefetch) {
       fillDataForPrefetch();
     } else {
-      try {
-        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-        this.numberOfObjectRead++;
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
-      }
+      this.returnRow = getRowFromStream();
     }
   }
 
@@ -200,22 +207,63 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   }
 
   /**
-   * get a batch of row, this interface is used in reading compressed sort temp files
-   *
-   * @param expected expected number in a batch
-   * @return a batch of row
-   * @throws IOException if error occurs while reading from stream
+   * @return
+   * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow[] readBatchedRowFromStream(int expected)
-      throws IOException {
-    IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
-    for (int i = 0; i < expected; i++) {
-      IntermediateSortTempRow holder
-          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-      holders[i] = holder;
+  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+    Object[] row = new Object[dimCnt + measureCnt];
+    try {
+      int dimCount = 0;
+      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+        if (isNoDictionaryDimensionColumn[dimCount]) {
+          short aShort = stream.readShort();
+          byte[] col = new byte[aShort];
+          stream.readFully(col);
+          row[dimCount] = col;
+        } else {
+          int anInt = stream.readInt();
+          row[dimCount] = anInt;
+        }
+      }
+
+      // write complex dimensions here.
+      for (; dimCount < dimCnt; dimCount++) {
+        short aShort = stream.readShort();
+        byte[] col = new byte[aShort];
+        stream.readFully(col);
+        row[dimCount] = col;
+      }
+
+      long[] words = new long[nullSetWordsLength];
+      for (int i = 0; i < words.length; i++) {
+        words[i] = stream.readLong();
+      }
+
+      for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
+        if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
+          DataType dataType = measureDataTypes[mesCount];
+          if (dataType == DataTypes.SHORT) {
+            row[dimCount + mesCount] = stream.readShort();
+          } else if (dataType == DataTypes.INT) {
+            row[dimCount + mesCount] = stream.readInt();
+          } else if (dataType == DataTypes.LONG) {
+            row[dimCount + mesCount] = stream.readLong();
+          } else if (dataType == DataTypes.DOUBLE) {
+            row[dimCount + mesCount] = stream.readDouble();
+          } else if (DataTypes.isDecimal(dataType)) {
+            short aShort = stream.readShort();
+            byte[] bigDecimalInBytes = new byte[aShort];
+            stream.readFully(bigDecimalInBytes);
+            row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+          } else {
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
+          }
+        }
+      }
+      return row;
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
     }
-    this.numberOfObjectRead += expected;
-    return holders;
   }
 
   /**
@@ -223,7 +271,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    *
    * @return row
    */
-  public IntermediateSortTempRow getRow() {
+  public Object[] getRow() {
     return this.returnRow;
   }
 
@@ -278,7 +326,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += tableFieldStat.hashCode();
+    hash += 31 * measureCnt;
+    hash += 31 * dimCnt;
+    hash += 31 * complexCnt;
     hash += tempFile.hashCode();
     return hash;
   }
@@ -318,12 +368,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   /**
    * This method will read the records from sort temp file and keep it in a buffer
    *
-   * @param numberOfRecords number of records to be read
-   * @return batch of intermediate sort temp row
-   * @throws IOException if error occurs reading records from file
+   * @param numberOfRecords
+   * @return
+   * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
-      throws IOException {
-    return readBatchedRowFromStream(numberOfRecords);
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] records = new Object[numberOfRecords][];
+    for (int i = 0; i < numberOfRecords; i++) {
+      records[i] = getRowFromStream();
+    }
+    return records;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 22673ff..4bbf61b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -21,21 +21,25 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.AbstractQueue;
+import java.util.Arrays;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
@@ -65,13 +69,22 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   private int totalNumberOfRecords;
 
   private SortParameters mergerParameters;
-  private TableFieldStat tableFieldStat;
+
   private File[] intermediateFiles;
+
   private File outPutFile;
 
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
   private int writeBufferSize;
   private String compressorName;
-  private SortStepRowHandler sortStepRowHandler;
+
+  private long[] nullSetWords;
+
+  private ByteBuffer rowData;
 
   private Throwable throwable;
 
@@ -84,10 +97,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
     this.writeBufferSize = mergerParameters.getBufferSize();
     this.compressorName = mergerParameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(mergerParameters);
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+    this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
+    // Take size of 2 MB for each row. I think it is high enough to use
+    rowData = ByteBuffer.allocate(2 * 1024 * 1024);
   }
 
   @Override public Void call() throws Exception {
@@ -146,14 +165,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get sorted sort temp row from the sort temp files
+   * This method will be used to get the sorted record from file
    *
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow getSortedRecordFromFile()
-      throws CarbonSortKeyAndGroupByException {
-    IntermediateSortTempRow row = null;
+  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+    Object[] row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will
@@ -217,7 +235,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
       this.recordHolderHeap.add(sortTempFileChunkHolder);
     }
 
-    LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
   }
 
   /**
@@ -232,12 +250,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted sort temp row
+   * This method will be used to get the sorted row
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
+  private Object[] next() throws CarbonSortKeyAndGroupByException {
     return getSortedRecordFromFile();
   }
 
@@ -254,16 +272,82 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
    * Below method will be used to write data to file
    *
-   * @throws IOException problem while writing
+   * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
-    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
+    int dimCount = 0;
+    int size = 0;
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      if (isNoDictionaryDimensionColumn[dimCount]) {
+        byte[] col = (byte[]) row[dimCount];
+        rowData.putShort((short) col.length);
+        size += 2;
+        rowData.put(col);
+        size += col.length;
+      } else {
+        rowData.putInt((int) row[dimCount]);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    int dimensionSize = dimCnt + complexCnt;
+    for (; dimCount < dimensionSize; dimCount++) {
+      byte[] col = (byte[]) row[dimCount];
+      rowData.putShort((short)col.length);
+      size += 2;
+      rowData.put(col);
+      size += col.length;
+    }
+    Arrays.fill(nullSetWords, 0);
+    int nullSetSize = nullSetWords.length * 8;
+    int nullLoc = size;
+    size += nullSetSize;
+    for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null != value) {
+        DataType dataType = measureDataTypes[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          rowData.putShort(size, (Short) value);
+          size += 2;
+        } else if (dataType == DataTypes.INT) {
+          rowData.putInt(size, (Integer) value);
+          size += 4;
+        } else if (dataType == DataTypes.LONG) {
+          rowData.putLong(size, (Long) value);
+          size += 8;
+        } else if (dataType == DataTypes.DOUBLE) {
+          rowData.putDouble(size, (Double) value);
+          size += 8;
+        } else if (DataTypes.isDecimal(dataType)) {
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value));
+          rowData.putShort(size, (short) bigDecimalInBytes.length);
+          size += 2;
+          for (int i = 0; i < bigDecimalInBytes.length; i++) {
+            rowData.put(size++, bigDecimalInBytes[i]);
+          }
+        }
+        UnsafeCarbonRowPage.set(nullSetWords, mesCount);
+      } else {
+        UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
+      }
+    }
+    for (int i = 0; i < nullSetWords.length; i++) {
+      rowData.putLong(nullLoc, nullSetWords[i]);
+      nullLoc += 8;
+    }
+    byte[] rowBytes = new byte[size];
+    rowData.position(0);
+    rowData.get(rowBytes);
+    stream.write(rowBytes);
+    rowData.clear();
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {
     clear();
     try {
       CarbonUtil.deleteFiles(intermediateFiles);
+      rowData.clear();
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 64f3c25..ce118d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -29,8 +29,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder;
@@ -56,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
 
   private SortParameters parameters;
-  private SortStepRowHandler sortStepRowHandler;
+  private SortStepRowUtil sortStepRowUtil;
   /**
    * tempFileLocation
    */
@@ -69,7 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String[] tempFileLocation) {
     this.parameters = parameters;
-    this.sortStepRowHandler = new SortStepRowHandler(parameters);
+    this.sortStepRowUtil = new SortStepRowUtil(parameters);
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }
@@ -109,7 +108,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       LOGGER.info("Started adding first record from each page");
       for (final UnsafeCarbonRowPage rowPage : rowPages) {
 
-        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage);
+        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
+            parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+                .getMeasureColCount(), parameters.getNumberOfSortColumns());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -120,7 +121,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn());
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
+                parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+                    .getMeasureColCount());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -139,7 +142,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
         recordHolderHeapLocal.add(sortTempFileChunkHolder);
       }
 
-      LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size());
+      LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
     } catch (Exception e) {
       LOGGER.error(e);
       throw new CarbonDataWriterException(e);
@@ -177,14 +180,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   }
 
   /**
-   * This method will be used to get the sorted row in 3-parted format.
-   * The row will feed the following writer process step.
+   * This method will be used to get the sorted row
    *
    * @return sorted row
    */
   public Object[] next() {
-    IntermediateSortTempRow sortTempRow =  getSortedRecordFromFile();
-    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    return sortStepRowUtil.convertRow(getSortedRecordFromFile());
   }
 
   /**
@@ -192,8 +193,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    *
    * @return sorted record sorted record
    */
-  private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
-    IntermediateSortTempRow row = null;
+  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+    Object[] row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 4579c85..1ab803b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1005,14 +1005,8 @@ public final class CarbonDataMergerUtil {
   /**
    * This method traverses Update Delta Files inside the seg and return true
    * if UpdateDelta Files are more than IUD Compaction threshold.
-   *
-   * @param seg
-   * @param identifier
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
    */
-  public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
+  private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
       AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager,
       int numberDeltaFilesThreshold) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index bce1b33..038d34c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -389,6 +389,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
           noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
     }
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
+
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
@@ -406,8 +407,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
               + carbonLoadModel.getFactTimeStamp() + ".tmp";
     } else {
       carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
-              tableName, carbonLoadModel.getSegmentId());
+          .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName,
+              carbonLoadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 4aca13a..2616def 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -76,9 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
           partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
               .getFactTimeStamp() + ".tmp";
     } else {
-      carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-              tableName, loadModel.getSegmentId());
+      carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+          loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 92db4c5..221697f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,9 +47,8 @@ public class RowResultProcessor {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
     String tableName = carbonTable.getTableName();
-    String carbonStoreLocation = CarbonDataProcessorUtil
-        .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-            tableName, loadModel.getSegmentId());
+    String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+        loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation, carbonStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index c06819c..04efa1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
@@ -28,9 +29,11 @@ import java.util.concurrent.Callable;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 public class IntermediateFileMerger implements Callable<Void> {
@@ -65,12 +68,17 @@ public class IntermediateFileMerger implements Callable<Void> {
   private File[] intermediateFiles;
 
   private File outPutFile;
+  private int dimCnt;
+  private int noDictDimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
   private int writeBufferSize;
   private String compressorName;
 
   private Throwable throwable;
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
+
   /**
    * IntermediateFileMerger Constructor
    */
@@ -80,10 +88,14 @@ public class IntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
     this.writeBufferSize = mergerParameters.getBufferSize();
     this.compressorName = mergerParameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(mergerParameters);
-    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
   }
 
   @Override public Void call() throws Exception {
@@ -142,14 +154,13 @@ public class IntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted sort temp row from sort temp file
+   * This method will be used to get the sorted record from file
    *
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow getSortedRecordFromFile()
-      throws CarbonSortKeyAndGroupByException {
-    IntermediateSortTempRow row = null;
+  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+    Object[] row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will
@@ -216,7 +227,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       this.recordHolderHeap.add(sortTempFileChunkHolder);
     }
 
-    LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
   }
 
   /**
@@ -231,12 +242,12 @@ public class IntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted sort temp row
+   * This method will be used to get the sorted row
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
-  private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
+  private Object[] next() throws CarbonSortKeyAndGroupByException {
     return getSortedRecordFromFile();
   }
 
@@ -253,10 +264,62 @@ public class IntermediateFileMerger implements Callable<Void> {
   /**
    * Below method will be used to write data to file
    *
-   * @throws IOException problem while writing
+   * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
-    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
+    try {
+      int[] mdkArray = (int[]) row[0];
+      byte[][] nonDictArray = (byte[][]) row[1];
+      int mdkIndex = 0;
+      int nonDictKeyIndex = 0;
+      // write dictionary and non dictionary dimensions here.
+      for (boolean nodictinary : isNoDictionaryDimensionColumn) {
+        if (nodictinary) {
+          byte[] col = nonDictArray[nonDictKeyIndex++];
+          stream.writeShort(col.length);
+          stream.write(col);
+        } else {
+          stream.writeInt(mdkArray[mdkIndex++]);
+        }
+      }
+      // write complex
+      for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
+        byte[] col = nonDictArray[nonDictKeyIndex++];
+        stream.writeShort(col.length);
+        stream.write(col);
+      }
+      // write measure
+      int fieldIndex = 0;
+      for (int counter = 0; counter < measureCnt; counter++) {
+        if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
+          stream.write((byte) 1);
+          DataType dataType = measureDataTypes[counter];
+          if (dataType == DataTypes.BOOLEAN) {
+            stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.SHORT) {
+            stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.INT) {
+            stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.LONG) {
+            stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.DOUBLE) {
+            stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (DataTypes.isDecimal(dataType)) {
+            byte[] bigDecimalInBytes = DataTypeUtil
+                .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row));
+            stream.writeInt(bigDecimalInBytes.length);
+            stream.write(bigDecimalInBytes);
+          } else {
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
+          }
+        } else {
+          stream.write((byte) 0);
+        }
+        fieldIndex++;
+      }
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    }
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
deleted file mode 100644
index 9b6d1e8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-
-/**
- * This class is used as comparator for comparing intermediate sort temp row
- */
-public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> {
-  /**
-   * isSortColumnNoDictionary whether the sort column is not dictionary or not
-   */
-  private boolean[] isSortColumnNoDictionary;
-
-  /**
-   * @param isSortColumnNoDictionary isSortColumnNoDictionary
-   */
-  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
-    this.isSortColumnNoDictionary = isSortColumnNoDictionary;
-  }
-
-  /**
-   * Below method will be used to compare two sort temp row
-   */
-  public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
-    int diff = 0;
-    int dictIndex = 0;
-    int nonDictIndex = 0;
-
-    for (boolean isNoDictionary : isSortColumnNoDictionary) {
-
-      if (isNoDictionary) {
-        byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
-        byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
-        nonDictIndex++;
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
-        }
-      } else {
-        int dimFieldA = rowA.getDictSortDims()[dictIndex];
-        int dimFieldB = rowB.getDictSortDims()[dictIndex];
-        dictIndex++;
-
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
-      }
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index 3f94533..d2579d2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -40,11 +40,14 @@ public class NewRowComparator implements Comparator<Object[]> {
    */
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
+
     int index = 0;
 
     for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) rowA[index];
+
         byte[] byteArr2 = (byte[]) rowB[index];
 
         int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
@@ -54,7 +57,6 @@ public class NewRowComparator implements Comparator<Object[]> {
       } else {
         int dimFieldA = (int) rowA[index];
         int dimFieldB = (int) rowB[index];
-
         diff = dimFieldA - dimFieldB;
         if (diff != 0) {
           return diff;
@@ -63,6 +65,7 @@ public class NewRowComparator implements Comparator<Object[]> {
 
       index++;
     }
+
     return diff;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index 7538c92..e01b587 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   private int numberOfSortColumns;
 
   /**
-   * NewRowComparatorForNormalDims Constructor
+   * RowComparatorForNormalDims Constructor
    *
    * @param numberOfSortColumns
    */
@@ -46,6 +46,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
     int diff = 0;
 
     for (int i = 0; i < numberOfSortColumns; i++) {
+
       int dimFieldA = (int)rowA[i];
       int dimFieldB = (int)rowB[i];
       diff = dimFieldA - dimFieldB;


Mime
View raw message