carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject carbondata git commit: [CARBONDATA-2656] Presto vector stream readers performance Enhancement
Date Wed, 18 Jul 2018 06:36:23 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6ca03f6b7 -> a4c2ef5f8


[CARBONDATA-2656] Presto vector stream readers performance Enhancement

Eliminates the extra iteration over the CarbonColumnVectorImpl object (the vectorArray) by extending it in the StreamReaders, which now write carbon-core vector data, value by value, directly into the Presto block; when the block builder is invoked, the completed block is returned to Presto.

This closes #2412


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a4c2ef5f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a4c2ef5f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a4c2ef5f

Branch: refs/heads/master
Commit: a4c2ef5f833373b4d3bfb6dc4a9fb1c166ae0ed4
Parents: 6ca03f6
Author: sv71294 <sv71294@gmail.com>
Authored: Tue Jun 5 17:44:58 2018 +0530
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Wed Jul 18 14:36:09 2018 +0800

----------------------------------------------------------------------
 .../carbondata/presto/CarbonVectorBatch.java    |  89 +++++---
 .../carbondata/presto/CarbondataPageSource.java |  95 ++------
 .../presto/CarbondataPageSourceProvider.java    |  18 +-
 .../PrestoCarbonVectorizedRecordReader.java     |  25 ++-
 .../presto/readers/AbstractStreamReader.java    |  66 ------
 .../presto/readers/BooleanStreamReader.java     |  93 +++-----
 .../readers/DecimalSliceStreamReader.java       | 219 +++++--------------
 .../presto/readers/DoubleStreamReader.java      |  94 +++-----
 .../presto/readers/IntegerStreamReader.java     |  90 +++-----
 .../presto/readers/LongStreamReader.java        |  87 +++-----
 .../presto/readers/ObjectStreamReader.java      |  56 ++---
 .../readers/PrestoVectorBlockBuilder.java       |  28 +++
 .../presto/readers/ShortStreamReader.java       |  87 +++-----
 .../presto/readers/SliceStreamReader.java       | 105 ++++-----
 .../carbondata/presto/readers/StreamReader.java |  43 ----
 .../presto/readers/StreamReaders.java           |  98 ---------
 .../presto/readers/TimestampStreamReader.java   |  75 ++++---
 .../CarbonDictionaryDecodeReadSupport.scala     |   6 +-
 18 files changed, 461 insertions(+), 913 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index b6caaa3..6a4cc0d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -20,50 +20,81 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.presto.readers.BooleanStreamReader;
+import org.apache.carbondata.presto.readers.DecimalSliceStreamReader;
+import org.apache.carbondata.presto.readers.DoubleStreamReader;
+import org.apache.carbondata.presto.readers.IntegerStreamReader;
+import org.apache.carbondata.presto.readers.LongStreamReader;
+import org.apache.carbondata.presto.readers.ObjectStreamReader;
+import org.apache.carbondata.presto.readers.ShortStreamReader;
+import org.apache.carbondata.presto.readers.SliceStreamReader;
+import org.apache.carbondata.presto.readers.TimestampStreamReader;
+
+import com.facebook.presto.spi.block.SliceArrayBlock;
 
 public class CarbonVectorBatch {
 
-  private static final int DEFAULT_BATCH_SIZE =  4 * 1024;
+  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
 
-  private final StructField[] schema;
   private final int capacity;
-  private int numRows;
   private final CarbonColumnVectorImpl[] columns;
-
   // True if the row is filtered.
   private final boolean[] filteredRows;
-
   // Column indices that cannot have null values.
   private final Set<Integer> nullFilteredColumns;
-
+  private int numRows;
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
-
-  private CarbonVectorBatch(StructField[] schema, int maxRows) {
-    this.schema = schema;
+  private CarbonVectorBatch(StructField[] schema, CarbonDictionaryDecodeReadSupport readSupport,
+      int maxRows) {
     this.capacity = maxRows;
     this.columns = new CarbonColumnVectorImpl[schema.length];
     this.nullFilteredColumns = new HashSet<>();
     this.filteredRows = new boolean[maxRows];
+    Dictionary[] dictionaries = readSupport.getDictionaries();
+    DataType[] dataTypes = readSupport.getDataTypes();
 
     for (int i = 0; i < schema.length; ++i) {
-      StructField field = schema[i];
-      columns[i] = new CarbonColumnVectorImpl(maxRows, field.getDataType());
+      columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i],
+          readSupport.getSliceArrayBlock(i));
     }
-
   }
 
-
-  public static CarbonVectorBatch allocate(StructField[] schema) {
-    return new CarbonVectorBatch(schema, DEFAULT_BATCH_SIZE);
+  public static CarbonVectorBatch allocate(StructField[] schema,
+      CarbonDictionaryDecodeReadSupport readSupport) {
+    return new CarbonVectorBatch(schema, readSupport, DEFAULT_BATCH_SIZE);
   }
 
-  public static CarbonVectorBatch allocate(StructField[] schema,  int maxRows) {
-    return new CarbonVectorBatch(schema, maxRows);
+  private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
+      StructField field, Dictionary dictionary, SliceArrayBlock dictionarySliceArrayBlock) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.SHORT) {
+      return new ShortStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
+      return new IntegerStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return new TimestampStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.LONG) {
+      return new LongStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.DOUBLE) {
+      return new DoubleStreamReader(batchSize, field.getDataType(), dictionary);
+    } else if (dataType == DataTypes.STRING) {
+      return new SliceStreamReader(batchSize, field.getDataType(), dictionarySliceArrayBlock);
+    } else if (DataTypes.isDecimal(dataType)) {
+      return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary);
+    } else {
+      return new ObjectStreamReader(batchSize, field.getDataType());
+    }
   }
