carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [2/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (Refactored streaming module and column vector to support spark 2.3.1)
Date: Wed, 05 Sep 2018 12:40:10 GMT
[CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (Refactored streaming module and column vector to support spark 2.3.1)

a3f162f5bc8f02a5d1b0ce4feb35d47257ba537e [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (Make API changes in carbon to be compatible with spark 2.3)

e760f4ab3f43a1dac1a0478e60d7ab7afe94138d [HOTFIX] Fixed test case failure due to bad record handling

In this PR, in order to hide the compatibility issues of the columnar vector APIs from the existing common classes, I introduced a proxy vector class. All the common classes of the carbon-spark integration layer use this proxy class to interact with the ColumnarBatch/ColumnVector APIs of the Spark datasource; the proxy class thus takes care of the compatibility issues across the different Spark versions.
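
For illustration only, here is a minimal usage sketch of the proxy idea. The constructor and the putInt/setNumRows/close signatures follow the spark2.1 CarbonVectorProxy added in this commit (see the diff below); the column name, batch size, and values are invented.

// Hypothetical usage sketch: the integration layer writes only through the proxy,
// never through Spark's ColumnVector/ColumnarBatch classes directly, so only
// CarbonVectorProxy needs a per-Spark-version implementation.
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

public class CarbonVectorProxyUsageSketch {
  public static void main(String[] args) {
    StructField[] fields = new StructField[] {
        new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
    };
    // Same constructor shape as used by VectorizedCarbonRecordReader in this commit
    // (the reader itself uses MemoryMode.OFF_HEAP and a 4K batch size).
    CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, 1024, fields);
    for (int row = 0; row < 1024; row++) {
      proxy.putInt(row, row * 2, /* ordinal */ 0);  // version-neutral write call
    }
    proxy.setNumRows(1024);
    // proxy.getColumnarBatch() / proxy.getRow(i) hand the filled batch back to Spark.
    proxy.close();
  }
}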

The ColumnVector and ColumnarBatch interface compatibility issues are addressed in this PR. The changes correspond to the following modifications in the Spark interface; a short sketch of the resulting write-path difference follows the highlights.

Highlights:
a) Refactor the ColumnVector hierarchy and related classes.
b) Make ColumnVector read-only.
c) Introduce WritableColumnVector with a write interface.
d) Remove ReadOnlyColumnVector.
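
As a hedged illustration of highlights b) to d): in Spark 2.3 the put* write methods live on WritableColumnVector, while the public ColumnVector is read-only and ColumnarBatch merely wraps existing vectors. The sketch below assumes the stock Spark 2.3.x vectorized classes and is not part of this commit.

// Sketch of the Spark 2.3 write path: OnHeapColumnVector (a WritableColumnVector)
// owns the put* methods; ColumnarBatch only wraps the vectors for reading.
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class Spark23WriteSketch {
  public static void main(String[] args) {
    StructType schema = new StructType(new StructField[] {
        new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
    });
    // WritableColumnVector subclasses carry the write interface in 2.3.
    OnHeapColumnVector[] cols = OnHeapColumnVector.allocateColumns(8, schema);
    for (int i = 0; i < 8; i++) {
      cols[0].putInt(i, i * 10);
    }
    ColumnarBatch batch = new ColumnarBatch(cols);   // 2.1/2.2 used ColumnarBatch.allocate(...)
    batch.setNumRows(8);
    System.out.println(batch.getRow(3).getInt(0));   // reads still go through ColumnVector
    batch.close();
  }
}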

This closes #2642


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bcef656d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bcef656d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bcef656d

Branch: refs/heads/master
Commit: bcef656dd6124b3962c4b4889dbd97a5f60233b2
Parents: cf74c88
Author: sujith71955 <sujithchacko.2010@gmail.com>
Authored: Wed May 23 22:51:50 2018 -0700
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Sep 5 18:07:43 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  12 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  18 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  67 +-
 .../VectorizedCarbonRecordReader.java           |  41 +-
 .../stream/CarbonStreamRecordReader.java        | 761 ++++++++++++++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 226 ++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 229 ++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 255 ++++++
 .../apache/spark/sql/ColumnVectorFactory.java   |  45 ++
 .../streaming/CarbonStreamInputFormat.java      |  46 +-
 .../streaming/CarbonStreamRecordReader.java     | 772 -------------------
 .../carbondata/streaming/CarbonStreamUtils.java |  40 +
 .../streaming/StreamBlockletReader.java         |  39 +-
 13 files changed, 1693 insertions(+), 858 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e88ae81..b712109 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -62,7 +62,7 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -421,13 +421,13 @@ class CarbonScanRDD[T: ClassTag](
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
           val inputFormat = new CarbonStreamInputFormat
-          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-            .asInstanceOf[CarbonStreamRecordReader]
-          streamReader.setVectorReader(vectorReader)
-          streamReader.setInputMetricsStats(inputMetricsStats)
+          inputFormat.setVectorReader(vectorReader)
+          inputFormat.setInputMetricsStats(inputMetricsStats)
           model.setStatisticsRecorder(
             CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
-          streamReader.setQueryModel(model)
+          inputFormat.setModel(model)
+          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+            .asInstanceOf[RecordReader[Void, Object]]
           streamReader
         case _ =>
           // create record reader for CarbonData file format

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 994cb3d..3cbfab2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.sql.SparkSession
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -48,8 +49,13 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
+<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+=======
+import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.streaming.CarbonStreamInputFormat
+>>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface
 
 
 /**
@@ -75,7 +81,7 @@ class HandoffPartition(
  * and we can extract it later
  */
 class StreamingRawResultIterator(
-    recordReader: CarbonStreamRecordReader
+    recordReader: RecordReader[Void, Any]
 ) extends RawResultIterator(null, null, null) {
 
   override def hasNext: Boolean = {
@@ -162,10 +168,10 @@ class StreamHandoffRDD[K, V](
     val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
     val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-      .asInstanceOf[CarbonStreamRecordReader]
-    streamReader.setVectorReader(false)
-    streamReader.setQueryModel(model)
-    streamReader.setUseRawRow(true)
+      .asInstanceOf[RecordReader[Void, Any]]
+    inputFormat.setVectorReader(false)
+    inputFormat.setModel(model)
+    inputFormat.setUseRawRow(true)
     streamReader.initialize(inputSplit, attemptContext)
     val iteratorList = new util.ArrayList[RawResultIterator](1)
     iteratorList.add(new StreamingRawResultIterator(streamReader))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 4f34650..7bab117 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -24,48 +24,46 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 
 import org.apache.parquet.column.Encoding;
+import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
-  private ColumnVector columnVector;
+  private CarbonVectorProxy writableColumnVector;
 
   private boolean[] filteredRows;
 
   private int counter;
 
+  private int ordinal;
+
   private boolean filteredRowsExist;
 
   private DataType blockDataType;
 
-  private CarbonColumnVector dictionaryVector;
-
-  ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
-    this.columnVector = columnVector;
+  ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
+                        boolean[] filteredRows, int ordinal) {
+    this.writableColumnVector = writableColumnVector;
     this.filteredRows = filteredRows;
-    if (columnVector.getDictionaryIds() != null) {
-      this.dictionaryVector =
-          new ColumnarVectorWrapper(columnVector.getDictionaryIds(), filteredRows);
-    }
+    this.ordinal = ordinal;
   }
 
   @Override public void putBoolean(int rowId, boolean value) {
     if (!filteredRows[rowId]) {
-      columnVector.putBoolean(counter++, value);
+      writableColumnVector.putBoolean(counter++, value, ordinal);
     }
   }
 
   @Override public void putFloat(int rowId, float value) {
     if (!filteredRows[rowId]) {
-      columnVector.putFloat(counter++, value);
+      writableColumnVector.putFloat(counter++, value,ordinal);
     }
   }
 
   @Override public void putShort(int rowId, short value) {
     if (!filteredRows[rowId]) {
-      columnVector.putShort(counter++, value);
+      writableColumnVector.putShort(counter++, value, ordinal);
     }
   }
 
@@ -73,18 +71,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putShort(counter++, value);
+          writableColumnVector.putShort(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putShorts(rowId, count, value);
+      writableColumnVector.putShorts(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putInt(int rowId, int value) {
     if (!filteredRows[rowId]) {
-      columnVector.putInt(counter++, value);
+      writableColumnVector.putInt(counter++, value, ordinal);
     }
   }
 
@@ -92,18 +90,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putInt(counter++, value);
+          writableColumnVector.putInt(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putInts(rowId, count, value);
+      writableColumnVector.putInts(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putLong(int rowId, long value) {
     if (!filteredRows[rowId]) {
-      columnVector.putLong(counter++, value);
+      writableColumnVector.putLong(counter++, value, ordinal);
     }
   }
 
@@ -111,19 +109,19 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putLong(counter++, value);
+          writableColumnVector.putLong(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putLongs(rowId, count, value);
+      writableColumnVector.putLongs(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
     if (!filteredRows[rowId]) {
       Decimal toDecimal = Decimal.apply(value);
-      columnVector.putDecimal(counter++, toDecimal, precision);
+      writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal);
     }
   }
 
@@ -131,7 +129,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     Decimal decimal = Decimal.apply(value);
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        columnVector.putDecimal(counter++, decimal, precision);
+        writableColumnVector.putDecimal(counter++, decimal, precision, ordinal);
       }
       rowId++;
     }
@@ -139,7 +137,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putDouble(int rowId, double value) {
     if (!filteredRows[rowId]) {
-      columnVector.putDouble(counter++, value);
+      writableColumnVector.putDouble(counter++, value, ordinal);
     }
   }
 
@@ -147,25 +145,25 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putDouble(counter++, value);
+          writableColumnVector.putDouble(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putDoubles(rowId, count, value);
+      writableColumnVector.putDoubles(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putBytes(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, value);
+      writableColumnVector.putByteArray(counter++, value, ordinal);
     }
   }
 
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        columnVector.putByteArray(counter++, value);
+        writableColumnVector.putByteArray(counter++, value, ordinal);
       }
       rowId++;
     }
@@ -173,13 +171,13 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
     if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, value, offset, length);
+      writableColumnVector.putByteArray(counter++, value, offset, length, ordinal);
     }
   }
 
   @Override public void putNull(int rowId) {
     if (!filteredRows[rowId]) {
-      columnVector.putNull(counter++);
+      writableColumnVector.putNull(counter++, ordinal);
     }
   }
 
@@ -187,12 +185,12 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putNull(counter++);
+          writableColumnVector.putNull(counter++, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putNulls(rowId, count);
+      writableColumnVector.putNulls(rowId, count,ordinal);
     }
   }
 
@@ -216,7 +214,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public boolean isNull(int rowId) {
-    return columnVector.isNullAt(rowId);
+    return writableColumnVector.isNullAt(rowId,ordinal);
   }
 
   @Override public void putObject(int rowId, Object obj) {
@@ -237,7 +235,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public DataType getType() {
-    return CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(columnVector.dataType());
+    return CarbonSparkDataSourceUtil
+        .convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index f237552..7c98608 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -51,13 +51,16 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
+<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099:integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+=======
+import org.apache.spark.sql.CarbonVectorProxy;
+>>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface:integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -70,9 +73,11 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private int batchIdx = 0;
 
+  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+
   private int numBatched = 0;
 
-  private ColumnarBatch columnarBatch;
+  private CarbonVectorProxy vectorProxy;
 
   private CarbonColumnarBatch carbonColumnarBatch;
 
@@ -159,9 +164,9 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   @Override
   public void close() throws IOException {
     logStatistics(rowCount, queryModel.getStatisticsRecorder());
-    if (columnarBatch != null) {
-      columnarBatch.close();
-      columnarBatch = null;
+    if (vectorProxy != null) {
+      vectorProxy.close();
+      vectorProxy = null;
     }
     // clear dictionary cache
     Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
@@ -195,15 +200,15 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   @Override
   public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) {
-      int value = columnarBatch.numValidRows();
+      int value = vectorProxy.numRows();
       rowCount += value;
       if (inputMetricsStats != null) {
         inputMetricsStats.incrementRecordRead((long) value);
       }
-      return columnarBatch;
+      return vectorProxy.getColumnarBatch();
     }
     rowCount += 1;
-    return columnarBatch.getRow(batchIdx - 1);
+    return vectorProxy.getRow(batchIdx - 1);
   }
 
   @Override
@@ -271,7 +276,6 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
             CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
       }
     }
-
     StructType schema = new StructType(fields);
     if (partitionColumns != null) {
       for (StructField field : partitionColumns.fields()) {
@@ -286,15 +290,16 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
         columnarBatch.column(i + partitionIdx).setIsConstant();
       }
     }
+    vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
-    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+    boolean[] filteredRows = new boolean[vectorProxy.numRows()];
     for (int i = 0; i < fields.length; i++) {
-      if (isNoDictStringField[i]) {
-        columnarBatch.column(i).reserveDictionaryIds(columnarBatch.capacity());
+    if (isNoDictStringField[i]) {
+      vectorProxy.reserveDictionaryIds(vectorProxy.numRows(), i);
       }
-      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+      vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
     }
-    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
   }
 
   private void initBatch() {
@@ -302,7 +307,7 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   }
 
   private void resultBatch() {
-    if (columnarBatch == null) initBatch();
+    if (vectorProxy == null) initBatch();
   }
 
 
@@ -314,16 +319,16 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     if (null != isNoDictStringField) {
       for (int i = 0; i < isNoDictStringField.length; i++) {
         if (isNoDictStringField[i]) {
-          columnarBatch.column(i).getDictionaryIds().reset();
+          vectorProxy.resetDictionaryIds(i);
         }
       }
     }
-    columnarBatch.reset();
+    vectorProxy.reset();
     carbonColumnarBatch.reset();
     if (iterator.hasNext()) {
       iterator.processNextBatch(carbonColumnarBatch);
       int actualSize = carbonColumnarBatch.getActualSize();
-      columnarBatch.setNumRows(actualSize);
+      vectorProxy.setNumRows(actualSize);
       numBatched = actualSize;
       batchIdx = 0;
       return true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..124413d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.streaming.CarbonStreamInputFormat;
+import org.apache.carbondata.streaming.CarbonStreamUtils;
+import org.apache.carbondata.streaming.StreamBlockletReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Stream record reader
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+  // vector reader
+  private boolean isVectorReader;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] storageColumns;
+  private boolean[] isRequired;
+  private DataType[] measureDataTypes;
+  private int dimensionCount;
+  private int measureCount;
+
+  // input
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  private StreamBlockletReader input;
+  private boolean isFirstRow = true;
+  private QueryModel model;
+
+  // decode data
+  private BitSet allNonNull;
+  private boolean[] isNoDictColumn;
+  private DirectDictionaryGenerator[] directDictionaryGenerators;
+  private CacheProvider cacheProvider;
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+  private GenericQueryType[] queryTypes;
+
+  // vectorized reader
+  private StructType outputSchema;
+  private Object vectorProxy;
+  private boolean isFinished = false;
+
+  // filter
+  private FilterExecuter filter;
+  private boolean[] isFilterRequired;
+  private Object[] filterValues;
+  private RowIntf filterRow;
+  private int[] filterMap;
+
+  // output
+  private CarbonColumn[] projection;
+  private boolean[] isProjectionRequired;
+  private int[] projectionMap;
+  private Object[] outputValues;
+  private InternalRow outputRow;
+
+  // empty project, null filter
+  private boolean skipScanData;
+
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(CarbonStreamRecordReader.class.getName());
+
+  public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
+      QueryModel mdl, boolean useRawRow) {
+    this.isVectorReader = isVectorReader;
+    this.inputMetricsStats = inputMetricsStats;
+    this.model = mdl;
+    this.useRawRow = useRawRow;
+
+  }
+  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // input
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    // metadata
+    hadoopConf = context.getConfiguration();
+    if (model == null) {
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.createQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
+    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int j = 0; j < projection.length; j++) {
+      for (int i = 0; i < storageColumns.length; i++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      skipScanData = true;
+    }
+
+  }
+
+  private void initializeFilter() {
+
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+        complexDimensionInfoMap);
+    // for row filter, we need update column index
+    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+
+  }
+
+  private byte[] getSyncMarker(String filePath) throws IOException {
+    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+    FileHeader header = headerReader.readHeader();
+    return header.getSync_marker();
+  }
+
+  private void initializeAtFirstRow() throws IOException {
+    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+    filterRow = new RowImpl();
+    filterRow.setValues(filterValues);
+
+    outputValues = new Object[projection.length];
+    outputRow = new GenericInternalRow(outputValues);
+
+    Path file = fileSplit.getPath();
+
+    byte[] syncMarker = getSyncMarker(file.toString());
+
+    FileSystem fs = file.getFileSystem(hadoopConf);
+
+    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+    FSDataInputStream fileIn = fs.open(file, bufferSize);
+    fileIn.seek(fileSplit.getStart());
+    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+        fileSplit.getStart() == 0);
+
+    cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+    outputSchema = new StructType((StructField[])
+        DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isFirstRow) {
+      isFirstRow = false;
+      initializeAtFirstRow();
+    }
+    if (isFinished) {
+      return false;
+    }
+
+    if (isVectorReader) {
+      return nextColumnarBatch();
+    }
+
+    return nextRow();
+  }
+
+  /**
+   * for vector reader, check next columnar batch
+   */
+  private boolean nextColumnarBatch() throws IOException {
+    boolean hasNext;
+    boolean scanMore = false;
+    do {
+      // move to the next blocklet
+      hasNext = input.nextBlocklet();
+      if (hasNext) {
+        // read blocklet header
+        BlockletHeader header = input.readBlockletHeader();
+        if (isScanRequired(header)) {
+          scanMore = !scanBlockletAndFillVector(header);
+        } else {
+          input.skipBlockletData(true);
+          scanMore = true;
+        }
+      } else {
+        isFinished = true;
+        scanMore = false;
+      }
+    } while (scanMore);
+    return hasNext;
+  }
+
+  /**
+   * check next Row
+   */
+  private boolean nextRow() throws IOException {
+    // read row one by one
+    try {
+      boolean hasNext;
+      boolean scanMore = false;
+      do {
+        hasNext = input.hasNext();
+        if (hasNext) {
+          if (skipScanData) {
+            input.nextRow();
+            scanMore = false;
+          } else {
+            if (useRawRow) {
+              // read raw row for streaming handoff which does not require decode raw row
+              readRawRowFromStream();
+            } else {
+              readRowFromStream();
+            }
+            if (null != filter) {
+              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+            } else {
+              scanMore = false;
+            }
+          }
+        } else {
+          if (input.nextBlocklet()) {
+            BlockletHeader header = input.readBlockletHeader();
+            if (isScanRequired(header)) {
+              if (skipScanData) {
+                input.skipBlockletData(false);
+              } else {
+                input.readBlockletData(header);
+              }
+            } else {
+              input.skipBlockletData(true);
+            }
+            scanMore = true;
+          } else {
+            isFinished = true;
+            scanMore = false;
+          }
+        }
+      } while (scanMore);
+      return hasNext;
+    } catch (FilterUnsupportedException e) {
+      throw new IOException("Failed to filter row in detail reader", e);
+    }
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      Method method = null;
+      try {
+        method = vectorProxy.getClass().getMethod("numRows");
+        int value = (int) method.invoke(vectorProxy);
+        if (inputMetricsStats != null) {
+          inputMetricsStats.incrementRecordRead((long) value);
+        }
+        method = vectorProxy.getClass().getMethod("getColumnarBatch");
+        return method.invoke(vectorProxy);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (inputMetricsStats != null) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+
+    return outputRow;
+  }
+
+  private boolean isScanRequired(BlockletHeader header) {
+    if (filter != null && header.getBlocklet_index() != null) {
+      BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil.convertExternalMinMaxIndex(
+          header.getBlocklet_index().getMin_max_index());
+      if (minMaxIndex != null) {
+        BitSet bitSet =
+            filter.isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues());
+        if (bitSet.isEmpty()) {
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }
+    return true;
+  }
+
+  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+    Constructor cons = null;
+    // if filter is null and output projection is empty, use the row number of blocklet header
+    int rowNum = 0;
+    String methodName = "setNumRows";
+    try {
+      String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy";
+      cons = CarbonStreamUtils.getConstructorWithReflection(vectorReaderClassName, MemoryMode.class,
+              StructType.class, int.class);
+      if (skipScanData) {
+
+        int rowNums = header.getBlocklet_info().getNum_rows();
+        vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, rowNums);
+        Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
+        setNumRowsMethod.invoke(vectorProxy, rowNums);
+        input.skipBlockletData(true);
+        return rowNums > 0;
+      }
+      input.readBlockletData(header);
+      vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP,outputSchema, input.getRowNums());
+      if (null == filter) {
+        while (input.hasNext()) {
+          readRowFromStream();
+          putRowToColumnBatch(rowNum++);
+        }
+      } else {
+        try {
+          while (input.hasNext()) {
+            readRowFromStream();
+            if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+              putRowToColumnBatch(rowNum++);
+            }
+          }
+        } catch (FilterUnsupportedException e) {
+          throw new IOException("Failed to filter row in vector reader", e);
+        }
+      }
+      Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
+      setNumRowsMethod.invoke(vectorProxy, rowNum);
+    } catch (Exception e) {
+      throw new IOException("Failed to fill row in  vector reader", e);
+    }
+    return rowNum > 0;
+  }
+
+  private void readRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          if (isRequired[colCount]) {
+            byte[] b = input.readBytes(v);
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = b;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+                      storageColumns[colCount].getDataType());
+            }
+          } else {
+            input.skipBytes(v);
+          }
+        } else if (null != directDictionaryGenerators[colCount]) {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = input.readInt();
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        short v = input.readShort();
+        if (isRequired[colCount]) {
+          byte[] b = input.readBytes(v);
+          if (isFilterRequired[colCount]) {
+            filterValues[filterMap[colCount]] = b;
+          }
+          if (isProjectionRequired[colCount]) {
+            outputValues[projectionMap[colCount]] = queryTypes[colCount]
+                .getDataBasedOnDataType(ByteBuffer.wrap(b));
+          }
+        } else {
+          input.skipBytes(v);
+        }
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          if (isRequired[colCount]) {
+            boolean v = input.readBoolean();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(1);
+          }
+        } else if (dataType == DataTypes.SHORT) {
+          if (isRequired[colCount]) {
+            short v = input.readShort();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(2);
+          }
+        } else if (dataType == DataTypes.INT) {
+          if (isRequired[colCount]) {
+            int v = input.readInt();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else if (dataType == DataTypes.LONG) {
+          if (isRequired[colCount]) {
+            long v = input.readLong();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DOUBLE) {
+          if (isRequired[colCount]) {
+            double v = input.readDouble();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          if (isRequired[colCount]) {
+            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+            }
+          } else {
+            input.skipBytes(len);
+          }
+        }
+      }
+    }
+  }
+
+  private void readRawRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          outputValues[colCount] = input.readBytes(v);
+        } else {
+          outputValues[colCount] = input.readInt();
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        short v = input.readShort();
+        outputValues[colCount] = input.readBytes(v);
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          outputValues[colCount] = input.readBoolean();
+        } else if (dataType == DataTypes.SHORT) {
+          outputValues[colCount] = input.readShort();
+        } else if (dataType == DataTypes.INT) {
+          outputValues[colCount] = input.readInt();
+        } else if (dataType == DataTypes.LONG) {
+          outputValues[colCount] = input.readLong();
+        } else if (dataType == DataTypes.DOUBLE) {
+          outputValues[colCount] = input.readDouble();
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+        }
+      }
+    }
+  }
+
+  private void putRowToColumnBatch(int rowId) {
+    Class<?>[] paramTypes = {int.class, Object.class, int.class};
+    Method putRowToColumnBatch = null;
+    try {
+      putRowToColumnBatch = vectorProxy.getClass().getMethod("putRowToColumnBatch", paramTypes);
+
+    } catch (Exception e) {
+      LOGGER.error(
+              "Unable to put the row in the vector" + "rowid: " + rowId + e);
+    }
+    for (int i = 0; i < projection.length; i++) {
+      Object value = outputValues[i];
+      try {
+        putRowToColumnBatch.invoke(vectorProxy, rowId, value, i);
+      } catch (Exception e) {
+        LOGGER.error(
+                "Unable to put the row in the vector" + "rowid: " + rowId + e);
+      }
+    }
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  @Override public void close() throws IOException {
+    if (null != input) {
+      input.close();
+    }
+    if (null != vectorProxy) {
+      try {
+        Method closeMethod = vectorProxy.getClass().getMethod("close");
+        closeMethod.invoke(vectorProxy);
+      } catch (Exception e) {
+        LOGGER.error(
+                "Unable to close the stream vector reader" + e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..7fa01e9
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import java.math.BigInteger;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles the columnar vector reading of the carbondata
+ * based on the spark ColumnVector and ColumnarBatch API. This proxy class
+ * handles the complexity of spark 2.1 version related api changes since
+ * spark ColumnVector and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+    private ColumnarBatch columnarBatch;
+
+    /**
+     * Adapter class which handles the columnar vector reading of the carbondata
+     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
+     * handles the complexity of spark 2.3 version related api changes since
+     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
+     *
+     * @param memMode       which represent the type onheap or offheap vector.
+     * @param rowNum        rows number for vector reading
+     * @param structFileds, metadata related to current schema of table.
+     */
+    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
+        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
+    }
+
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+    }
+
+    /**
+     * Sets the number of rows in this batch.
+     */
+    public void setNumRows(int numRows) {
+        columnarBatch.setNumRows(numRows);
+    }
+
+    /**
+     * Returns the number of rows for read, including filtered rows.
+     */
+    public int numRows() {
+        return columnarBatch.capacity();
+    }
+
+    /**
+     * Called to close all the columns in this batch. It is not valid to access the data after
+     * calling this. This must be called at the end to clean up memory allocations.
+     */
+    public void close() {
+        columnarBatch.close();
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public InternalRow getRow(int rowId) {
+        return columnarBatch.getRow(rowId);
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public Object getColumnarBatch() {
+        return columnarBatch;
+    }
+
+    public void resetDictionaryIds(int ordinal) {
+        columnarBatch.column(ordinal).getDictionaryIds().reset();
+    }
+
+    /**
+     * Resets this column for writing. The currently stored values are no longer accessible.
+     */
+    public void reset() {
+        columnarBatch.reset();
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value, int offset) {
+        org.apache.spark.sql.types.DataType t = dataType(offset);
+        if (null == value) {
+            putNull(rowId, offset);
+        } else {
+            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                putBoolean(rowId, (boolean) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                putByte(rowId, (byte) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                putShort(rowId, (short) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                putLong(rowId, (long) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                putFloat(rowId, (float) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                putDouble(rowId, (double) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                UTF8String v = (UTF8String) value;
+                putByteArray(rowId, v.getBytes(), offset);
+            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+                DecimalType dt = (DecimalType) t;
+                Decimal d = Decimal.fromDecimal(value);
+                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                    putInt(rowId, (int) d.toUnscaledLong(), offset);
+                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                    putLong(rowId, d.toUnscaledLong(), offset);
+                } else {
+                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                    byte[] bytes = integer.toByteArray();
+                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+                }
+            } else if (t instanceof CalendarIntervalType) {
+                CalendarInterval c = (CalendarInterval) value;
+                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
+                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
+            } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                putLong(rowId, (long) value, offset);
+            }
+        }
+    }
+
+    public void putBoolean(int rowId, boolean value, int ordinal) {
+        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
+    }
+
+    public void putByte(int rowId, byte value, int ordinal) {
+        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
+    }
+
+    public void putShort(int rowId, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShort(rowId, (short) value);
+    }
+
+    public void putInt(int rowId, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInt(rowId, (int) value);
+    }
+
+    public void putFloat(int rowId, float value, int ordinal) {
+        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
+    }
+
+    public void putLong(int rowId, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLong(rowId, (long) value);
+    }
+
+    public void putDouble(int rowId, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
+    }
+
+    public void putInts(int rowId, int count, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShorts(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLongs(rowId, count, value);
+    }
+
+    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
+    }
+
+    public void putDoubles(int rowId, int count, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
+    }
+
+    public void putNull(int rowId, int ordinal) {
+        columnarBatch.column(ordinal).putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count, int ordinal) {
+        columnarBatch.column(ordinal).putNulls(rowId, count);
+    }
+
+    public boolean isNullAt(int rowId, int ordinal) {
+        return columnarBatch.column(ordinal).isNullAt(rowId);
+    }
+
+    public DataType dataType(int ordinal) {
+        return columnarBatch.column(ordinal).dataType();
+    }
+}
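
For reference, a minimal usage sketch of this proxy on a Spark 2.1/2.2 classpath, where ColumnarBatch.allocate is available. The schema, values and MemoryMode below are illustrative only and not part of this commit; the real callers are the vectorized and streaming record readers of the integration layer.

    // Illustrative sketch, not part of the commit: fills a CarbonVectorProxy row by
    // row and reads the result back through the batch. Schema and values are made up.
    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.sql.CarbonVectorProxy;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.unsafe.types.UTF8String;

    public final class VectorProxyUsageSketch {
      public static void main(String[] args) {
        StructField[] fields = new StructField[] {
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
            new StructField("name", DataTypes.StringType, true, Metadata.empty())
        };
        CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, 4, fields);
        proxy.reset();
        // putRowToColumnBatch dispatches on the column data type.
        proxy.putRowToColumnBatch(0, 100, 0);
        proxy.putRowToColumnBatch(0, UTF8String.fromString("alice"), 1);
        proxy.putRowToColumnBatch(1, null, 0);   // null is routed to putNull
        proxy.putRowToColumnBatch(1, UTF8String.fromString("bob"), 1);
        proxy.setNumRows(2);
        System.out.println(proxy.getRow(0).getInt(0));   // prints 100
        proxy.close();
      }
    }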

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..944b32e
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import java.math.BigInteger;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles columnar vector reading of carbondata on top of
+ * the spark ColumnVector and ColumnarBatch API. This proxy hides the spark 2.2
+ * specific API details from the integration layer, since the spark ColumnVector
+ * and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+    private ColumnarBatch columnarBatch;
+
+    /**
+     * Allocates a spark ColumnarBatch for the given schema and row capacity and
+     * exposes a stable write interface on top of it, so that the common classes
+     * of the carbon-spark integration layer do not depend on the spark 2.2
+     * specific ColumnVector and ColumnarBatch API.
+     *
+     * @param memMode      whether the vectors are allocated on-heap or off-heap.
+     * @param rowNum       number of rows the batch is allocated for.
+     * @param structFields metadata describing the current table schema.
+     */
+    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFields) {
+        columnarBatch = ColumnarBatch.allocate(new StructType(structFields), memMode, rowNum);
+    }
+
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+    }
+
+    /**
+     * Sets the number of rows in this batch.
+     */
+    public void setNumRows(int numRows) {
+        columnarBatch.setNumRows(numRows);
+    }
+
+    /**
+     * Returns the capacity of this batch, i.e. the maximum number of rows it can hold.
+     */
+    public int numRows() {
+        return columnarBatch.capacity();
+    }
+
+    /**
+     * Called to close all the columns in this batch. It is not valid to access the data after
+     * calling this. This must be called at the end to clean up memory allocations.
+     */
+    public void close() {
+        columnarBatch.close();
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public InternalRow getRow(int rowId) {
+        return columnarBatch.getRow(rowId);
+    }
+
+    /**
+     * Returns the underlying spark ColumnarBatch instance.
+     */
+    public Object getColumnarBatch() {
+        return columnarBatch;
+    }
+
+    public Object reserveDictionaryIds(int capacity, int ordinal) {
+        return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
+    }
+
+    public void resetDictionaryIds(int ordinal) {
+        columnarBatch.column(ordinal).getDictionaryIds().reset();
+    }
+
+    /**
+     * Resets this column for writing. The currently stored values are no longer accessible.
+     */
+    public void reset() {
+        columnarBatch.reset();
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value, int offset) {
+        org.apache.spark.sql.types.DataType t = dataType(offset);
+        if (null == value) {
+            putNull(rowId, offset);
+        } else {
+            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                putBoolean(rowId, (boolean) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                putByte(rowId, (byte) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                putShort(rowId, (short) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                putLong(rowId, (long) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                putFloat(rowId, (float) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                putDouble(rowId, (double) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                UTF8String v = (UTF8String) value;
+                putByteArray(rowId, v.getBytes(), offset);
+            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+                DecimalType dt = (DecimalType) t;
+                Decimal d = Decimal.fromDecimal(value);
+                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                    putInt(rowId, (int) d.toUnscaledLong(), offset);
+                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                    putLong(rowId, d.toUnscaledLong(), offset);
+                } else {
+                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                    byte[] bytes = integer.toByteArray();
+                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+                }
+            } else if (t instanceof CalendarIntervalType) {
+                CalendarInterval c = (CalendarInterval) value;
+                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
+                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
+            } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                putLong(rowId, (long) value, offset);
+            }
+        }
+    }
+
+    public void putBoolean(int rowId, boolean value, int ordinal) {
+        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
+    }
+
+    public void putByte(int rowId, byte value, int ordinal) {
+        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
+    }
+
+    public void putShort(int rowId, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShort(rowId, (short) value);
+    }
+
+    public void putInt(int rowId, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInt(rowId, (int) value);
+    }
+
+    public void putFloat(int rowId, float value, int ordinal) {
+        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
+    }
+
+    public void putLong(int rowId, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLong(rowId, (long) value);
+    }
+
+    public void putDouble(int rowId, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
+    }
+
+    public void putInts(int rowId, int count, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShorts(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLongs(rowId, count, value);
+    }
+
+    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
+    }
+
+    public void putDoubles(int rowId, int count, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
+    }
+
+    public void putNull(int rowId, int ordinal) {
+        columnarBatch.column(ordinal).putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count, int ordinal) {
+        columnarBatch.column(ordinal).putNulls(rowId, count);
+    }
+
+    public boolean isNullAt(int rowId, int ordinal) {
+        return columnarBatch.column(ordinal).isNullAt(rowId);
+    }
+
+    public DataType dataType(int ordinal) {
+        return columnarBatch.column(ordinal).dataType();
+    }
+}
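
The 2.2 variant additionally exposes reserveDictionaryIds for the dictionary-decode path. The following is only a hedged sketch of the intended call sequence; the ordinal and capacity are placeholders, and the actual filling of the ids from the carbon dictionary page is omitted.

    // Sketch only: reserve, use and reset the dictionary-id vector of one column.
    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.sql.CarbonVectorProxy;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;

    public final class DictionaryIdsSketch {
      public static void main(String[] args) {
        StructField[] fields = {
            new StructField("country", DataTypes.StringType, true, Metadata.empty())
        };
        CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, 1024, fields);
        // One dictionary-id slot per row of the batch for column 0.
        Object dictionaryIds = proxy.reserveDictionaryIds(1024, 0);
        // ... the reader would fill the ids from the decoded dictionary page here ...
        // Clear the ids again before the next batch is decoded.
        proxy.resetDictionaryIds(0);
        proxy.close();
      }
    }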

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..783a528
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import java.math.BigInteger;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.ColumnVectorFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles columnar vector reading of carbondata on top of
+ * the spark ColumnVector and ColumnarBatch API. This proxy hides the spark 2.3
+ * specific API details from the integration layer, since the spark ColumnVector
+ * and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+    private ColumnarBatch columnarBatch;
+    private WritableColumnVector[] writableColumnVectors;
+
+    /**
+     * Builds the writable column vectors for the given schema via
+     * ColumnVectorFactory and wraps them in a spark 2.3 ColumnarBatch, so that
+     * the common classes of the carbon-spark integration layer do not depend on
+     * the spark 2.3 specific ColumnVector and ColumnarBatch API.
+     *
+     * @param memMode      whether the vectors are allocated on-heap or off-heap.
+     * @param rowNum       number of rows the batch is allocated for.
+     * @param structFields metadata describing the current table schema.
+     */
+    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFields) {
+        writableColumnVectors = ColumnVectorFactory
+                .getColumnVector(memMode, new StructType(structFields), rowNum);
+        columnarBatch = new ColumnarBatch(writableColumnVectors);
+        columnarBatch.setNumRows(rowNum);
+    }
+
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+        writableColumnVectors = ColumnVectorFactory
+                .getColumnVector(memMode, outputSchema, rowNum);
+        columnarBatch = new ColumnarBatch(writableColumnVectors);
+        columnarBatch.setNumRows(rowNum);
+    }
+
+    /**
+     * Returns the number of rows for read, including filtered rows.
+     */
+    public int numRows() {
+        return columnarBatch.numRows();
+    }
+
+    public Object reserveDictionaryIds(int capacity, int ordinal) {
+        return writableColumnVectors[ordinal].reserveDictionaryIds(capacity);
+    }
+
+    /**
+     * Returns the column vector at the given ordinal of this
+     * batch of column vectors.
+     *
+     * @param ordinal position of the column in the schema
+     * @return the spark ColumnVector backing that column
+     */
+    public ColumnVector column(int ordinal) {
+        return columnarBatch.column(ordinal);
+    }
+
+    /**
+     * Resets this column for writing. The currently stored values are no longer accessible.
+     */
+    public void reset() {
+        for (WritableColumnVector col : writableColumnVectors) {
+            col.reset();
+        }
+    }
+
+    public void resetDictionaryIds(int ordinal) {
+        writableColumnVectors[ordinal].getDictionaryIds().reset();
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public InternalRow getRow(int rowId) {
+        return columnarBatch.getRow(rowId);
+    }
+
+
+    /**
+     * Returns the underlying spark ColumnarBatch instance.
+     */
+    public Object getColumnarBatch() {
+        return columnarBatch;
+    }
+
+    /**
+     * Called to close all the columns in this batch. It is not valid to access the data after
+     * calling this. This must be called at the end to clean up memory allocations.
+     */
+    public void close() {
+        columnarBatch.close();
+    }
+
+    /**
+     * Sets the number of rows in this batch.
+     */
+    public void setNumRows(int numRows) {
+        columnarBatch.setNumRows(numRows);
+    }
+
+    /**
+     * Writes the given value into the column vector at the given offset,
+     * dispatching on the column's data type.
+     *
+     * @param rowId  row position within the batch
+     * @param value  value to write; null is routed to putNull
+     * @param offset ordinal of the target column
+     */
+    public void putRowToColumnBatch(int rowId, Object value, int offset) {
+            org.apache.spark.sql.types.DataType t = dataType(offset);
+            if (null == value) {
+                putNull(rowId, offset);
+            } else {
+                if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                    putBoolean(rowId, (boolean) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                    putByte(rowId, (byte) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                    putShort(rowId, (short) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                    putLong(rowId, (long) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                    putFloat(rowId, (float) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                    putDouble(rowId, (double) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                    UTF8String v = (UTF8String) value;
+                    putByteArray(rowId, v.getBytes(), offset);
+                } else if (t instanceof DecimalType) {
+                    DecimalType dt = (DecimalType) t;
+                    Decimal d = Decimal.fromDecimal(value);
+                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                        putInt(rowId, (int) d.toUnscaledLong(), offset);
+                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                        putLong(rowId, d.toUnscaledLong(), offset);
+                    } else {
+                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                        byte[] bytes = integer.toByteArray();
+                        putByteArray(rowId, bytes, 0, bytes.length, offset);
+                    }
+                } else if (t instanceof CalendarIntervalType) {
+                    CalendarInterval c = (CalendarInterval) value;
+                    writableColumnVectors[offset].getChild(0).putInt(rowId, c.months);
+                    writableColumnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
+                } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                    putLong(rowId, (long) value, offset);
+                }
+            }
+    }
+
+    public void putBoolean(int rowId, boolean value, int ordinal) {
+        writableColumnVectors[ordinal].putBoolean(rowId, (boolean) value);
+    }
+
+    public void putByte(int rowId, byte value, int ordinal) {
+        writableColumnVectors[ordinal].putByte(rowId, (byte) value);
+    }
+
+    public void putShort(int rowId, short value, int ordinal) {
+        writableColumnVectors[ordinal].putShort(rowId, (short) value);
+    }
+
+    public void putInt(int rowId, int value, int ordinal) {
+        writableColumnVectors[ordinal].putInt(rowId, (int) value);
+    }
+
+    public void putFloat(int rowId, float value, int ordinal) {
+        writableColumnVectors[ordinal].putFloat(rowId, (float) value);
+    }
+
+    public void putLong(int rowId, long value, int ordinal) {
+        writableColumnVectors[ordinal].putLong(rowId, (long) value);
+    }
+
+    public void putDouble(int rowId, double value, int ordinal) {
+        writableColumnVectors[ordinal].putDouble(rowId, (double) value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int ordinal) {
+        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value);
+    }
+
+    public void putInts(int rowId, int count, int value, int ordinal) {
+        writableColumnVectors[ordinal].putInts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short value, int ordinal) {
+        writableColumnVectors[ordinal].putShorts(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long value, int ordinal) {
+        writableColumnVectors[ordinal].putLongs(rowId, count, value);
+    }
+
+    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+        writableColumnVectors[ordinal].putDecimal(rowId, value, precision);
+    }
+
+    public void putDoubles(int rowId, int count, double value, int ordinal) {
+        writableColumnVectors[ordinal].putDoubles(rowId, count, value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
+    }
+
+    public void putNull(int rowId, int ordinal) {
+        writableColumnVectors[ordinal].putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count, int ordinal) {
+        writableColumnVectors[ordinal].putNulls(rowId, count);
+    }
+
+    public boolean isNullAt(int rowId, int ordinal) {
+        return writableColumnVectors[ordinal].isNullAt(rowId);
+    }
+
+    public DataType dataType(int ordinal) {
+        return writableColumnVectors[ordinal].dataType();
+    }
+}
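
On Spark 2.3 the proxy keeps the WritableColumnVector array itself, and getColumnarBatch() hands back the new vectorized ColumnarBatch, which version-specific callers cast. A minimal sketch follows; the schema and values are illustrative only and not part of this commit.

    // Sketch only: the Spark 2.3 shape of the proxy, consumed through rowIterator().
    import java.util.Iterator;

    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.sql.CarbonVectorProxy;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public final class VectorProxy23Sketch {
      public static void main(String[] args) {
        StructField[] fields = {
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
        };
        CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, 8, fields);
        proxy.reset();
        proxy.putInt(0, 42, 0);
        proxy.putNull(1, 0);
        proxy.setNumRows(2);
        // getColumnarBatch() is typed as Object so the common layer stays version
        // agnostic; version-specific callers cast it back to the 2.3 ColumnarBatch.
        ColumnarBatch batch = (ColumnarBatch) proxy.getColumnarBatch();
        Iterator<InternalRow> rows = batch.rowIterator();
        while (rows.hasNext()) {
          InternalRow row = rows.next();
          System.out.println(row.isNullAt(0) ? "null" : row.getInt(0));
        }
        proxy.close();
      }
    }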

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
new file mode 100644
index 0000000..6fe5ede
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.StructType;
+
+public class ColumnVectorFactory {
+
+    public static WritableColumnVector[] getColumnVector(MemoryMode memMode,
+            StructType outputSchema, int rowNums) {
+
+        WritableColumnVector[] writableColumnVectors = null;
+        switch (memMode) {
+            case ON_HEAP:
+                writableColumnVectors = OnHeapColumnVector
+                        .allocateColumns(rowNums, outputSchema);
+                break;
+            case OFF_HEAP:
+                writableColumnVectors = OffHeapColumnVector
+                        .allocateColumns(rowNums, outputSchema);
+                break;
+        }
+        return writableColumnVectors;
+    }
+}
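
ColumnVectorFactory centralises the on-heap/off-heap allocation choice for the 2.3 profile. Below is a small sketch of calling it directly (normally only CarbonVectorProxy does this); the schema and row count are illustrative assumptions.

    // Sketch only: allocate writable vectors and wrap them in a 2.3 ColumnarBatch.
    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.sql.ColumnVectorFactory;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public final class ColumnVectorFactorySketch {
      public static void main(String[] args) {
        StructType schema = new StructType()
            .add("id", DataTypes.IntegerType)
            .add("name", DataTypes.StringType);
        WritableColumnVector[] vectors =
            ColumnVectorFactory.getColumnVector(MemoryMode.ON_HEAP, schema, 1024);
        ColumnarBatch batch = new ColumnarBatch(vectors);
        batch.setNumRows(0);
        // ... vectors[i].putInt(...) etc. would be driven by the record reader ...
        batch.close();
      }
    }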

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcef656d/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index 644ac4c..c0e270c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.streaming;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -33,7 +34,10 @@ import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
 import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
 import org.apache.carbondata.core.scan.complextypes.StructQueryType;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+import org.apache.carbondata.streaming.CarbonStreamUtils;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -47,10 +51,46 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
 
   public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+  public static final String STREAM_RECORD_READER_INSTANCE =
+      "org.apache.carbondata.stream.CarbonStreamRecordReader";
+  // return raw row for handoff
+  private boolean useRawRow = false;
 
-  @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CarbonStreamRecordReader();
+  public void setUseRawRow(boolean useRawRow) {
+    this.useRawRow = useRawRow;
+  }
+
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  public void setVectorReader(boolean vectorReader) {
+    isVectorReader = vectorReader;
+  }
+
+  public void setModel(QueryModel model) {
+    this.model = model;
+  }
+
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+  // vector reader
+  private boolean isVectorReader;
+  private QueryModel model;
+
+  @Override
+  public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    try {
+      Constructor cons = CarbonStreamUtils
+          .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
+              InputMetricsStats.class, QueryModel.class, boolean.class);
+      return (RecordReader) CarbonStreamUtils
+          .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,


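createRecordReader now loads org.apache.carbondata.stream.CarbonStreamRecordReader reflectively through CarbonStreamUtils, so the streaming module no longer needs a compile-time dependency on the spark-specific reader. The body of CarbonStreamUtils is not shown in this hunk; the following is only a plausible minimal shape of the two helpers used above, written as an illustration rather than the committed implementation.

    // Hedged sketch: approximates the two reflection helpers invoked from
    // createRecordReader; the real CarbonStreamUtils added by this commit may differ.
    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;

    public final class StreamUtilsSketch {

      public static Constructor getConstructorWithReflection(String className,
          Class<?>... parameterTypes) throws ClassNotFoundException, NoSuchMethodException {
        // Load the version-specific reader class lazily so the streaming module does
        // not need a compile-time dependency on the spark integration module.
        Class<?> loadedClass = Class.forName(className);
        return loadedClass.getDeclaredConstructor(parameterTypes);
      }

      public static Object getInstanceWithReflection(Constructor cons, Object... initArgs)
          throws IllegalAccessException, InstantiationException, InvocationTargetException {
        cons.setAccessible(true);
        return cons.newInstance(initArgs);
      }
    }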