http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
index 9fa63d6..dc5fb17 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -145,7 +145,7 @@ public class CarbonCompactionExecutor {
throws QueryExecutionException {
queryModel.setTableBlockInfos(blockList);
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
CarbonIterator<BatchResult> iter = null;
try {
iter = queryExecutor.execute(queryModel);
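
For context, a minimal Scala sketch of how the compaction path obtains and drives an executor after this change; only calls visible in this patch are used, and the merge step is a placeholder:

  // the factory now needs the QueryModel up front
  val queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel)
  try {
    val iter = queryExecutor.execute(queryModel)   // CarbonIterator of BatchResult
    while (iter.hasNext) {
      val batch = iter.next()
      // ... feed batch into the compaction/merge step ...
    }
  } finally {
    queryExecutor.finish()                         // release executor resources
  }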
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d654067..f20d12d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -18,17 +18,18 @@
package org.apache.carbondata.spark.rdd
import java.text.SimpleDateFormat
-import java.util
+import java.util.ArrayList
import java.util.Date
+import java.util.List
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark._
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,8 +38,9 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop._
import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.scan.model.QueryModel
import org.apache.carbondata.spark.load.CarbonLoaderUtil
/**
@@ -46,19 +48,20 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
* CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
* level filtering in driver side.
*/
-class CarbonScanRDD[V: ClassTag](
+class CarbonScanRDD(
@transient sc: SparkContext,
columnProjection: CarbonProjection,
filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient carbonTable: CarbonTable)
- extends RDD[V](sc, Nil) {
+ extends RDD[InternalRow](sc, Nil) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}
+ private var vectorReader = false
@transient private val jobId = new JobID(jobTrackerId, id)
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -76,7 +79,7 @@ class CarbonScanRDD[V: ClassTag](
result
}
- private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+ private def distributeSplits(splits: List[InputSplit]): Array[Partition] = {
// this function distributes the split based on following logic:
// 1. based on data locality, to make split balanced on all available nodes
// 2. if the number of split for one
@@ -84,7 +87,7 @@ class CarbonScanRDD[V: ClassTag](
var statistic = new QueryStatistic()
val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
val parallelism = sparkContext.defaultParallelism
- val result = new util.ArrayList[Partition](parallelism)
+ val result = new ArrayList[Partition](parallelism)
var noOfBlocks = 0
var noOfNodes = 0
var noOfTasks = 0
@@ -138,7 +141,7 @@ class CarbonScanRDD[V: ClassTag](
result.toArray(new Array[Partition](result.size()))
}
- override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
@@ -150,12 +153,25 @@ class CarbonScanRDD[V: ClassTag](
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
- val reader = format.createRecordReader(inputSplit, attemptContext)
+ val model = format.getQueryModel(inputSplit, attemptContext)
+ val reader = {
+ if (vectorReader) {
+ val carbonRecordReader = createVectorizedCarbonRecordReader(model)
+ if (carbonRecordReader == null) {
+ new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+ } else {
+ carbonRecordReader
+ }
+ } else {
+ new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+ }
+ }
+
reader.initialize(inputSplit, attemptContext)
val queryStartTime = System.currentTimeMillis
- new Iterator[V] {
+ val iterator = new Iterator[Any] {
private var havePair = false
private var finished = false
private var count = 0
@@ -179,30 +195,31 @@ class CarbonScanRDD[V: ClassTag](
!finished
}
- override def next(): V = {
+ override def next(): Any = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- val value: V = reader.getCurrentValue
+ val value = reader.getCurrentValue
count += 1
value
}
}
+ iterator.asInstanceOf[Iterator[InternalRow]]
}
- private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
CarbonInputFormat.setCarbonTable(conf, carbonTable)
createInputFormat(conf)
}
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
CarbonInputFormat.setCarbonReadSupport(conf, SparkReadSupport.readSupportClass)
createInputFormat(conf)
}
- private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
- val format = new CarbonInputFormat[V]
+ private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
+ val format = new CarbonInputFormat[Object]
CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
@@ -231,4 +248,22 @@ class CarbonScanRDD[V: ClassTag](
val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
firstOptionLocation
}
+
+ def createVectorizedCarbonRecordReader(queryModel: QueryModel): RecordReader[Void, Object] = {
+ val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
+ try {
+ val cons = Class.forName(name).getDeclaredConstructors
+ cons.head.setAccessible(true)
+ cons.head.newInstance(queryModel).asInstanceOf[RecordReader[Void, Object]]
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ null
+ }
+ }
+
+ // TODO find a better way to set it.
+ def setVectorReaderSupport(boolean: Boolean): Unit = {
+ vectorReader = boolean
+ }
}
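
CarbonScanRDD is now fixed to InternalRow and decides at compute() time between the plain CarbonRecordReader and the reflectively loaded VectorizedCarbonRecordReader. A hedged sketch of how a caller wires this up, using only members shown in this diff:

  val rdd = new CarbonScanRDD(sc, projection, filterExpression, identifier, carbonTable)
  // enable the vectorized path only when the consumer can handle ColumnarBatch output;
  // if VectorizedCarbonRecordReader is not on the classpath, compute() logs the error
  // and falls back to CarbonRecordReader
  rdd.setVectorReaderSupport(true)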
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 976631d..bdc223a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -125,7 +125,7 @@ case class CarbonScan(
selectedMsrs.foreach(plan.addMeasure)
}
- def inputRdd: CarbonScanRDD[Array[Any]] = {
+ def inputRdd: CarbonScanRDD = {
val projection = new CarbonProjection
columnProjection.foreach { attr =>
projection.addColumn(attr.name)
@@ -152,9 +152,9 @@ case class CarbonScan(
override def next(): InternalRow = {
val value = iter.next
if (outUnsafeRows) {
- unsafeProjection(new GenericMutableRow(value))
+ unsafeProjection(value)
} else {
- new GenericMutableRow(value)
+ value
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
index 4f391bb..b86886e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
@@ -133,11 +133,11 @@ class ColumnGroupDataTypesTestCase extends QueryTest with BeforeAndAfterAll {
try {
sql("create table colgrp_disorder (column1 string,column2 string,column3 string,column4
string,column5 string,column6 string,column7 string,column8 string,column9 string,column10
string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES (\"COLUMN_GROUPS\"=\"(column7,column8),(column2,column3,column4)\")")
sql("LOAD DATA LOCAL INPATH './src/test/resources/10dim_4msr.csv' INTO table colgrp_disorder
options('FILEHEADER'='column1,column2,column3,column4,column5,column6,column7,column8,column9,column10,measure1,measure2,measure3,measure4')");
- assert(true)
+ assert(true)
} catch {
case ex: Exception => assert(false)
}
-
+
}
override def afterAll {
sql("drop table colgrp")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 499ef0c..3aea985 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -18,18 +18,15 @@
*/
package org.apache.carbondata.spark.readsupport;
-import java.sql.Timestamp;
-
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<InternalRow> {
@Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) {
@@ -37,26 +34,19 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
//can initialize and generate schema here.
}
- @Override public Row readRow(Object[] data) {
+ @Override public InternalRow readRow(Object[] data) {
for (int i = 0; i < dictionaries.length; i++) {
if (data[i] == null) {
continue;
}
if (dictionaries[i] == null) {
- if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- //convert the long to timestamp in case of direct dictionary column
- if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
- data[i] = new Timestamp((long) data[i]);
- } else if(DataType.DATE == carbonColumns[i].getDataType()) {
- data[i] = new java.sql.Date((int) data[i]);
- }
- } else if(dataTypes[i].equals(DataType.INT)) {
+ if(dataTypes[i].equals(DataType.INT)) {
data[i] = ((Long)(data[i])).intValue();
} else if(dataTypes[i].equals(DataType.SHORT)) {
data[i] = ((Long)(data[i])).shortValue();
}
}
}
- return new GenericRow(data);
+ return new GenericMutableRow(data);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
new file mode 100644
index 0000000..84e5c07
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.vectorreader;
+
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.Decimal;
+
+public class ColumnarVectorWrapper implements CarbonColumnVector {
+
+ private ColumnVector columnVector;
+
+ public ColumnarVectorWrapper(ColumnVector columnVector) {
+ this.columnVector = columnVector;
+ }
+
+ @Override public void putShort(int rowId, short value) {
+ columnVector.putShort(rowId, value);
+ }
+
+ @Override public void putInt(int rowId, int value) {
+ columnVector.putInt(rowId, value);
+ }
+
+ @Override public void putLong(int rowId, long value) {
+ columnVector.putLong(rowId, value);
+ }
+
+ @Override public void putDecimal(int rowId, Decimal value, int precision) {
+ columnVector.putDecimal(rowId, value, precision);
+ }
+
+ @Override public void putDouble(int rowId, double value) {
+ columnVector.putDouble(rowId, value);
+ }
+
+ @Override public void putBytes(int rowId, byte[] value) {
+ columnVector.putByteArray(rowId, value);
+ }
+
+ @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ columnVector.putByteArray(rowId, value, offset, length);
+ }
+
+ @Override public void putNull(int rowId) {
+ columnVector.putNull(rowId);
+ }
+
+ @Override public boolean isNull(int rowId) {
+ return columnVector.isNullAt(rowId);
+ }
+
+ @Override public void putObject(int rowId, Object obj) {
+ //TODO handle complex types
+ }
+
+ @Override public Object getData(int rowId) {
+ //TODO handle complex types
+ return null;
+ }
+
+ @Override public void reset() {
+// columnVector.reset();
+ }
+}
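
ColumnarVectorWrapper is a thin adapter: every CarbonColumnVector write is forwarded to the wrapped Spark ColumnVector, so the scan can fill Spark's columnar memory directly. A hedged Scala sketch of the intended fill pattern, using only APIs that appear in this patch (schema is assumed to be a StructType built as in the reader's initBatch below; row values are illustrative):

  val batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP)
  val vectors = new Array[CarbonColumnVector](schema.length)
  for (i <- 0 until schema.length) {
    vectors(i) = new ColumnarVectorWrapper(batch.column(i))   // wrap each Spark column
  }
  vectors(0).putInt(0, 100)    // column 0, row 0
  vectors(1).putNull(0)        // nulls map straight to ColumnVector.putNull
  batch.setNumRows(1)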
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
new file mode 100644
index 0000000..ba02bca
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.spark.util.CarbonScalaUtil;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+
+ private int batchIdx = 0;
+
+ private int numBatched = 0;
+
+ private ColumnarBatch columnarBatch;
+
+ private CarbonColumnarBatch carbonColumnarBatch;
+
+ /**
+ * If true, this class returns batches instead of rows.
+ */
+ private boolean returnColumnarBatch;
+
+ /**
+ * The default config on whether columnarBatch should be offheap.
+ */
+ private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+ private QueryModel queryModel;
+
+ private AbstractDetailQueryResultIterator iterator;
+
+ private QueryExecutor queryExecutor;
+
+ public VectorizedCarbonRecordReader(QueryModel queryModel) {
+ this.queryModel = queryModel;
+ enableReturningBatches();
+ }
+
+ /**
+ * Implementation of RecordReader API.
+ */
+ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException, UnsupportedOperationException {
+ // The input split can contain single HDFS block or multiple blocks, so firstly get all the
+ // blocks and then set them in the query model.
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+ // contains multiple blocks, this is an optimization for concurrent query.
+ CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+ splitList = multiBlockSplit.getAllSplits();
+ } else {
+ throw new RuntimeException("unsupported input split type: " + inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ throw new InterruptedException(e.getMessage());
+ }
+ }
+
+ @Override public void close() throws IOException {
+ if (columnarBatch != null) {
+ columnarBatch.close();
+ columnarBatch = null;
+ }
+ // clear dictionary cache
+ Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+ if (null != columnToDictionaryMapping) {
+ for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet())
{
+ CarbonUtil.clearDictionaryCache(entry.getValue());
+ }
+ }
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ resultBatch();
+
+ if (returnColumnarBatch) return nextBatch();
+
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) return false;
+ }
+ ++batchIdx;
+ return true;
+ }
+
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ if (returnColumnarBatch) return columnarBatch;
+ return columnarBatch.getRow(batchIdx - 1);
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ // TODO : Implement it based on total number of rows it is going to retrieve.
+ return 0;
+ }
+
+ /**
+ * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+ * This object is reused. Calling this enables the vectorized reader. This should be called
+ * before any calls to nextKeyValue/nextBatch.
+ */
+
+ public void initBatch(MemoryMode memMode) {
+ List<QueryDimension> queryDimension = queryModel.getQueryDimension();
+ List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+ StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+ for (int i = 0; i < queryDimension.size(); i++) {
+ QueryDimension dim = queryDimension.get(i);
+ if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+ fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+ } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+ null);
+ } else if (dim.getDimension().isComplex()) {
+ fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+ null);
+ } else {
+ fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(DataType.INT), true, null);
+ }
+ }
+
+ for (int i = 0; i < queryMeasures.size(); i++) {
+ QueryMeasure msr = queryMeasures.get(i);
+ switch (msr.getMeasure().getDataType()) {
+ case SHORT:
+ case INT:
+ case LONG:
+ fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+ null);
+ break;
+ case DECIMAL:
+ fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ new DecimalType(msr.getMeasure().getPrecision(),
+ msr.getMeasure().getScale()), true, null);
+ break;
+ default:
+ fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ CarbonScalaUtil.convertCarbonToSparkDataType(DataType.DOUBLE), true, null);
+ }
+ }
+
+ columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
+ CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i));
+ }
+ carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity());
+ }
+
+ private void initBatch() {
+ initBatch(DEFAULT_MEMORY_MODE);
+ }
+
+ private ColumnarBatch resultBatch() {
+ if (columnarBatch == null) initBatch();
+ return columnarBatch;
+ }
+
+ /*
+ * Can be called before any rows are returned to enable returning columnar batches directly.
+ */
+ public void enableReturningBatches() {
+ returnColumnarBatch = true;
+ }
+
+ /**
+ * Advances to the next batch of rows. Returns false if there are no more.
+ */
+ public boolean nextBatch() throws IOException {
+ columnarBatch.reset();
+ carbonColumnarBatch.reset();
+ if (iterator.hasNext()) {
+ iterator.processNextBatch(carbonColumnarBatch);
+ int actualSize = carbonColumnarBatch.getActualSize();
+ columnarBatch.setNumRows(actualSize);
+ numBatched = actualSize;
+ batchIdx = 0;
+ return true;
+ }
+ return false;
+ }
+
+}
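
The reader follows the normal Hadoop RecordReader lifecycle, but because enableReturningBatches() is called in the constructor, getCurrentValue() hands back the whole reused ColumnarBatch rather than individual rows. A hedged usage sketch in Scala (inputSplit and attemptContext are assumed to come from CarbonInputFormat, as in CarbonScanRDD above):

  val reader = new VectorizedCarbonRecordReader(queryModel)
  reader.initialize(inputSplit, attemptContext)
  try {
    while (reader.nextKeyValue()) {
      val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]
      // the batch is reused across calls; consume batch.numRows() rows before advancing
    }
  } finally {
    reader.close()   // also clears the dictionary cache and finishes the query executor
  }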
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 09a58ba..7087204 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.LoadTableByInsert
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
@@ -61,7 +62,7 @@ case class CarbonDatasourceHadoopRelation(
override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
- def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
val filterExpression: Option[Expression] = filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(schema, filter)
}.reduceOption(new AndExpression(_, _))
@@ -69,7 +70,7 @@ case class CarbonDatasourceHadoopRelation(
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
+ new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
absIdentifier, carbonTable)
}
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 9a625ee..ce5962d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -228,9 +228,9 @@ class CarbonDecoderRDD(
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
- prev: RDD[Row],
+ prev: RDD[InternalRow],
output: Seq[Attribute])
- extends RDD[Row](prev) {
+ extends RDD[InternalRow](prev) {
def canBeDecoded(attr: Attribute): Boolean = {
profile match {
@@ -296,7 +296,7 @@ class CarbonDecoderRDD(
dictIds
}
- override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val storepath = CarbonEnv.get.carbonMetastore.storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
@@ -319,15 +319,17 @@ class CarbonDecoderRDD(
}
}
)
- val iter = firstParent[Row].iterator(split, context)
- new Iterator[Row] {
+ val iter = firstParent[InternalRow].iterator(split, context)
+ new Iterator[InternalRow] {
var flag = true
var total = 0L
+ val dataTypes = output.map { attr => attr.dataType }
override final def hasNext: Boolean = iter.hasNext
- override final def next(): Row = {
+ override final def next(): InternalRow = {
val startTime = System.currentTimeMillis()
- val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+ val row: InternalRow = iter.next()
+ val data = row.toSeq(dataTypes).toArray
dictIndex.foreach { index =>
if ( data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
@@ -335,7 +337,7 @@ class CarbonDecoderRDD(
getDictionaryColumnIds(index)._3)
}
}
- new GenericRow(data)
+ new GenericMutableRow(data)
}
}
}
@@ -365,5 +367,5 @@ class CarbonDecoderRDD(
dicts
}
- override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+ override protected def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
}
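
Since the parent RDD now produces InternalRow, the decoder must pass the output data types to materialize each row before replacing dictionary keys with values. A minimal hedged sketch of that per-row step, mirroring the compute() body above:

  val dataTypes = output.map(_.dataType)
  val data = iter.next().toSeq(dataTypes).toArray   // InternalRow -> Array[Any]
  // dictionary-encoded positions in data are replaced with decoded values here
  new GenericMutableRow(data)                       // re-wrap as a mutable InternalRow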
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 7a8920f..5508a94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.CatalystTypeConverters._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -30,9 +31,12 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
/**
* Carbon strategy for late decode (convert dictionary key to value as late as possible), which
@@ -67,8 +71,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
def getDecoderRDD(
logicalRelation: LogicalRelation,
projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
- rdd: RDD[Row],
- output: Seq[Attribute]): RDD[Row] = {
+ rdd: RDD[InternalRow],
+ output: Seq[Attribute]): RDD[InternalRow] = {
val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
@@ -87,19 +91,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
private[this] def toCatalystRDD(
relation: LogicalRelation,
output: Seq[Attribute],
- rdd: RDD[Row],
+ rdd: RDD[InternalRow],
needDecode: ArrayBuffer[AttributeReference]):
RDD[InternalRow] = {
- val newRdd = if (needDecode.size > 0) {
+ if (needDecode.size > 0) {
+ rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
getDecoderRDD(relation, needDecode, rdd, output)
} else {
+ rdd.asInstanceOf[CarbonScanRDD]
+ .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
rdd
}
- if (relation.relation.needConversion) {
- execution.RDDConversions.rowToRowRdd(newRdd, output.map(_.dataType))
- } else {
- newRdd.asInstanceOf[RDD[InternalRow]]
- }
}
protected def pruneFilterProject(
@@ -212,29 +214,61 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
attr
}
- val scan = new execution.RowDataSourceScanExec(
+ val scan = getDataSourceScan(relation,
updateProject,
- scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
- // now carbon do not support partitioning, use UnknownPartitioning here, in future if
- // we add bucket, we should change the partitioning
- relation.relation, UnknownPartitioning(0), metadata, None)
+ scanBuilder,
+ candidatePredicates,
+ pushedFilters,
+ metadata,
+ needDecoder,
+ updateRequestedColumns)
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
- val scan = new execution.RowDataSourceScanExec(
+ val scan = getDataSourceScan(relation,
updateRequestedColumns,
- scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
- // now carbon do not support partitioning, use UnknownPartitioning here, in future if
- // we add bucket, we should change the partitioning
- relation.relation, UnknownPartitioning(0), metadata, None)
+ scanBuilder,
+ candidatePredicates,
+ pushedFilters,
+ metadata,
+ needDecoder,
+ updateRequestedColumns)
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
}
+ def getDataSourceScan(relation: LogicalRelation,
+ output: Seq[Attribute],
+ scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow],
+ candidatePredicates: Seq[Expression],
+ pushedFilters: Seq[Filter],
+ metadata: Map[String, String],
+ needDecoder: ArrayBuffer[AttributeReference],
+ updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+ if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
+ needDecoder.length == 0) {
+ BatchedDataSourceScanExec(
+ output,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation,
+ UnknownPartitioning(0),
+ metadata,
+ relation.metastoreTableIdentifier)
+ } else {
+ RowDataSourceScanExec(output,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation,
+ UnknownPartitioning(0),
+ metadata,
+ relation.metastoreTableIdentifier)
+ }
+ }
+
def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
relation: CarbonDatasourceHadoopRelation,
needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
@@ -393,4 +427,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
case others => None
}
}
+
+ def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = {
+ val enableReader = {
+ if (sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER).toBoolean
+ } else {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).toBoolean
+ }
+ }
+ sqlContext.conf.wholeStageEnabled && enableReader &&
+ cols.forall(_.dataType.isInstanceOf[AtomicType])
+ }
}
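
The batched scan is only planned when whole-stage codegen is enabled, the Carbon vector-reader flag resolves to true, and every projected column is an AtomicType; otherwise the strategy falls back to RowDataSourceScanExec. A hedged sketch of turning the path on from a session (the literal key behind ENABLE_VECTOR_READER is not shown in this patch, so the constant is referenced directly; spark is an assumed SparkSession):

  spark.conf.set(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
  spark.conf.set("spark.sql.codegen.wholeStage", "true")   // checked via sqlContext.conf.wholeStageEnabled
  // complex/non-atomic projections, or plans that need the decoder RDD, still use the row scan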