+
   /**
    * Resets the batch for writing.
    */
@@ -78,18 +109,19 @@ public class CarbonVectorBatch {
     this.numRowsFiltered = 0;
   }
 
-
   /**
    * Returns the number of columns that make up this batch.
    */
-  public int numCols() { return columns.length; }
+  public int numCols() {
+    return columns.length;
+  }
 
   /**
    * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
    * more of their attributes are part of a non-nullable column.
    */
   public void setNumRows(int numRows) {
-    assert(numRows <= this.capacity);
+    assert (numRows <= this.capacity);
     this.numRows = numRows;
 
     for (int ordinal : nullFilteredColumns) {
@@ -102,30 +134,33 @@ public class CarbonVectorBatch {
     }
   }
 
-
   /**
    * Returns the number of rows for read, including filtered rows.
    */
-  public int numRows() { return numRows; }
+  public int numRows() {
+    return numRows;
+  }
 
   /**
    * Returns the number of valid rows.
    */
   public int numValidRows() {
-    assert(numRowsFiltered <= numRows);
+    assert (numRowsFiltered <= numRows);
     return numRows - numRowsFiltered;
   }
 
   /**
    * Returns the column at `ordinal`.
    */
-  public CarbonColumnVectorImpl column(int ordinal) { return columns[ordinal]; }
+  public CarbonColumnVectorImpl column(int ordinal) {
+    return columns[ordinal];
+  }
 
   /**
    * Returns the max capacity (in number of rows) for this batch.
    */
-  public int capacity() { return capacity; }
-
-
+  public int capacity() {
+    return capacity;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index d31010f..ad7006a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -19,12 +19,10 @@ package org.apache.carbondata.presto;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.presto.readers.StreamReader;
-import org.apache.carbondata.presto.readers.StreamReaders;
+import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
@@ -35,7 +33,6 @@ import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.LazyBlock;
 import com.facebook.presto.spi.block.LazyBlockLoader;
-import com.facebook.presto.spi.type.Type;
 
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
@@ -47,25 +44,18 @@ class CarbondataPageSource implements ConnectorPageSource {
 
   private static final LogService logger =
       LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
-  private final List<Type> types;
+  private List<ColumnHandle> columnHandles;
   private boolean closed;
   private PrestoCarbonVectorizedRecordReader vectorReader;
-  private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
-  List<ColumnHandle> columnHandles;
   private long sizeOfData = 0;
-  private final StreamReader[] readers ;
   private int batchId;
   private long nanoStart;
   private long nanoEnd;
 
-  public CarbondataPageSource(CarbonDictionaryDecodeReadSupport readSupport,
-      PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
-      List<ColumnHandle> columnHandles ) {
+  CarbondataPageSource(PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
+      List<ColumnHandle> columnHandles) {
     this.columnHandles = columnHandles;
-    this.types = getColumnTypes();
-    this.readSupport = readSupport;
     vectorReader = vectorizedRecordReader;
-    this.readers = createStreamReaders();
   }
 
   @Override public long getCompletedBytes() {
@@ -77,10 +67,9 @@ class CarbondataPageSource implements ConnectorPageSource {
   }
 
   @Override public boolean isFinished() {
-    return closed ;
+    return closed;
   }
 
-
   @Override public Page getNextPage() {
     if (nanoStart == 0) {
       nanoStart = System.nanoTime();
@@ -89,13 +78,12 @@ class CarbondataPageSource implements ConnectorPageSource {
     int batchSize = 0;
     try {
       batchId++;
-      if(vectorReader.nextKeyValue()) {
+      if (vectorReader.nextKeyValue()) {
         Object vectorBatch = vectorReader.getCurrentValue();
-        if(vectorBatch != null && vectorBatch instanceof CarbonVectorBatch)
-        {
+        if (vectorBatch instanceof CarbonVectorBatch) {
           columnarBatch = (CarbonVectorBatch) vectorBatch;
           batchSize = columnarBatch.numRows();
-          if(batchSize == 0){
+          if (batchSize == 0) {
             close();
             return null;
           }
@@ -108,22 +96,16 @@ class CarbondataPageSource implements ConnectorPageSource {
         return null;
       }
 
-      Block[] blocks = new Block[types.size()];
+      Block[] blocks = new Block[columnHandles.size()];
       for (int column = 0; column < blocks.length; column++) {
-        Type type = types.get(column);
-        readers[column].setBatchSize(columnarBatch.numRows());
-        readers[column].setVectorReader(true);
-        readers[column].setVector(columnarBatch.column(column));
-        blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column, type));
+        blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column));
       }
       Page page = new Page(batchSize, blocks);
       return page;
-    }
-    catch (PrestoException e) {
+    } catch (PrestoException e) {
       closeWithSuppression(e);
       throw e;
-    }
-    catch ( RuntimeException | InterruptedException | IOException e) {
+    } catch (RuntimeException | InterruptedException | IOException e) {
       closeWithSuppression(e);
       throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
     }
@@ -133,7 +115,7 @@ class CarbondataPageSource implements ConnectorPageSource {
     return sizeOfData;
   }
 
-  @Override public void close()  {
+  @Override public void close() {
     // some hive input formats are broken and bad things can happen if you close them multiple times
     if (closed) {
       return;
@@ -148,13 +130,11 @@ class CarbondataPageSource implements ConnectorPageSource {
 
   }
 
-  protected void closeWithSuppression(Throwable throwable)
-  {
+  private void closeWithSuppression(Throwable throwable) {
     requireNonNull(throwable, "throwable is null");
     try {
       close();
-    }
-    catch (RuntimeException e) {
+    } catch (RuntimeException e) {
       // Self-suppression not permitted
       logger.error(e, e.getMessage());
       if (throwable != e) {
@@ -166,61 +146,32 @@ class CarbondataPageSource implements ConnectorPageSource {
   /**
    * Lazy Block Implementation for the Carbondata
    */
-  private final class CarbondataBlockLoader
-      implements LazyBlockLoader<LazyBlock>
-  {
+  private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> {
     private final int expectedBatchId = batchId;
     private final int columnIndex;
-    private final Type type;
     private boolean loaded;
 
-    public CarbondataBlockLoader(int columnIndex, Type type)
-    {
+    CarbondataBlockLoader(int columnIndex) {
       this.columnIndex = columnIndex;
-      this.type = requireNonNull(type, "type is null");
     }
 
-    @Override
-    public final void load(LazyBlock lazyBlock)
-    {
+    @Override public final void load(LazyBlock lazyBlock) {
       if (loaded) {
         return;
       }
       checkState(batchId == expectedBatchId);
       try {
-        Block block = readers[columnIndex].readBlock(type);
+        PrestoVectorBlockBuilder blockBuilder =
+            (PrestoVectorBlockBuilder) vectorReader.getColumnarBatch().column(columnIndex);
+        blockBuilder.setBatchSize(lazyBlock.getPositionCount());
+        Block block = blockBuilder.buildBlock();
         sizeOfData += block.getSizeInBytes();
         lazyBlock.setBlock(block);
-      }
-      catch (IOException e) {
+      } catch (Exception e) {
         throw new CarbonDataLoadingException("Error in Reading Data from Carbondata ", e);
       }
       loaded = true;
     }
-
   }
 
-
-  /**
-   * Create the Stream Reader for every column based on their type
-   * This method will be initialized only once based on the types.
-   *
-   * @return
-   */
-  private StreamReader[] createStreamReaders( ) {
-    requireNonNull(types);
-    StreamReader[] readers = new StreamReader[types.size()];
-    for (int i = 0; i < types.size(); i++) {
-      readers[i] = StreamReaders.createStreamReader(types.get(i), readSupport
-          .getSliceArrayBlock(i),readSupport.getDictionaries()[i]);
-    }
-    return readers;
-  }
-
-   private List<Type> getColumnTypes() {
-    return columnHandles.stream().map(a -> ((CarbondataColumnHandle)a).getColumnType())
-        .collect(Collectors.toList());
-  }
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 4679eac..cc5bf2a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -78,13 +78,12 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
     this.queryId = ((CarbondataSplit)split).getQueryId();
     CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
-    PrestoCarbonVectorizedRecordReader carbonRecordReader = createReader(split, columns, readSupport);
-    return new CarbondataPageSource(readSupport, carbonRecordReader, columns );
+    PrestoCarbonVectorizedRecordReader carbonRecordReader =
+        createReader(split, columns, readSupport);
+    return new CarbondataPageSource(carbonRecordReader, columns);
   }
 
-
   /**
-   *
    * @param split
    * @param columns
    * @param readSupport
@@ -103,7 +102,7 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       CarbonIterator iterator = queryExecutor.execute(queryModel);
       readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
       PrestoCarbonVectorizedRecordReader reader = new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
-          (AbstractDetailQueryResultIterator) iterator);
+          (AbstractDetailQueryResultIterator) iterator, readSupport);
       reader.setTaskId(carbondataSplit.getIndex());
       return reader;
     } catch (IOException e) {
@@ -116,7 +115,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
   }
 
   /**
-   *
    * @param carbondataSplit
    * @param columns
    * @return
@@ -152,9 +150,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       List<TableBlockInfo> tableBlockInfoList =
           CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
       queryModel.setTableBlockInfos(tableBlockInfoList);
-
-
-
       return queryModel;
     } catch (IOException e) {
       throw new RuntimeException("Unable to get the Query Model ", e);
@@ -162,7 +157,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
   }
 
   /**
-   *
    * @param conf
    * @param carbonTable
    * @param filterExpression
@@ -190,9 +184,7 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     return format;
   }
 
-
   /**
-   *
    * @param columns
    * @return
    */
@@ -208,7 +200,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
   }
 
   /**
-   *
    * @param carbonSplit
    * @return
    */
@@ -222,5 +213,4 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     return tableCacheModel.carbonTable;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 913d423..32e163a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -80,11 +80,14 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
 
   private long queryStartTime;
 
+  private CarbonDictionaryDecodeReadSupport readSupport;
+
   public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel,
-      AbstractDetailQueryResultIterator iterator) {
+      AbstractDetailQueryResultIterator iterator, CarbonDictionaryDecodeReadSupport readSupport) {
     this.queryModel = queryModel;
     this.iterator = iterator;
     this.queryExecutor = queryExecutor;
+    this.readSupport = readSupport;
     enableReturningBatches();
     this.queryStartTime = System.currentTimeMillis();
   }
@@ -184,17 +187,15 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(dim.getDimension().getDataType());
-        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
-           generator.getReturnType());
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType());
       } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
-            dim.getDimension().getDataType());
+        fields[dim.getOrdinal()] =
+            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
       } else if (dim.getDimension().isComplex()) {
-        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
-           dim.getDimension().getDataType());
+        fields[dim.getOrdinal()] =
+            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
       } else {
-        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
-            DataTypes.INT);
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), DataTypes.INT);
       }
     }
 
@@ -212,7 +213,7 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       }
     }
 
-    columnarBatch = CarbonVectorBatch.allocate(fields);
+    columnarBatch = CarbonVectorBatch.allocate(fields, readSupport);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
     boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
@@ -221,7 +222,6 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }
 
-
   private CarbonVectorBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
@@ -251,6 +251,9 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     return false;
   }
 
+  public CarbonVectorBatch getColumnarBatch() {
+    return columnarBatch;
+  }
   public void setTaskId(long taskId) {
     this.taskId = taskId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
deleted file mode 100644
index 81a4b4f..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto.readers;
-
-import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
-
-/**
- * Abstract class for Stream Readers
- */
-public abstract class AbstractStreamReader implements StreamReader {
-
-  protected Object[] streamData;
-
-  protected CarbonColumnVectorImpl columnVector;
-
-  protected boolean isVectorReader;
-
-  protected int batchSize;
-
-  /**
-   * Setter for StreamData
-   * @param data
-   */
-  @Override public void setStreamData(Object[] data) {
-    this.streamData = data;
-  }
-
-  /**
-   * Setter for Vector data
-   * @param vector
-   */
-  @Override public void setVector(CarbonColumnVectorImpl vector) {
-    this.columnVector = vector;
-  }
-
-  /**
-   * Setter for vector Reader
-   * @param isVectorReader
-   */
-  public void setVectorReader(boolean isVectorReader) {
-    this.isVectorReader = isVectorReader;
-  }
-
-  /**
-   * Setter for BatchSize
-   * @param batchSize
-   */
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 0b7206b..17578d7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -17,91 +17,64 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.BooleanType;
 import com.facebook.presto.spi.type.Type;
 
-public class BooleanStreamReader extends AbstractStreamReader {
+public class BooleanStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
 
-  private boolean isDictionary;
-  private Dictionary dictionary;
+  protected int batchSize;
 
-  public BooleanStreamReader() {
+  protected Type type = BooleanType.BOOLEAN;
 
-  }
+  protected BlockBuilder builder;
 
-  public BooleanStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.isDictionary = isDictionary;
+  private Dictionary dictionary;
+
+  public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
     this.dictionary = dictionary;
   }
 
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          populateDictionaryVector(type, numberOfRows, builder);
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            populateVector(type, numberOfRows, builder);
-          }
-        }
-      }
-    } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeBoolean(builder, byteToBoolean(streamData[i]));
-      }
-    }
-
+  @Override public Block buildBlock() {
     return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeBoolean(builder, byteToBoolean(columnVector.getData(i)));
-      }
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN);
+    if (data != null) {
+      type.writeBoolean(builder, (boolean) data);
+    } else {
+      builder.appendNull();
     }
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeBoolean(builder, byteToBoolean(columnVector.getData(i)));
-      }
+  @Override public void putBoolean(int rowId, boolean value) {
+    type.writeBoolean(builder, value);
   }
 
