carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/2] carbondata git commit: Revert "[CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface"
Date Wed, 18 Jul 2018 01:22:38 GMT
Revert "[CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface"

This reverts commit 2b8ae2628d50efcd095696b5bf614eab2fcdb8d2.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/96fe233a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/96fe233a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/96fe233a

Branch: refs/heads/carbonstore
Commit: 96fe233a20c5e8df2584a79671c3257f119f9414
Parents: 0aab4e7
Author: Jacky Li <jacky.likun@qq.com>
Authored: Wed Jul 18 09:16:27 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Jul 18 09:16:27 2018 +0800

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  12 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  15 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  60 +-
 .../VectorizedCarbonRecordReader.java           |  34 +-
 .../stream/CarbonStreamRecordReader.java        | 747 ------------------
 .../org/apache/spark/sql/CarbonVectorProxy.java | 222 ------
 .../org/apache/spark/sql/CarbonVectorProxy.java | 221 ------
 .../org/apache/spark/sql/CarbonVectorProxy.java | 247 ------
 .../apache/spark/sql/ColumnVectorFactory.java   |  45 --
 .../streaming/CarbonStreamInputFormat.java      |  46 +-
 .../streaming/CarbonStreamRecordReader.java     | 758 +++++++++++++++++++
 .../carbondata/streaming/CarbonStreamUtils.java |  40 -
 .../streaming/StreamBlockletReader.java         |  39 +-
 13 files changed, 838 insertions(+), 1648 deletions(-)
----------------------------------------------------------------------
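
In functional terms, this revert moves reader configuration back from CarbonStreamInputFormat onto the CarbonStreamRecordReader instance and drops the per-Spark-version CarbonVectorProxy adapters in favour of Spark 2.1/2.2's ColumnVector and ColumnarBatch APIs used directly. A minimal Java sketch of the restored call pattern, following the CarbonScanRDD and StreamHandoffRDD hunks below (inputSplit, attemptContext, vectorReader, inputMetricsStats and model are assumed to exist in the calling code):

    // Sketch only: after the revert the options live on the record reader itself;
    // CarbonStreamInputFormat no longer exposes setVectorReader/setModel.
    CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
    CarbonStreamRecordReader streamReader =
        (CarbonStreamRecordReader) inputFormat.createRecordReader(inputSplit, attemptContext);
    streamReader.setVectorReader(vectorReader);
    streamReader.setInputMetricsStats(inputMetricsStats);
    streamReader.setQueryModel(model);
    streamReader.initialize(inputSplit, attemptContext);   // as done in the StreamHandoffRDD hunk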


http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index afd3af2..149f711 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -60,7 +60,7 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.format.{CsvReadSupport, VectorCsvReadSupport}
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
+import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -431,13 +431,13 @@ class CarbonScanRDD[T: ClassTag](
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
           val inputFormat = new CarbonStreamInputFormat
-          inputFormat.setVectorReader(vectorReader)
-          inputFormat.setInputMetricsStats(inputMetricsStats)
+          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+            .asInstanceOf[CarbonStreamRecordReader]
+          streamReader.setVectorReader(vectorReader)
+          streamReader.setInputMetricsStats(inputMetricsStats)
           model.setStatisticsRecorder(
             CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
-          inputFormat.setModel(model)
-          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-            .asInstanceOf[RecordReader[Void, Object]]
+          streamReader.setQueryModel(model)
           streamReader
         case FileFormat.EXTERNAL =>
           require(storageFormat.equals("csv"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 0e8f660..1f3decc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.sql.SparkSession
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -49,7 +48,7 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl}
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
+import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 
 
 /**
@@ -75,7 +74,7 @@ class HandoffPartition(
  * and we can extract it later
  */
 class StreamingRawResultIterator(
-    recordReader: RecordReader[Void, Any]
+    recordReader: CarbonStreamRecordReader
 ) extends RawResultIterator(null, null, null, true) {
 
   override def hasNext: Boolean = {
@@ -164,10 +163,10 @@ class StreamHandoffRDD[K, V](
     val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
     val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-      .asInstanceOf[RecordReader[Void, Any]]
-    inputFormat.setVectorReader(false)
-    inputFormat.setModel(model)
-    inputFormat.setUseRawRow(true)
+      .asInstanceOf[CarbonStreamRecordReader]
+    streamReader.setVectorReader(false)
+    streamReader.setQueryModel(model)
+    streamReader.setUseRawRow(true)
     streamReader.initialize(inputSplit, attemptContext)
     val iteratorList = new util.ArrayList[RawResultIterator](1)
     iteratorList.add(new StreamingRawResultIterator(streamReader))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 09df68c..9e0c102 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -23,45 +23,41 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
-import org.apache.spark.sql.CarbonVectorProxy;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
-  private CarbonVectorProxy writableColumnVector;
+  private ColumnVector columnVector;
 
   private boolean[] filteredRows;
 
   private int counter;
 
-  private int ordinal;
-
   private boolean filteredRowsExist;
 
   private DataType blockDataType;
 
-  ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
-                        boolean[] filteredRows, int ordinal) {
-    this.writableColumnVector = writableColumnVector;
+  ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
+    this.columnVector = columnVector;
     this.filteredRows = filteredRows;
-    this.ordinal = ordinal;
   }
 
   @Override public void putBoolean(int rowId, boolean value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putBoolean(counter++, value, ordinal);
+      columnVector.putBoolean(counter++, value);
     }
   }
 
   @Override public void putFloat(int rowId, float value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putFloat(counter++, value,ordinal);
+      columnVector.putFloat(counter++, value);
     }
   }
 
   @Override public void putShort(int rowId, short value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putShort(counter++, value, ordinal);
+      columnVector.putShort(counter++, value);
     }
   }
 
@@ -69,18 +65,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putShort(counter++, value, ordinal);
+          columnVector.putShort(counter++, value);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putShorts(rowId, count, value, ordinal);
+      columnVector.putShorts(rowId, count, value);
     }
   }
 
   @Override public void putInt(int rowId, int value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putInt(counter++, value, ordinal);
+      columnVector.putInt(counter++, value);
     }
   }
 
@@ -88,18 +84,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putInt(counter++, value, ordinal);
+          columnVector.putInt(counter++, value);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putInts(rowId, count, value, ordinal);
+      columnVector.putInts(rowId, count, value);
     }
   }
 
   @Override public void putLong(int rowId, long value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putLong(counter++, value, ordinal);
+      columnVector.putLong(counter++, value);
     }
   }
 
@@ -107,19 +103,19 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putLong(counter++, value, ordinal);
+          columnVector.putLong(counter++, value);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putLongs(rowId, count, value, ordinal);
+      columnVector.putLongs(rowId, count, value);
     }
   }
 
   @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
     if (!filteredRows[rowId]) {
       Decimal toDecimal = Decimal.apply(value);
-      writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal);
+      columnVector.putDecimal(counter++, toDecimal, precision);
     }
   }
 
@@ -127,7 +123,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     Decimal decimal = Decimal.apply(value);
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        writableColumnVector.putDecimal(counter++, decimal, precision, ordinal);
+        columnVector.putDecimal(counter++, decimal, precision);
       }
       rowId++;
     }
@@ -135,7 +131,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putDouble(int rowId, double value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putDouble(counter++, value, ordinal);
+      columnVector.putDouble(counter++, value);
     }
   }
 
@@ -143,25 +139,25 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putDouble(counter++, value, ordinal);
+          columnVector.putDouble(counter++, value);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putDoubles(rowId, count, value, ordinal);
+      columnVector.putDoubles(rowId, count, value);
     }
   }
 
   @Override public void putBytes(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putByteArray(counter++, value, ordinal);
+      columnVector.putByteArray(counter++, value);
     }
   }
 
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        writableColumnVector.putByteArray(counter++, value, ordinal);
+        columnVector.putByteArray(counter++, value);
       }
       rowId++;
     }
@@ -169,13 +165,13 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putByteArray(counter++, value, offset, length, ordinal);
+      columnVector.putByteArray(counter++, value, offset, length);
     }
   }
 
   @Override public void putNull(int rowId) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putNull(counter++, ordinal);
+      columnVector.putNull(counter++);
     }
   }
 
@@ -183,17 +179,17 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putNull(counter++, ordinal);
+          columnVector.putNull(counter++);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putNulls(rowId, count,ordinal);
+      columnVector.putNulls(rowId, count);
     }
   }
 
   @Override public boolean isNull(int rowId) {
-    return writableColumnVector.isNullAt(rowId,ordinal);
+    return columnVector.isNullAt(rowId);
   }
 
   @Override public void putObject(int rowId, Object obj) {
@@ -211,7 +207,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public DataType getType() {
-    return CarbonScalaUtil.convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal));
+    return CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
   }
 
   @Override

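The wrapper restored above writes through Spark's writable ColumnVector directly and compacts filtered rows out of the batch: values for surviving rows land at an internal counter rather than at the incoming rowId. A reduced sketch of that pattern, keeping only the int path (field names follow the class above; the bulk putInts variant falls back to per-row writes only when some row was filtered):

    // Filtered-row compaction as used by ColumnarVectorWrapper: rows flagged in
    // filteredRows are skipped and survivors are packed densely at 0..counter-1.
    void putInt(int rowId, int value) {
      if (!filteredRows[rowId]) {
        columnVector.putInt(counter++, value);
      }
    }
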
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 1e7389a..082ef8b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -52,9 +52,10 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.CarbonVectorProxy;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -67,11 +68,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private int batchIdx = 0;
 
-  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
-
   private int numBatched = 0;
 
-  private CarbonVectorProxy vectorProxy;
+  private ColumnarBatch columnarBatch;
 
   private CarbonColumnarBatch carbonColumnarBatch;
 
@@ -155,9 +154,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   @Override
   public void close() throws IOException {
     logStatistics(rowCount, queryModel.getStatisticsRecorder());
-    if (vectorProxy != null) {
-      vectorProxy.close();
-      vectorProxy = null;
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
     }
     // clear dictionary cache
     Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
@@ -191,15 +190,15 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   @Override
   public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) {
-      int value = vectorProxy.numRows();
+      int value = columnarBatch.numValidRows();
       rowCount += value;
       if (inputMetricsStats != null) {
         inputMetricsStats.incrementRecordRead((long) value);
       }
-      return vectorProxy.getColumnarBatch();
+      return columnarBatch;
     }
     rowCount += 1;
-    return vectorProxy.getRow(batchIdx - 1);
+    return columnarBatch.getRow(batchIdx - 1);
   }
 
   @Override
@@ -261,13 +260,14 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
             CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
       }
     }
-    vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields);
+
+    columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
-    boolean[] filteredRows = new boolean[vectorProxy.numRows()];
+    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
+      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
     }
-    carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }
 
   private void initBatch() {
@@ -275,7 +275,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   }
 
   private void resultBatch() {
-    if (vectorProxy == null) initBatch();
+    if (columnarBatch == null) initBatch();
   }
 
 
@@ -284,12 +284,12 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   private boolean nextBatch() {
-    vectorProxy.reset();
+    columnarBatch.reset();
     carbonColumnarBatch.reset();
     if (iterator.hasNext()) {
       iterator.processNextBatch(carbonColumnarBatch);
       int actualSize = carbonColumnarBatch.getActualSize();
-      vectorProxy.setNumRows(actualSize);
+      columnarBatch.setNumRows(actualSize);
       numBatched = actualSize;
       batchIdx = 0;
       return true;

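Taken together, these hunks restore the plain Spark 2.1/2.2 ColumnarBatch lifecycle in VectorizedCarbonRecordReader: allocate once from the projected schema, reset before each batch, publish the row count with setNumRows, and close when the reader closes. A hedged sketch of that lifecycle (moreBatches() and fillBatch() are hypothetical placeholders, not CarbonData or Spark APIs):

    // Sketch of the batch lifecycle restored by this revert; assumes Spark 2.1/2.2,
    // where ColumnarBatch.allocate(...) is still part of execution.vectorized.
    ColumnarBatch batch = ColumnarBatch.allocate(new StructType(fields), MemoryMode.OFF_HEAP);
    while (moreBatches()) {          // hypothetical: more blocklets to scan
      batch.reset();                 // discard values written for the previous batch
      int rows = fillBatch(batch);   // hypothetical helper that writes the columns
      batch.setNumRows(rows);        // publish how many rows are valid
      // consumers read the whole batch, or row-wise via batch.getRow(i)
    }
    batch.close();                   // release the allocated column buffers
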
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
deleted file mode 100644
index 3e97eb8..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.stream;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.intf.RowImpl;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
-import org.apache.carbondata.streaming.StreamBlockletReader;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Stream record reader
- */
-public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
-  // vector reader
-  private boolean isVectorReader;
-
-  // metadata
-  private CarbonTable carbonTable;
-  private CarbonColumn[] storageColumns;
-  private boolean[] isRequired;
-  private DataType[] measureDataTypes;
-  private int dimensionCount;
-  private int measureCount;
-
-  // input
-  private FileSplit fileSplit;
-  private Configuration hadoopConf;
-  private StreamBlockletReader input;
-  private boolean isFirstRow = true;
-  private QueryModel model;
-
-  // decode data
-  private BitSet allNonNull;
-  private boolean[] isNoDictColumn;
-  private DirectDictionaryGenerator[] directDictionaryGenerators;
-  private CacheProvider cacheProvider;
-  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
-  private GenericQueryType[] queryTypes;
-
-  // vectorized reader
-  private StructType outputSchema;
-  private Object vectorProxy;
-  private boolean isFinished = false;
-
-  // filter
-  private FilterExecuter filter;
-  private boolean[] isFilterRequired;
-  private Object[] filterValues;
-  private RowIntf filterRow;
-  private int[] filterMap;
-
-  // output
-  private CarbonColumn[] projection;
-  private boolean[] isProjectionRequired;
-  private int[] projectionMap;
-  private Object[] outputValues;
-  private InternalRow outputRow;
-
-  // empty project, null filter
-  private boolean skipScanData;
-
-  // return raw row for handoff
-  private boolean useRawRow = false;
-
-  // InputMetricsStats
-  private InputMetricsStats inputMetricsStats;
-
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(CarbonStreamRecordReader.class.getName());
-
-  public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
-      QueryModel mdl, boolean useRawRow) {
-    this.isVectorReader = isVectorReader;
-    this.inputMetricsStats = inputMetricsStats;
-    this.model = mdl;
-    this.useRawRow = useRawRow;
-
-  }
-  @Override public void initialize(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    // input
-    if (split instanceof CarbonInputSplit) {
-      fileSplit = (CarbonInputSplit) split;
-    } else if (split instanceof CarbonMultiBlockSplit) {
-      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
-    } else {
-      fileSplit = (FileSplit) split;
-    }
-
-    // metadata
-    hadoopConf = context.getConfiguration();
-    if (model == null) {
-      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-      model = format.createQueryModel(split, context);
-    }
-    carbonTable = model.getTable();
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    dimensionCount = dimensions.size();
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    measureCount = measures.size();
-    List<CarbonColumn> carbonColumnList =
-        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
-    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
-    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
-    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
-      }
-    }
-    measureDataTypes = new DataType[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
-    }
-
-    // decode data
-    allNonNull = new BitSet(storageColumns.length);
-    projection = model.getProjectionColumns();
-
-    isRequired = new boolean[storageColumns.length];
-    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
-    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
-    isFilterRequired = new boolean[storageColumns.length];
-    filterMap = new int[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].isDimension()) {
-        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = storageColumns[i].getOrdinal();
-        }
-      } else {
-        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
-        }
-      }
-    }
-
-    isProjectionRequired = new boolean[storageColumns.length];
-    projectionMap = new int[storageColumns.length];
-    for (int j = 0; j < projection.length; j++) {
-      for (int i = 0; i < storageColumns.length; i++) {
-        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
-          isRequired[i] = true;
-          isProjectionRequired[i] = true;
-          projectionMap[i] = j;
-          break;
-        }
-      }
-    }
-
-    // initialize filter
-    if (null != model.getFilterExpressionResolverTree()) {
-      initializeFilter();
-    } else if (projection.length == 0) {
-      skipScanData = true;
-    }
-
-  }
-
-  private void initializeFilter() {
-
-    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
-    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
-    for (int i = 0; i < dimLensWithComplex.length; i++) {
-      dimLensWithComplex[i] = Integer.MAX_VALUE;
-    }
-
-    int[] dictionaryColumnCardinality =
-        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
-    SegmentProperties segmentProperties =
-        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
-    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
-
-    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
-    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
-        complexDimensionInfoMap);
-    // for row filter, we need update column index
-    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
-        carbonTable.getDimensionOrdinalMax());
-
-  }
-
-  private byte[] getSyncMarker(String filePath) throws IOException {
-    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
-    FileHeader header = headerReader.readHeader();
-    return header.getSync_marker();
-  }
-
-  private void initializeAtFirstRow() throws IOException {
-    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
-    filterRow = new RowImpl();
-    filterRow.setValues(filterValues);
-
-    outputValues = new Object[projection.length];
-    outputRow = new GenericInternalRow(outputValues);
-
-    Path file = fileSplit.getPath();
-
-    byte[] syncMarker = getSyncMarker(file.toString());
-
-    FileSystem fs = file.getFileSystem(hadoopConf);
-
-    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
-        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
-
-    FSDataInputStream fileIn = fs.open(file, bufferSize);
-    fileIn.seek(fileSplit.getStart());
-    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
-        fileSplit.getStart() == 0);
-
-    cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
-    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
-
-    outputSchema = new StructType((StructField[])
-        DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
-  }
-
-  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (isFirstRow) {
-      isFirstRow = false;
-      initializeAtFirstRow();
-    }
-    if (isFinished) {
-      return false;
-    }
-
-    if (isVectorReader) {
-      return nextColumnarBatch();
-    }
-
-    return nextRow();
-  }
-
-  /**
-   * for vector reader, check next columnar batch
-   */
-  private boolean nextColumnarBatch() throws IOException {
-    boolean hasNext;
-    boolean scanMore = false;
-    do {
-      // move to the next blocklet
-      hasNext = input.nextBlocklet();
-      if (hasNext) {
-        // read blocklet header
-        BlockletHeader header = input.readBlockletHeader();
-        if (isScanRequired(header)) {
-          scanMore = !scanBlockletAndFillVector(header);
-        } else {
-          input.skipBlockletData(true);
-          scanMore = true;
-        }
-      } else {
-        isFinished = true;
-        scanMore = false;
-      }
-    } while (scanMore);
-    return hasNext;
-  }
-
-  /**
-   * check next Row
-   */
-  private boolean nextRow() throws IOException {
-    // read row one by one
-    try {
-      boolean hasNext;
-      boolean scanMore = false;
-      do {
-        hasNext = input.hasNext();
-        if (hasNext) {
-          if (skipScanData) {
-            input.nextRow();
-            scanMore = false;
-          } else {
-            if (useRawRow) {
-              // read raw row for streaming handoff which does not require decode raw row
-              readRawRowFromStream();
-            } else {
-              readRowFromStream();
-            }
-            if (null != filter) {
-              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
-            } else {
-              scanMore = false;
-            }
-          }
-        } else {
-          if (input.nextBlocklet()) {
-            BlockletHeader header = input.readBlockletHeader();
-            if (isScanRequired(header)) {
-              if (skipScanData) {
-                input.skipBlockletData(false);
-              } else {
-                input.readBlockletData(header);
-              }
-            } else {
-              input.skipBlockletData(true);
-            }
-            scanMore = true;
-          } else {
-            isFinished = true;
-            scanMore = false;
-          }
-        }
-      } while (scanMore);
-      return hasNext;
-    } catch (FilterUnsupportedException e) {
-      throw new IOException("Failed to filter row in detail reader", e);
-    }
-  }
-
-  @Override public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  @Override public Object getCurrentValue() throws IOException, InterruptedException {
-    if (isVectorReader) {
-      Method method = null;
-      try {
-        method = vectorProxy.getClass().getMethod("numRows");
-        int value = (int) method.invoke(vectorProxy);
-        if (inputMetricsStats != null) {
-          inputMetricsStats.incrementRecordRead((long) value);
-        }
-        method = vectorProxy.getClass().getMethod("getColumnarBatch");
-        return method.invoke(vectorProxy);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-
-    if (inputMetricsStats != null) {
-      inputMetricsStats.incrementRecordRead(1L);
-    }
-
-    return outputRow;
-  }
-
-  private boolean isScanRequired(BlockletHeader header) {
-    // TODO require to implement min-max index
-    return true;
-  }
-
-  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
-    Constructor cons = null;
-    // if filter is null and output projection is empty, use the row number of blocklet header
-    int rowNum = 0;
-    String methodName = "setNumRows";
-    try {
-      String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy";
-      cons = CarbonStreamUtils.getConstructorWithReflection(vectorReaderClassName, MemoryMode.class,
-              StructType.class, int.class);
-      if (skipScanData) {
-
-        int rowNums = header.getBlocklet_info().getNum_rows();
-        vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, rowNums);
-        Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
-        setNumRowsMethod.invoke(vectorProxy, rowNums);
-        input.skipBlockletData(true);
-        return rowNums > 0;
-      }
-      input.readBlockletData(header);
-      vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP,outputSchema, input.getRowNums());
-      if (null == filter) {
-        while (input.hasNext()) {
-          readRowFromStream();
-          putRowToColumnBatch(rowNum++);
-        }
-      } else {
-        try {
-          while (input.hasNext()) {
-            readRowFromStream();
-            if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
-              putRowToColumnBatch(rowNum++);
-            }
-          }
-        } catch (FilterUnsupportedException e) {
-          throw new IOException("Failed to filter row in vector reader", e);
-        }
-      }
-      Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
-      setNumRowsMethod.invoke(vectorProxy, rowNum);
-    } catch (Exception e) {
-      throw new IOException("Failed to fill row in  vector reader", e);
-    }
-    return rowNum > 0;
-  }
-
-  private void readRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          if (isRequired[colCount]) {
-            byte[] b = input.readBytes(v);
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = b;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
-                      storageColumns[colCount].getDataType());
-            }
-          } else {
-            input.skipBytes(v);
-          }
-        } else if (null != directDictionaryGenerators[colCount]) {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = input.readInt();
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        short v = input.readShort();
-        if (isRequired[colCount]) {
-          byte[] b = input.readBytes(v);
-          if (isFilterRequired[colCount]) {
-            filterValues[filterMap[colCount]] = b;
-          }
-          if (isProjectionRequired[colCount]) {
-            outputValues[projectionMap[colCount]] = queryTypes[colCount]
-                .getDataBasedOnDataType(ByteBuffer.wrap(b));
-          }
-        } else {
-          input.skipBytes(v);
-        }
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          if (isRequired[colCount]) {
-            boolean v = input.readBoolean();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(1);
-          }
-        } else if (dataType == DataTypes.SHORT) {
-          if (isRequired[colCount]) {
-            short v = input.readShort();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(2);
-          }
-        } else if (dataType == DataTypes.INT) {
-          if (isRequired[colCount]) {
-            int v = input.readInt();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else if (dataType == DataTypes.LONG) {
-          if (isRequired[colCount]) {
-            long v = input.readLong();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (dataType == DataTypes.DOUBLE) {
-          if (isRequired[colCount]) {
-            double v = input.readDouble();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          if (isRequired[colCount]) {
-            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
-            }
-          } else {
-            input.skipBytes(len);
-          }
-        }
-      }
-    }
-  }
-
-  private void readRawRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          outputValues[colCount] = input.readBytes(v);
-        } else {
-          outputValues[colCount] = input.readInt();
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        short v = input.readShort();
-        outputValues[colCount] = input.readBytes(v);
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          outputValues[colCount] = input.readBoolean();
-        } else if (dataType == DataTypes.SHORT) {
-          outputValues[colCount] = input.readShort();
-        } else if (dataType == DataTypes.INT) {
-          outputValues[colCount] = input.readInt();
-        } else if (dataType == DataTypes.LONG) {
-          outputValues[colCount] = input.readLong();
-        } else if (dataType == DataTypes.DOUBLE) {
-          outputValues[colCount] = input.readDouble();
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-        }
-      }
-    }
-  }
-
-  private void putRowToColumnBatch(int rowId) {
-    Class<?>[] paramTypes = {int.class, Object.class, int.class};
-    Method putRowToColumnBatch = null;
-    try {
-      putRowToColumnBatch = vectorProxy.getClass().getMethod("putRowToColumnBatch", paramTypes);
-
-    } catch (Exception e) {
-      LOGGER.error(
-              "Unable to put the row in the vector" + "rowid: " + rowId + e);
-    }
-    for (int i = 0; i < projection.length; i++) {
-      Object value = outputValues[i];
-      try {
-        putRowToColumnBatch.invoke(vectorProxy, rowId, value, i);
-      } catch (Exception e) {
-        LOGGER.error(
-                "Unable to put the row in the vector" + "rowid: " + rowId + e);
-      }
-    }
-  }
-
-  @Override public float getProgress() throws IOException, InterruptedException {
-    return 0;
-  }
-
-  @Override public void close() throws IOException {
-    if (null != input) {
-      input.close();
-    }
-    if (null != vectorProxy) {
-      try {
-        Method closeMethod = vectorProxy.getClass().getMethod("close");
-        closeMethod.invoke(vectorProxy);
-      } catch (Exception e) {
-        LOGGER.error(
-                "Unable to close the stream vector reader" + e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 1e30466..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.1 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.capacity();
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        columnarBatch.reset();
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
-                }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
-            }
-        }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnarBatch.column(ordinal).isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return columnarBatch.column(ordinal).dataType();
-    }
-}
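The decimal branch of putRowToColumnBatch above packs the unscaled value into an int or long column when the declared precision fits, and falls back to the raw unscaled bytes otherwise. Below is a minimal, self-contained Java sketch of that rule; the 9/18-digit thresholds are assumed to match Spark's Decimal.MAX_INT_DIGITS() and Decimal.MAX_LONG_DIGITS(), and the class name is hypothetical.

import java.math.BigDecimal;
import java.math.BigInteger;

// Standalone illustration of the precision-based decimal encoding used by
// putRowToColumnBatch: the unscaled value goes into an int or long column when
// the declared precision is small enough, otherwise into a byte array.
// The thresholds 9 and 18 are assumed to equal Spark's Decimal.MAX_INT_DIGITS()
// and Decimal.MAX_LONG_DIGITS().
public final class DecimalEncodingSketch {

    static Object encodeUnscaled(BigDecimal value, int precision) {
        BigInteger unscaled = value.unscaledValue();
        if (precision <= 9) {
            return unscaled.intValueExact();   // proxy path: putInt
        } else if (precision <= 18) {
            return unscaled.longValueExact();  // proxy path: putLong
        } else {
            return unscaled.toByteArray();     // proxy path: putByteArray
        }
    }

    public static void main(String[] args) {
        System.out.println(encodeUnscaled(new BigDecimal("12345.67"), 7));          // 1234567 (int)
        System.out.println(encodeUnscaled(new BigDecimal("123456789012.345"), 15)); // 123456789012345 (long)
        System.out.println(encodeUnscaled(new BigDecimal("1234567890123456789012.3"), 23)); // unscaled bytes
    }
}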

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7c1bb16..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.2 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.capacity();
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        columnarBatch.reset();
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
-                }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
-            }
-        }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnarBatch.column(ordinal).isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return columnarBatch.column(ordinal).dataType();
-    }
-}
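For reference, a hypothetical caller of the removed proxy (both the spark 2.2 and 2.3 variants expose the same methods): allocate a batch for a one-column schema, write values through the put* API, then read them back via the reused row. The class name and schema are illustrative only, and the snippet assumes the deleted CarbonVectorProxy is still on the classpath.

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical caller of the removed proxy: one nullable int column, three rows.
public final class VectorProxyUsageSketch {
    public static void main(String[] args) {
        StructType schema = new StructType(new StructField[] {
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
        });
        CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 3);
        proxy.reset();                     // clear any previously written values
        proxy.putInt(0, 42, 0);            // rowId, value, ordinal
        proxy.putNull(1, 0);
        proxy.putInt(2, 7, 0);
        proxy.setNumRows(3);
        System.out.println(proxy.getRow(0).getInt(0));  // 42 (row object is reused across calls)
        System.out.println(proxy.isNullAt(1, 0));       // true
        proxy.close();                     // releases the batch's memory
    }
}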

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 245c575..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.ColumnVectorFactory;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-    private WritableColumnVector[] writableColumnVectors;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        writableColumnVectors = ColumnVectorFactory
-                .getColumnVector(memMode, new StructType(structFileds), rowNum);
-        columnarBatch = new ColumnarBatch(writableColumnVectors);
-        columnarBatch.setNumRows(rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        writableColumnVectors = ColumnVectorFactory
-                .getColumnVector(memMode, outputSchema, rowNum);
-        columnarBatch = new ColumnarBatch(writableColumnVectors);
-        columnarBatch.setNumRows(rowNum);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.numRows();
-    }
-
-    /**
-     * This API will return a columnvector from a batch of column vector rows
-     * based on the ordinal
-     *
-     * @param ordinal
-     * @return
-     */
-    public ColumnVector column(int ordinal) {
-        return columnarBatch.column(ordinal);
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        for (WritableColumnVector col : writableColumnVectors) {
-            col.reset();
-        }
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * This method will add the row to the corresponding column vector object
-     *
-     * @param rowId
-     * @param value
-     */
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-            org.apache.spark.sql.types.DataType t = dataType(offset);
-            if (null == value) {
-                putNull(rowId, offset);
-            } else {
-                if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                    putBoolean(rowId, (boolean) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                    putByte(rowId, (byte) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                    putShort(rowId, (short) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                    putInt(rowId, (int) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                    putLong(rowId, (long) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                    putFloat(rowId, (float) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                    putDouble(rowId, (double) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                    UTF8String v = (UTF8String) value;
-                    putByteArray(rowId, v.getBytes(), offset);
-                } else if (t instanceof DecimalType) {
-                    DecimalType dt = (DecimalType) t;
-                    Decimal d = Decimal.fromDecimal(value);
-                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                        putInt(rowId, (int) d.toUnscaledLong(), offset);
-                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                        putLong(rowId, d.toUnscaledLong(), offset);
-                    } else {
-                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                        byte[] bytes = integer.toByteArray();
-                        putByteArray(rowId, bytes, 0, bytes.length, offset);
-                    }
-                } else if (t instanceof CalendarIntervalType) {
-                    CalendarInterval c = (CalendarInterval) value;
-                    writableColumnVectors[offset].getChild(0).putInt(rowId, c.months);
-                    writableColumnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
-                } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                    putInt(rowId, (int) value, offset);
-                } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                    putLong(rowId, (long) value, offset);
-                }
-            }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        writableColumnVectors[ordinal].putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        writableColumnVectors[ordinal].putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        writableColumnVectors[ordinal].putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        writableColumnVectors[ordinal].putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        writableColumnVectors[ordinal].putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        writableColumnVectors[ordinal].putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        writableColumnVectors[ordinal].putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        writableColumnVectors[ordinal].putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        writableColumnVectors[ordinal].putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        writableColumnVectors[ordinal].putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        writableColumnVectors[ordinal].putDecimal(rowId, value, precision);
-
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        writableColumnVectors[ordinal].putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        writableColumnVectors[ordinal].putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        writableColumnVectors[ordinal].putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return writableColumnVectors[ordinal].isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return writableColumnVectors[ordinal].dataType();
-    }
-}
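The spark 2.3 variant above keeps a separate WritableColumnVector[] because, unlike in spark 2.2, the vectors returned by ColumnarBatch.column() are read-only; writes have to go to the writable vectors that back the batch. A minimal sketch of that underlying pattern using the stock Spark 2.3 vectorized classes (class name hypothetical, no CarbonData code involved):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// The spark 2.3 pattern the proxy wraps: writes target the WritableColumnVector
// instances; the ColumnarBatch built on top of them is only a read view.
public final class Spark23BatchSketch {
    public static void main(String[] args) {
        StructType schema = new StructType().add("id", DataTypes.IntegerType);
        OnHeapColumnVector[] vectors = OnHeapColumnVector.allocateColumns(4, schema);
        vectors[0].putInt(0, 1);
        vectors[0].putInt(1, 2);
        vectors[0].putNull(2);
        vectors[0].putInt(3, 4);
        ColumnarBatch batch = new ColumnarBatch(vectors);
        batch.setNumRows(4);
        System.out.println(batch.getRow(3).getInt(0));  // 4
        batch.close();                                  // frees the column vectors
    }
}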

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
deleted file mode 100644
index 6fe5ede..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-public class ColumnVectorFactory {
-
-
-    public static WritableColumnVector[] getColumnVector(MemoryMode memMode, StructType outputSchema, int rowNums) {
-
-
-        WritableColumnVector[] writableColumnVectors = null;
-        switch (memMode) {
-            case ON_HEAP:
-                writableColumnVectors = OnHeapColumnVector
-                        .allocateColumns(rowNums, outputSchema);
-                break;
-            case OFF_HEAP:
-                writableColumnVectors = OffHeapColumnVector
-                        .allocateColumns(rowNums, outputSchema);
-                break;
-        }
-        return writableColumnVectors;
-    }
-}
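The removed factory's only job was to map the MemoryMode flag to the matching Spark allocator. A hypothetical usage sketch, assuming the deleted ColumnVectorFactory were still available:

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.ColumnVectorFactory;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical usage of the removed factory: the MemoryMode argument is the
// only switch between heap-array backed and unsafe (off-heap) backed vectors.
public final class FactoryUsageSketch {
    public static void main(String[] args) {
        StructType schema = new StructType().add("name", DataTypes.StringType);
        WritableColumnVector[] onHeap =
            ColumnVectorFactory.getColumnVector(MemoryMode.ON_HEAP, schema, 1024);
        WritableColumnVector[] offHeap =
            ColumnVectorFactory.getColumnVector(MemoryMode.OFF_HEAP, schema, 1024);
        System.out.println(onHeap[0].getClass().getSimpleName());   // OnHeapColumnVector
        System.out.println(offHeap[0].getClass().getSimpleName());  // OffHeapColumnVector
        for (WritableColumnVector v : onHeap) v.close();
        for (WritableColumnVector v : offHeap) v.close();           // release off-heap memory
    }
}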

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index c0e270c..644ac4c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.streaming;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -34,10 +33,7 @@ import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
 import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
 import org.apache.carbondata.core.scan.complextypes.StructQueryType;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,46 +47,10 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
 
   public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-  public static final String STREAM_RECORD_READER_INSTANCE =
-      "org.apache.carbondata.stream.CarbonStreamRecordReader";
-  // return raw row for handoff
-  private boolean useRawRow = false;
 
-  public void setUseRawRow(boolean useRawRow) {
-    this.useRawRow = useRawRow;
-  }
-
-  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
-    this.inputMetricsStats = inputMetricsStats;
-  }
-
-  public void setVectorReader(boolean vectorReader) {
-    isVectorReader = vectorReader;
-  }
-
-  public void setModel(QueryModel model) {
-    this.model = model;
-  }
-
-  // InputMetricsStats
-  private InputMetricsStats inputMetricsStats;
-  // vector reader
-  private boolean isVectorReader;
-  private QueryModel model;
-
-  @Override
-  public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    try {
-      Constructor cons = CarbonStreamUtils
-          .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
-              InputMetricsStats.class, QueryModel.class, boolean.class);
-      return (RecordReader) CarbonStreamUtils
-          .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+  @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CarbonStreamRecordReader();
   }
 
   public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,


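The hunk above reverts the reflection-based lookup of org.apache.carbondata.stream.CarbonStreamRecordReader back to a direct new CarbonStreamRecordReader() inside the streaming module. A rough sketch of what the removed path did with plain java.lang.reflect; the constructor parameter types are copied from the removed createRecordReader, the helper is only a stand-in for the deleted CarbonStreamUtils methods, and the carbondata jars must be on the classpath at runtime.

import java.lang.reflect.Constructor;

import org.apache.hadoop.mapreduce.RecordReader;

// Stand-in for the deleted CarbonStreamUtils helpers: resolve the reader class
// and its (boolean, InputMetricsStats, QueryModel, boolean) constructor by name,
// so the streaming module needs no compile-time dependency on the spark module.
public final class ReflectiveReaderSketch {

    @SuppressWarnings("unchecked")
    static RecordReader<Void, Object> newStreamRecordReader(
            boolean vectorReader, Object inputMetricsStats,
            Object queryModel, boolean useRawRow) throws Exception {
        Class<?> readerClass =
            Class.forName("org.apache.carbondata.stream.CarbonStreamRecordReader");
        Constructor<?> cons = readerClass.getDeclaredConstructor(
            boolean.class,
            Class.forName("org.apache.carbondata.hadoop.InputMetricsStats"),
            Class.forName("org.apache.carbondata.core.scan.model.QueryModel"),
            boolean.class);
        cons.setAccessible(true);
        return (RecordReader<Void, Object>) cons.newInstance(
            vectorReader, inputMetricsStats, queryModel, useRawRow);
    }
}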