carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/4] incubator-carbondata git commit: add initial check in for vector reader
Date Tue, 20 Dec 2016 03:10:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
index 9fa63d6..dc5fb17 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -145,7 +145,7 @@ public class CarbonCompactionExecutor {
       throws QueryExecutionException {
 
     queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
     CarbonIterator<BatchResult> iter = null;
     try {
       iter = queryExecutor.execute(queryModel);
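
A note on the hunk above: the executor factory now takes the QueryModel up front instead of being a no-arg call. A minimal sketch of the resulting call-site pattern, written in Scala for brevity and using only names that appear in this hunk:

    // sketch only: mirrors the compaction call site above
    val queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel)
    val iter = queryExecutor.execute(queryModel)  // CarbonIterator[BatchResult]; may throw QueryExecutionException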

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d654067..f20d12d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -18,17 +18,18 @@
 package org.apache.carbondata.spark.rdd
 
 import java.text.SimpleDateFormat
-import java.util
+import java.util.ArrayList
 import java.util.Date
+import java.util.List
 
 import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,8 +38,9 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit,
-  CarbonProjection}
+import org.apache.carbondata.hadoop._
 import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.scan.model.QueryModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 /**
@@ -46,19 +48,20 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
 * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
  * level filtering in driver side.
  */
-class CarbonScanRDD[V: ClassTag](
+class CarbonScanRDD(
     @transient sc: SparkContext,
     columnProjection: CarbonProjection,
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient carbonTable: CarbonTable)
-  extends RDD[V](sc, Nil) {
+  extends RDD[InternalRow](sc, Nil) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
+  private var vectorReader = false
 
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -76,7 +79,7 @@ class CarbonScanRDD[V: ClassTag](
     result
   }
 
-  private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+  private def distributeSplits(splits: List[InputSplit]): Array[Partition] = {
     // this function distributes the split based on following logic:
     // 1. based on data locality, to make split balanced on all available nodes
     // 2. if the number of split for one
@@ -84,7 +87,7 @@ class CarbonScanRDD[V: ClassTag](
     var statistic = new QueryStatistic()
     val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
     val parallelism = sparkContext.defaultParallelism
-    val result = new util.ArrayList[Partition](parallelism)
+    val result = new ArrayList[Partition](parallelism)
     var noOfBlocks = 0
     var noOfNodes = 0
     var noOfTasks = 0
@@ -138,7 +141,7 @@ class CarbonScanRDD[V: ClassTag](
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
@@ -150,12 +153,25 @@ class CarbonScanRDD[V: ClassTag](
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val reader = format.createRecordReader(inputSplit, attemptContext)
+    val model = format.getQueryModel(inputSplit, attemptContext)
+    val reader = {
+      if (vectorReader) {
+        val carbonRecordReader = createVectorizedCarbonRecordReader(model)
+        if (carbonRecordReader == null) {
+          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+        } else {
+          carbonRecordReader
+        }
+      } else {
+        new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+      }
+    }
+
     reader.initialize(inputSplit, attemptContext)
 
     val queryStartTime = System.currentTimeMillis
 
-    new Iterator[V] {
+    val iterator = new Iterator[Any] {
       private var havePair = false
       private var finished = false
       private var count = 0
@@ -179,30 +195,31 @@ class CarbonScanRDD[V: ClassTag](
         !finished
       }
 
-      override def next(): V = {
+      override def next(): Any = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val value: V = reader.getCurrentValue
+        val value = reader.getCurrentValue
         count += 1
         value
       }
     }
+    iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
-  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
     CarbonInputFormat.setCarbonTable(conf, carbonTable)
     createInputFormat(conf)
   }
 
-  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
     CarbonInputFormat.setCarbonReadSupport(conf, SparkReadSupport.readSupportClass)
     createInputFormat(conf)
   }
 
-  private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
-    val format = new CarbonInputFormat[V]
+  private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
+    val format = new CarbonInputFormat[Object]
     CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
@@ -231,4 +248,22 @@ class CarbonScanRDD[V: ClassTag](
     val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
     firstOptionLocation
   }
+
+  def createVectorizedCarbonRecordReader(queryModel: QueryModel): RecordReader[Void, Object] = {
+    val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
+    try {
+      val cons = Class.forName(name).getDeclaredConstructors
+      cons.head.setAccessible(true)
+      cons.head.newInstance(queryModel).asInstanceOf[RecordReader[Void, Object]]
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+        null
+    }
+  }
+
+  // TODO: find a better way to set it.
+  def setVectorReaderSupport(boolean: Boolean): Unit = {
+    vectorReader = boolean
+  }
 }
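
To summarize the reader selection added above: when the vectorReader flag is set, compute() reflectively loads VectorizedCarbonRecordReader and silently falls back to the row-based CarbonRecordReader if that fails. A hedged sketch of the expected wiring (the Spark 2 planner does this later in the patch; all names are taken from this diff):

    // sketch: build the scan RDD, then opt in to the vectorized path before execution
    val rdd = new CarbonScanRDD(sc, projection, filterExpression, identifier, carbonTable)
    rdd.setVectorReaderSupport(true)  // compute() prefers VectorizedCarbonRecordReader,
                                      // falling back to CarbonRecordReader if reflection fails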

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 976631d..bdc223a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -125,7 +125,7 @@ case class CarbonScan(
     selectedMsrs.foreach(plan.addMeasure)
   }
 
-  def inputRdd: CarbonScanRDD[Array[Any]] = {
+  def inputRdd: CarbonScanRDD = {
     val projection = new CarbonProjection
     columnProjection.foreach { attr =>
       projection.addColumn(attr.name)
@@ -152,9 +152,9 @@ case class CarbonScan(
         override def next(): InternalRow = {
           val value = iter.next
           if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(value))
+            unsafeProjection(value)
           } else {
-            new GenericMutableRow(value)
+            value
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
index 4f391bb..b86886e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
@@ -133,11 +133,11 @@ class ColumnGroupDataTypesTestCase extends QueryTest with BeforeAndAfterAll {
     try {
       sql("create table colgrp_disorder (column1 string,column2 string,column3 string,column4
string,column5 string,column6 string,column7 string,column8 string,column9 string,column10
string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES (\"COLUMN_GROUPS\"=\"(column7,column8),(column2,column3,column4)\")")
       sql("LOAD DATA LOCAL INPATH './src/test/resources/10dim_4msr.csv' INTO table colgrp_disorder
options('FILEHEADER'='column1,column2,column3,column4,column5,column6,column7,column8,column9,column10,measure1,measure2,measure3,measure4')");
-      assert(true)  
+      assert(true)
     } catch {
       case ex: Exception => assert(false)
     }
-    
+
   }
   override def afterAll {
     sql("drop table colgrp")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 499ef0c..3aea985 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -18,18 +18,15 @@
  */
 package org.apache.carbondata.spark.readsupport;
 
-import java.sql.Timestamp;
-
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
 
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
 
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<InternalRow> {
 
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
@@ -37,26 +34,19 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
     //can initialize and generate schema here.
   }
 
-  @Override public Row readRow(Object[] data) {
+  @Override public InternalRow readRow(Object[] data) {
     for (int i = 0; i < dictionaries.length; i++) {
       if (data[i] == null) {
         continue;
       }
       if (dictionaries[i] == null) {
-        if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          //convert the long to timestamp in case of direct dictionary column
-          if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-            data[i] = new Timestamp((long) data[i]);
-          } else if(DataType.DATE == carbonColumns[i].getDataType()) {
-            data[i] = new java.sql.Date((int) data[i]);
-          }
-        } else if(dataTypes[i].equals(DataType.INT)) {
+        if(dataTypes[i].equals(DataType.INT)) {
           data[i] = ((Long)(data[i])).intValue();
         } else if(dataTypes[i].equals(DataType.SHORT)) {
           data[i] = ((Long)(data[i])).shortValue();
         }
       }
     }
-    return new GenericRow(data);
+    return new GenericMutableRow(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
new file mode 100644
index 0000000..84e5c07
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.vectorreader;
+
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.Decimal;
+
+public class ColumnarVectorWrapper implements CarbonColumnVector {
+
+  private ColumnVector columnVector;
+
+  public ColumnarVectorWrapper(ColumnVector columnVector) {
+    this.columnVector = columnVector;
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    columnVector.putShort(rowId, value);
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    columnVector.putInt(rowId, value);
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    columnVector.putLong(rowId, value);
+  }
+
+  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+    columnVector.putDecimal(rowId, value, precision);
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    columnVector.putDouble(rowId, value);
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    columnVector.putByteArray(rowId, value);
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    columnVector.putByteArray(rowId, value, offset, length);
+  }
+
+  @Override public void putNull(int rowId) {
+    columnVector.putNull(rowId);
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return columnVector.isNullAt(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    //TODO handle complex types
+  }
+
+  @Override public Object getData(int rowId) {
+    //TODO handle complex types
+    return null;
+  }
+
+  @Override public void reset() {
+//    columnVector.reset();
+  }
+}
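
ColumnarVectorWrapper is a thin adapter that lets Carbon's scan layer write into Spark's ColumnVector through the CarbonColumnVector interface. A sketch of how the wrappers are assembled around a ColumnarBatch, assuming a StructType named schema for the projected columns (this mirrors initBatch() in the record reader below):

    // sketch: wrap every Spark column vector so the Carbon scan can fill it directly
    val batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP)
    val vectors = Array.tabulate[CarbonColumnVector](batch.numCols()) { i =>
      new ColumnarVectorWrapper(batch.column(i))
    }
    val carbonBatch = new CarbonColumnarBatch(vectors, batch.capacity())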

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
new file mode 100644
index 0000000..ba02bca
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.spark.util.CarbonScalaUtil;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private ColumnarBatch columnarBatch;
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
+  /**
+   * The default config on whether columnarBatch should be offheap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+  private QueryModel queryModel;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryExecutor queryExecutor;
+
+  public VectorizedCarbonRecordReader(QueryModel queryModel) {
+    this.queryModel = queryModel;
+    enableReturningBatches();
+  }
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException, UnsupportedOperationException {
+    // The input split can contain single HDFS block or multiple blocks, so firstly get all the
+    // blocks and then set them in the query model.
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+      // contains multiple blocks, this is an optimization for concurrent query.
+      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+      splitList = multiBlockSplit.getAllSplits();
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+
+  @Override public void close() throws IOException {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    resultBatch();
+
+    if (returnColumnarBatch) return nextBatch();
+
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) return columnarBatch;
+    return columnarBatch.getRow(batchIdx - 1);
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    // TODO: implement it based on the total number of rows it is going to retrieve.
+    return 0;
+  }
+
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+
+  public void initBatch(MemoryMode memMode) {
+    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
+    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+    StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+    for (int i = 0; i < queryDimension.size(); i++) {
+      QueryDimension dim = queryDimension.get(i);
+      if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+      } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else if (dim.getDimension().isComplex()) {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(DataType.INT), true, null);
+      }
+    }
+
+    for (int i = 0; i < queryMeasures.size(); i++) {
+      QueryMeasure msr = queryMeasures.get(i);
+      switch (msr.getMeasure().getDataType()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+              null);
+          break;
+        case DECIMAL:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              new DecimalType(msr.getMeasure().getPrecision(),
+                  msr.getMeasure().getScale()), true, null);
+          break;
+        default:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              CarbonScalaUtil.convertCarbonToSparkDataType(DataType.DOUBLE), true, null);
+      }
+    }
+
+    columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
+    CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i));
+    }
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity());
+  }
+
+  private void initBatch() {
+    initBatch(DEFAULT_MEMORY_MODE);
+  }
+
+  private ColumnarBatch resultBatch() {
+    if (columnarBatch == null) initBatch();
+    return columnarBatch;
+  }
+
+  /*
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  public boolean nextBatch() throws IOException {
+    columnarBatch.reset();
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      int actualSize = carbonColumnarBatch.getActualSize();
+      columnarBatch.setNumRows(actualSize);
+      numBatched = actualSize;
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+}
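
The constructor above always enables batch returns, so getCurrentValue() hands back the whole ColumnarBatch rather than a single row. A sketch of the consumption loop, assuming inputSplit and attemptContext are the Hadoop task objects already in scope on the executor:

    // sketch of draining the vectorized reader
    val reader = new VectorizedCarbonRecordReader(queryModel)  // batch mode is enabled in the constructor
    reader.initialize(inputSplit, attemptContext)
    while (reader.nextKeyValue()) {
      val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]
      // consume batch.numRows() rows, e.g. via batch.rowIterator()
    }
    reader.close()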

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 09a58ba..7087204 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.LoadTableByInsert
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
@@ -61,7 +62,7 @@ case class CarbonDatasourceHadoopRelation(
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
-  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))
@@ -69,7 +70,7 @@ case class CarbonDatasourceHadoopRelation(
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
 
-    new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
+    new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
       absIdentifier, carbonTable)
   }
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
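
With buildScan returning RDD[InternalRow], the relation hands Spark internal rows straight to the physical plan, and the planner can downcast the RDD to toggle the vector reader. A small sketch of the new contract (variable names assumed):

    // sketch: the raw scan RDD is later cast back to CarbonScanRDD by the strategy
    val scanRdd: RDD[InternalRow] = relation.buildScan(requiredColumns, filters)
    scanRdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(true)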

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 9a625ee..ce5962d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -228,9 +228,9 @@ class CarbonDecoderRDD(
     relations: Seq[CarbonDecoderRelation],
     profile: CarbonProfile,
     aliasMap: CarbonAliasDecoderRelation,
-    prev: RDD[Row],
+    prev: RDD[InternalRow],
     output: Seq[Attribute])
-    extends RDD[Row](prev) {
+    extends RDD[InternalRow](prev) {
 
   def canBeDecoded(attr: Attribute): Boolean = {
     profile match {
@@ -296,7 +296,7 @@ class CarbonDecoderRDD(
     dictIds
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
           val storepath = CarbonEnv.get.carbonMetastore.storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
@@ -319,15 +319,17 @@ class CarbonDecoderRDD(
         }
       }
     )
-    val iter = firstParent[Row].iterator(split, context)
-    new Iterator[Row] {
+    val iter = firstParent[InternalRow].iterator(split, context)
+    new Iterator[InternalRow] {
       var flag = true
       var total = 0L
+      val dataTypes = output.map { attr => attr.dataType }
       override final def hasNext: Boolean = iter.hasNext
 
-      override final def next(): Row = {
+      override final def next(): InternalRow = {
         val startTime = System.currentTimeMillis()
-        val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+        val row: InternalRow = iter.next()
+        val data = row.toSeq(dataTypes).toArray
         dictIndex.foreach { index =>
           if ( data(index) != null) {
             data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
@@ -335,7 +337,7 @@ class CarbonDecoderRDD(
               getDictionaryColumnIds(index)._3)
           }
         }
-        new GenericRow(data)
+        new GenericMutableRow(data)
       }
     }
   }
@@ -365,5 +367,5 @@ class CarbonDecoderRDD(
     dicts
   }
 
-  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override protected def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
 }
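
The decoder now consumes InternalRow, so each row is first materialized with the output attributes' data types before decoded dictionary values are swapped in. A condensed sketch of that step; decodeValue is a stand-in for the DataTypeUtil.getDataBasedOnDataType call in the hunk:

    // sketch of the per-row decode step
    val dataTypes = output.map(_.dataType)
    val data = row.toSeq(dataTypes).toArray
    dictIndex.foreach { i =>
      if (data(i) != null) data(i) = decodeValue(dicts(i), data(i))  // hypothetical helper
    }
    new GenericMutableRow(data)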

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 7a8920f..5508a94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.CatalystTypeConverters._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -30,9 +31,12 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 /**
 * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
@@ -67,8 +71,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   def getDecoderRDD(
       logicalRelation: LogicalRelation,
       projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
-      rdd: RDD[Row],
-      output: Seq[Attribute]): RDD[Row] = {
+      rdd: RDD[InternalRow],
+      output: Seq[Attribute]): RDD[InternalRow] = {
     val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
       logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
@@ -87,19 +91,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   private[this] def toCatalystRDD(
       relation: LogicalRelation,
       output: Seq[Attribute],
-      rdd: RDD[Row],
+      rdd: RDD[InternalRow],
       needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
-    val newRdd = if (needDecode.size > 0) {
+    if (needDecode.size > 0) {
+      rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
       getDecoderRDD(relation, needDecode, rdd, output)
     } else {
+      rdd.asInstanceOf[CarbonScanRDD]
+        .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
       rdd
     }
-    if (relation.relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(newRdd, output.map(_.dataType))
-    } else {
-      newRdd.asInstanceOf[RDD[InternalRow]]
-    }
   }
 
   protected def pruneFilterProject(
@@ -212,29 +214,61 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         }
         attr
       }
-      val scan = new execution.RowDataSourceScanExec(
+      val scan = getDataSourceScan(relation,
         updateProject,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        // now carbon do not support partitioning, use UnknownPartitioning here, in future if
-        // we add bucket, we should change the partitioning
-        relation.relation, UnknownPartitioning(0), metadata, None)
+        scanBuilder,
+        candidatePredicates,
+        pushedFilters,
+        metadata,
+        needDecoder,
+        updateRequestedColumns)
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
       (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
-      val scan = new execution.RowDataSourceScanExec(
+      val scan = getDataSourceScan(relation,
         updateRequestedColumns,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        // now carbon do not support partitioning, use UnknownPartitioning here, in future if
-        // we add bucket, we should change the partitioning
-        relation.relation, UnknownPartitioning(0), metadata, None)
+        scanBuilder,
+        candidatePredicates,
+        pushedFilters,
+        metadata,
+        needDecoder,
+        updateRequestedColumns)
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
   }
 
+  def getDataSourceScan(relation: LogicalRelation,
+      output: Seq[Attribute],
+      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow],
+      candidatePredicates: Seq[Expression],
+      pushedFilters: Seq[Filter],
+      metadata: Map[String, String],
+      needDecoder: ArrayBuffer[AttributeReference],
+      updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+    if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
+        needDecoder.length == 0) {
+      BatchedDataSourceScanExec(
+        output,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation,
+        UnknownPartitioning(0),
+        metadata,
+        relation.metastoreTableIdentifier)
+    } else {
+      RowDataSourceScanExec(output,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation,
+        UnknownPartitioning(0),
+        metadata,
+        relation.metastoreTableIdentifier)
+    }
+  }
+
   def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
       relation: CarbonDatasourceHadoopRelation,
       needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
@@ -393,4 +427,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       case others => None
     }
   }
+
+  def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = {
+    val enableReader = {
+      if (sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+        sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER).toBoolean
+      } else {
+        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).toBoolean
+      }
+    }
+    sqlContext.conf.wholeStageEnabled && enableReader &&
+      cols.forall(_.dataType.isInstanceOf[AtomicType])
+  }
 }
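
supportBatchedDataSource gates the new BatchedDataSourceScanExec: whole-stage codegen must be on, the vector-reader switch must be enabled, no late-decode columns may remain, and every projected column must be an AtomicType. A hedged sketch of flipping the switch at the session level; the literal property key is whatever CarbonCommonConstants.ENABLE_VECTOR_READER holds:

    // sketch: enable the vectorized reader for a SparkSession before running queries
    sparkSession.conf.set(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
    // otherwise the strategy falls back to the row-based RowDataSourceScanExec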