-  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      int value = (int) columnVector.getData(i);
-      Object data = DataTypeUtil
-          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN);
-      if (data != null) {
-        type.writeBoolean(builder,(boolean) data);
-      } else {
-        builder.appendNull();
-      }
-    }
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
   }
 
-  private Boolean byteToBoolean(Object value){
-    byte byteValue = (byte)value;
-    return byteValue == 1;
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
   }
-}
 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 54f2b5f..ed88343 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -17,13 +17,13 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Objects;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
@@ -45,80 +45,66 @@ import static java.math.RoundingMode.HALF_UP;
 /**
  * Reader for DecimalValues
  */
-public class DecimalSliceStreamReader  extends AbstractStreamReader {
+public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
 
+  private final char[] buffer = new char[100];
+  protected int batchSize;
+  protected Type type;
+  protected BlockBuilder builder;
   private Dictionary dictionary;
-  private boolean isDictionary;
 
+  public DecimalSliceStreamReader(int batchSize,
+      org.apache.carbondata.core.metadata.datatype.DecimalType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale());
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    this.dictionary = dictionary;
+  }
 
-  private final char[] buffer = new char[100];
+  @Override public Block buildBlock() {
+    return builder.build();
+  }
 
-  public DecimalSliceStreamReader() {
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
 
+  @Override public void putInt(int rowId, int value) {
+    DecimalType decimalType = (DecimalType) type;
+    Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value),
+        DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
+    if (Objects.isNull(data)) {
+      builder.appendNull();
+    } else {
+      decimalBlockWriter((BigDecimal) data);
+    }
   }
 
-  public DecimalSliceStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.dictionary = dictionary;
-    this.isDictionary = isDictionary;
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+    decimalBlockWriter(value);
   }
 
-  /**
-   * Create Block for DecimalType
-   * @param type
-   * @return
-   * @throws IOException
-   */
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          if (isShortDecimal(type)) {
-            populateShortDictionaryVector(type, numberOfRows, builder);
-          } else {
-            populateLongDictionaryVector(type, numberOfRows, builder);
-          }
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            if (isShortDecimal(type)) {
-              populateShortDecimalVector(type, numberOfRows, builder);
-            } else {
-              populateLongDecimalVector(type, numberOfRows, builder);
-            }
-          }
-        }
-      }
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
+  }
+
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
+
+  private void decimalBlockWriter(BigDecimal value) {
+    if (isShortDecimal(type)) {
+      long rescaledDecimal = Decimals.rescale(value.unscaledValue().longValue(), value.scale(),
+          ((DecimalType) type).getScale());
+      type.writeLong(builder, rescaledDecimal);
     } else {
-      if (streamData != null) {
-        numberOfRows = streamData.length;
-        builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-        for (int i = 0; i < numberOfRows; i++) {
-          Slice slice = getSlice(streamData[i], type);
-          if (isShortDecimal(type)) {
-            type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length()));
-          } else {
-            type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
-          }
-        }
-      }
-    }
-    if (builder == null) {
-      return null;
+      Slice slice = getSlice(value, type);
+      type.writeSlice(builder, parseSlice((DecimalType) type, slice, slice.length()));
     }
-    return builder.build();
   }
 
-  /**
-   * Function to getSlice from Decimal Object
-   * @param value
-   * @param type
-   * @return
-   */
   private Slice getSlice(Object value, Type type) {
     if (type instanceof DecimalType) {
       DecimalType actual = (DecimalType) type;
@@ -137,50 +123,20 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
               rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
           Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
           return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
-
         }
-
       }
     } else {
       return utf8Slice(value.toString());
     }
   }
 
-  /**
-   * Function to parse ShortDecimalType as it is internally treated as Long
-   * @param type
-   * @param slice
-   * @param offset
-   * @param length
-   * @return
-   */
-  private long parseLong(DecimalType type, Slice slice, int offset, int length) {
-    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
-    return decimal.unscaledValue().longValue();
-  }
-
-  /**
-   * Function for parsing the Slice
-   * @param type
-   * @param slice
-   * @param offset
-   * @param length
-   * @return
-   */
-  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) {
-    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+  private Slice parseSlice(DecimalType type, Slice slice, int length) {
+    BigDecimal decimal = parseBigDecimal(type, slice, length);
     return encodeUnscaledValue(decimal.unscaledValue());
   }
 
-  /**
-   * Function for parsing the BigDecimal
-   * @param type
-   * @param slice
-   * @param offset
-   * @param length
-   * @return
-   */
-  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) {
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int length) {
+    int offset = 0;
     checkArgument(length < buffer.length);
     for (int i = 0; i < length; i++) {
       buffer[i] = (char) slice.getByte(offset + i);
@@ -192,78 +148,5 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
     checkState(decimal.precision() <= type.getPrecision(),
         "Read decimal precision larger than column precision");
     return decimal;
-
   }
-
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        if (isShortDecimal(type)) {
-          BigDecimal decimalValue = (BigDecimal)columnVector.getData(i);
-          long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
-              decimalValue.scale(),((DecimalType) type).getScale());
-          type.writeLong(builder, rescaledDecimal);
-        } else {
-          Slice slice = getSlice(columnVector.getData(i), type);
-          type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
-        }
-      }
-    }
-  }
-
-  private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
-    DecimalType decimalType = (DecimalType) type;
-    for (int i = 0; i < numberOfRows; i++) {
-      BigDecimal decimalValue = (BigDecimal) columnVector.getData(i);
-      long rescaledDecimal = Decimals
-          .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),
-              decimalType.getScale());
-      type.writeLong(builder, rescaledDecimal);
-    }
-  }
-
-  private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      Slice slice = getSlice((columnVector.getData(i)), type);
-      type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
-    }
-  }
-
-  private void populateShortDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    DecimalType decimalType = (DecimalType) type;
-    for (int i = 0; i < numberOfRows; i++) {
-      int value = (int) columnVector.getData(i);
-      Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value),
-          DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
-      if (Objects.isNull(data)) {
-        builder.appendNull();
-      } else {
-        BigDecimal decimalValue = (BigDecimal) data;
-        long rescaledDecimal = Decimals
-            .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),
-                decimalType.getScale());
-        type.writeLong(builder, rescaledDecimal);
-      }
-    }
-  }
-
-  private void populateLongDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    DecimalType decimalType = (DecimalType) type;
-    for (int i = 0; i < numberOfRows; i++) {
-      int value = (int) columnVector.getData(i);
-      Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value),
-          DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
-      if (Objects.isNull(data)) {
-        builder.appendNull();
-      } else {
-        BigDecimal decimalValue = (BigDecimal) data;
-        Slice slice = getSlice(decimalValue, type);
-        type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
-      }
-    }
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index 3e7fc59..384112f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -17,95 +17,65 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.DoubleType;
 import com.facebook.presto.spi.type.Type;
 
 /**
  * Class for Reading the Double value and setting it in Block
  */
-public class DoubleStreamReader extends AbstractStreamReader {
+public class DoubleStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type = DoubleType.DOUBLE;
+
+  protected BlockBuilder builder;
 
-  private boolean isDictionary;
   private Dictionary dictionary;
 
-  public DoubleStreamReader() {
+  public DoubleStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    this.dictionary = dictionary;
+  }
 
+  @Override public Block buildBlock() {
+    return builder.build();
   }
 
-  public DoubleStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.isDictionary = isDictionary;
-    this.dictionary = dictionary;
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }
 
-  /**
-   * Create the DoubleType Block
-   *
-   * @param type
-   * @return
-   * @throws IOException
-   */
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows;
-    BlockBuilder builder;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          populateDictionaryVector(type, numberOfRows, builder);
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            populateVector(type, numberOfRows, builder);
-          }
-        }
-      }
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE);
+    if (data != null) {
+      type.writeDouble(builder, (Double) data);
     } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeDouble(builder, (Double) streamData[i]);
-      }
+      builder.appendNull();
     }
-
-    return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeDouble(builder, (Double) columnVector.getData(i));
-      }
-    }
+  @Override public void putDouble(int rowId, double value) {
+    type.writeDouble(builder, value);
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      type.writeDouble(builder, (Double) columnVector.getData(i));
-    }
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
   }
 
-  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      int value = (int) columnVector.getData(i);
-      Object data = DataTypeUtil
-          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE);
-      if (data != null) {
-        type.writeDouble(builder, (Double) data);
-      } else {
-        builder.appendNull();
-      }
-    }
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index ffe1aef..a3ce908 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -17,86 +17,64 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.Type;
 
-public class IntegerStreamReader extends AbstractStreamReader {
+public class IntegerStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
 
-  private Dictionary dictionaryValues;
-  private boolean isDictionary;
+  protected int batchSize;
 
-  public IntegerStreamReader() {
+  protected Type type = IntegerType.INTEGER;
 
-  }
+  protected BlockBuilder builder;
 
-  public IntegerStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.dictionaryValues = dictionary;
-    this.isDictionary = isDictionary;
+  private Dictionary dictionary;
+
+  public IntegerStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    this.dictionary = dictionary;
   }
 
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows;
-    BlockBuilder builder;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          populateDictionaryVector(type, numberOfRows, builder);
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            populateVector(type, numberOfRows, builder);
-          }
-        }
-      }
-    } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, ((Integer) streamData[i]).longValue());
-      }
-    }
+  @Override public Block buildBlock() {
     return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (dictionary == null) {
+      type.writeLong(builder, value);
+    } else {
+      Object data = DataTypeUtil
+          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.INT);
+      if (data != null) {
+        type.writeLong(builder, ((Integer) data).longValue());
       } else {
-        type.writeLong(builder, ((Integer) columnVector.getData(i)).longValue());
+        builder.appendNull();
       }
     }
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-      for (int i = 0; i < numberOfRows; i++) {
-        Integer value = (Integer) columnVector.getData(i);
-        type.writeLong(builder, value.longValue());
-      }
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
   }
 
-  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionaryValues.getDictionaryValueForKey(value),
-                DataTypes.INT);
-        if (data != null) {
-          type.writeLong(builder, ((Integer) data).longValue());
-        } else {
-          builder.appendNull();
-        }
-      }
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index e1000c5..892614d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -17,83 +17,62 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.Type;
 
-public class LongStreamReader extends AbstractStreamReader {
+public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type = BigintType.BIGINT;
+
+  protected BlockBuilder builder;
 
-  private boolean isDictionary;
   private Dictionary dictionary;
 
-  public LongStreamReader() {
+  public LongStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    this.dictionary = dictionary;
+  }
 
+  @Override public Block buildBlock() {
+    return builder.build();
   }
 
-  public LongStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.isDictionary = isDictionary;
-    this.dictionary = dictionary;
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }
 
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows;
-    BlockBuilder builder;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          populateDictionaryVector(type, numberOfRows, builder);
-        }
-        if (columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
-        } else {
-          populateVector(type, numberOfRows, builder);
-        }
-      }
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG);
+    if (data != null) {
+      type.writeLong(builder, (Long) data);
     } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, (Long) streamData[i]);
-      }
+      builder.appendNull();
     }
-    return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeLong(builder, (Long) columnVector.getData(i));
-      }
-    }
+  @Override public void putLong(int rowId, long value) {
+    type.writeLong(builder, value);
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, (long) columnVector.getData(i));
-    }
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
   }
 
-  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG);
-        if (data != null) {
-          type.writeLong(builder, (Long) data);
-        } else {
-          builder.appendNull();
-        }
-      }
-    }
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index 8952712..e4c9775 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -17,50 +17,50 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.Type;
 
 /**
  * Class to read the Object Stream
  */
-public class ObjectStreamReader  extends AbstractStreamReader {
+public class ObjectStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder {
 
+  protected int batchSize;
 
+  protected Type type = IntegerType.INTEGER;
 
-  public ObjectStreamReader() {
+  protected BlockBuilder builder;
 
+  public ObjectStreamReader(int batchSize, DataType dataType) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
   }
 
-  /**
-   * Function to create the object Block
-   * @param type
-   * @return
-   * @throws IOException
-   */
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeObject(builder, columnVector.getData(i));
-        }
-      }
-    } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeObject(builder, streamData[i]);
-      }
-    }
-
+  @Override public Block buildBlock() {
     return builder.build();
   }
 
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putObject(int rowId, Object value) {
+    type.writeObject(builder, value);
+  }
+
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
+  }
+
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java
new file mode 100644
index 0000000..001e4c4
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import com.facebook.presto.spi.block.Block;
+
+public interface PrestoVectorBlockBuilder {
+
+  Block buildBlock();
+
+  void setBatchSize(int batchSize);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 51f1cd5..d207fd9 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -17,85 +17,62 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.SmallintType;
 import com.facebook.presto.spi.type.Type;
 
-public class ShortStreamReader extends AbstractStreamReader {
+public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder {
 
-  private boolean isDictionary;
-  private Dictionary dictionary;
+  protected int batchSize;
 
-  public ShortStreamReader() {
+  protected Type type = SmallintType.SMALLINT;
 
-  }
+  protected BlockBuilder builder;
 
-  public ShortStreamReader(boolean isDictionary, Dictionary dictionary) {
-    this.isDictionary = isDictionary;
+  private Dictionary dictionary;
+
+  public ShortStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
     this.dictionary = dictionary;
   }
 
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows;
-    BlockBuilder builder;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          populateDictionaryVector(type, numberOfRows, builder);
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            populateVector(type, numberOfRows, builder);
-          }
-        }
-      }
-    } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, (Short) streamData[i]);
-      }
-    }
+  @Override public Block buildBlock() {
     return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeLong(builder, ((Short) columnVector.getData(i)));
-      }
-    }
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      type.writeLong(builder, (Short) columnVector.getData(i));
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT);
+    if (data != null) {
+      type.writeLong(builder, (Short) data);
+    } else {
+      builder.appendNull();
     }
   }
 
-  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      int value = (int) columnVector.getData(i);
-      Object data = DataTypeUtil
-          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT);
-      if (data != null) {
-        type.writeLong(builder, (Short) data);
-      } else {
-        builder.appendNull();
-      }
-    }
+  @Override public void putShort(int rowId, short value) {
+    type.writeLong(builder, value);
   }
 
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
+  }
+
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index cce35e0..53ece0b 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
-
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
@@ -28,85 +26,68 @@ import com.facebook.presto.spi.block.BlockBuilderStatus;
 import com.facebook.presto.spi.block.DictionaryBlock;
 import com.facebook.presto.spi.block.SliceArrayBlock;
 import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.VarcharType;
 
-import static io.airlift.slice.Slices.utf8Slice;
 import static io.airlift.slice.Slices.wrappedBuffer;
 
 /**
  * This class reads the String data and convert it into Slice Block
  */
-public class SliceStreamReader extends AbstractStreamReader {
+public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder {
 
-  private boolean isDictionary;
+  protected int batchSize;
 
-  private SliceArrayBlock dictionarySliceArrayBlock;
+  protected Type type = VarcharType.VARCHAR;
 
-  public SliceStreamReader() {
-  }
+  protected BlockBuilder builder;
+  int[] values;
+  private SliceArrayBlock dictionarySliceArrayBlock;
 
-  public SliceStreamReader(boolean isDictionary, SliceArrayBlock dictionarySliceArrayBlock) {
-    this.isDictionary = isDictionary;
-    this.dictionarySliceArrayBlock = dictionarySliceArrayBlock;
+  public SliceStreamReader(int batchSize, DataType dataType,
+      SliceArrayBlock dictionarySliceArrayBlock) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    if (dictionarySliceArrayBlock == null) {
+      this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    } else {
+      this.dictionarySliceArrayBlock = dictionarySliceArrayBlock;
+      this.values = new int[batchSize];
+    }
   }
 
-  /**
-   * Function to create the Slice Block
-   *
-   * @param type
-   * @return
-   * @throws IOException
-   */
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows;
-    BlockBuilder builder;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (isDictionary) {
-          int[] values = new int[numberOfRows];
-          for (int i = 0; i < numberOfRows; i++) {
-            if (!columnVector.isNullAt(i)) {
-              values[i] = (Integer) columnVector.getData(i);
-            }
-          }
-          return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
-        } else {
-          if (columnVector.anyNullsSet()) {
-            handleNullInVector(type, numberOfRows, builder);
-          } else {
-            populateVector(type, numberOfRows, builder);
-          }
-        }
-      }
+  @Override public Block buildBlock() {
+    if (dictionarySliceArrayBlock == null) {
+      return builder.build();
     } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeSlice(builder, utf8Slice(streamData[i].toString()));
-      }
+      return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
     }
-
-    return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
-      }
-    }
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
-    }
+  @Override public void putInt(int rowId, int value) {
+    values[rowId] = value;
   }
 
+  @Override public void putBytes(int rowId, byte[] value) {
+    type.writeSlice(builder, wrappedBuffer(value));
+  }
 
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    byte[] byteArr = new byte[length];
+    System.arraycopy(value, offset, byteArr, 0, length);
+    type.writeSlice(builder, wrappedBuffer(byteArr));
+  }
 
+  @Override public void putNull(int rowId) {
+    if (dictionarySliceArrayBlock == null) {
+      builder.appendNull();
+    }
+  }
 
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
deleted file mode 100644
index c3cd6c0..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto.readers;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
-
-import com.facebook.presto.spi.block.Block;
-import com.facebook.presto.spi.type.Type;
-
-/**
- * Interface for StreamReader
- */
-public interface StreamReader {
-
-  Block readBlock(Type type) throws IOException;
-
-  void setStreamData(Object[] data);
-
-  void setVector(CarbonColumnVectorImpl vector);
-
-  void setVectorReader(boolean isVectorReader);
-
-  void setBatchSize(int batchSize);
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
deleted file mode 100644
index 1ad3d28..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.presto.readers;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-
-import com.facebook.presto.spi.block.SliceArrayBlock;
-import com.facebook.presto.spi.type.DateType;
-import com.facebook.presto.spi.type.DecimalType;
-import com.facebook.presto.spi.type.IntegerType;
-import com.facebook.presto.spi.type.SmallintType;
-import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.Type;
-import io.airlift.slice.Slice;
-
-/**
- * This class creates streamReader
- * Based on type.
- */
-public final class StreamReaders {
-  /**
-   * This function select Stream readers based on Type and use it.
-   *
-   * @param type
-   * @param dictionarySliceArrayBlock
-   * @return StreamReader
-   */
-  public static StreamReader createStreamReader(Type type,
-      SliceArrayBlock dictionarySliceArrayBlock, Dictionary dictionary) {
-    Class<?> javaType = type.getJavaType();
-    if (dictionary != null) {
-      if (javaType == long.class) {
-        if (type instanceof IntegerType || type instanceof DateType) {
-          return new IntegerStreamReader(true, dictionary);
-        } else if (type instanceof DecimalType) {
-          return new DecimalSliceStreamReader(true, dictionary);
-        } else if (type instanceof SmallintType) {
-          return new ShortStreamReader(true, dictionary);
-        }
-        return new LongStreamReader(true, dictionary);
-
-      } else if (javaType == double.class) {
-        return new DoubleStreamReader(true, dictionary);
-      } else if (javaType == Slice.class) {
-        if (type instanceof DecimalType) {
-          return new DecimalSliceStreamReader(true, dictionary);
-        } else {
-          return new SliceStreamReader(true, dictionarySliceArrayBlock);
-        }
-      }else if (javaType == boolean.class) {
-              return new BooleanStreamReader(true,dictionary);
-      } else {
-        return new ObjectStreamReader();
-      }
-    } else {
-      if (javaType == long.class) {
-        if (type instanceof IntegerType || type instanceof DateType) {
-          return new IntegerStreamReader();
-        } else if (type instanceof DecimalType) {
-          return new DecimalSliceStreamReader();
-        } else if (type instanceof SmallintType) {
-          return new ShortStreamReader();
-        } else if (type instanceof TimestampType) {
-          return new TimestampStreamReader();
-        }
-        return new LongStreamReader();
-
-      } else if (javaType == double.class) {
-        return new DoubleStreamReader();
-      } else if (javaType == Slice.class) {
-        if (type instanceof DecimalType) {
-          return new DecimalSliceStreamReader();
-        } else {
-          return new SliceStreamReader();
-        }
-      }else if (javaType == boolean.class) {
-        return new BooleanStreamReader();
-      } else {
-        return new ObjectStreamReader();
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index a22ef29..f52916c 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -17,60 +17,63 @@
 
 package org.apache.carbondata.presto.readers;
 
-import java.io.IOException;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
 
-public class TimestampStreamReader extends AbstractStreamReader {
+public class TimestampStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
 
-  private int TIMESTAMP_DIVISOR  = 1000;
+  protected int batchSize;
 
-  public TimestampStreamReader() {
+  protected Type type = TimestampType.TIMESTAMP;
 
-  }
+  protected BlockBuilder builder;
 
-  public Block readBlock(Type type) throws IOException {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if (isVectorReader) {
-      numberOfRows = batchSize;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (columnVector != null) {
-        if (columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
-        } else {
-          populateVector(type, numberOfRows, builder);
-        }
-      }
+  private Dictionary dictionary;
 
-    } else {
-      numberOfRows = streamData.length;
-      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, (Long) streamData[i]);
-      }
-    }
+  public TimestampStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
+    super(batchSize, dataType);
+    this.batchSize = batchSize;
+    this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+    this.dictionary = dictionary;
+  }
 
+  @Override public Block buildBlock() {
     return builder.build();
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      if (columnVector.isNullAt(i)) {
-        builder.appendNull();
-      } else {
-        type.writeLong(builder, (Long)columnVector.getData(i)/ TIMESTAMP_DIVISOR);
-      }
-    }
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }
 
-  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      type.writeLong(builder, (Long)columnVector.getData(i)/TIMESTAMP_DIVISOR);
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG);
+    if (data != null) {
+      type.writeLong(builder, (Long) data / 1000);
+    } else {
+      builder.appendNull();
     }
   }
 
+  @Override public void putLong(int rowId, long value) {
+    type.writeLong(builder, value / 1000);
+  }
+
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
+  }
+
+  @Override public void reset() {
+    builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index 82cdf3a..42d7c93 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -93,7 +93,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
     while (chunks.hasNext) {
       {
         val value: Array[Byte] = chunks.next
-        if(count ==1) {
+        if (count == 1) {
           sliceArray(count) = null
         }
         else {
@@ -123,6 +123,10 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
     dictionaries
   }
 
+  def getDataTypes: Array[DataType] = {
+    dataTypes
+  }
+
   /**
    * to book keep the dictionary cache or update access count for each
    * column involved during decode, to facilitate LRU cache policy if memory


Mime
View raw message