carbondata-commits mailing list archives

From jack...@apache.org
Subject [01/13] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
Date Thu, 01 Feb 2018 02:27:51 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 71c2d8ca4 -> 15b4e192e


http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 5d927df..73da878 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -100,7 +100,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
   /**
    * Implementation of RecordReader API.
    */
-  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     // The input split can contain single HDFS block or multiple blocks, so firstly get all the
     // blocks and then set them in the query model.
@@ -145,7 +146,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     }
   }
 
-  @Override public void close() throws IOException {
+  @Override
+  public void close() throws IOException {
     logStatistics(rowCount, queryModel.getStatisticsRecorder());
     if (columnarBatch != null) {
       columnarBatch.close();
@@ -165,10 +167,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     }
   }
 
-  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
     resultBatch();
 
-    if (returnColumnarBatch) return nextBatch();
+    if (returnColumnarBatch) {
+      return nextBatch();
+    }
 
     if (batchIdx >= numBatched) {
       if (!nextBatch()) return false;
@@ -177,7 +182,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     return true;
   }
 
-  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) {
       int value = columnarBatch.numValidRows();
       rowCount += value;
@@ -190,11 +196,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     return columnarBatch.getRow(batchIdx - 1);
   }
 
-  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
     return null;
   }
 
-  @Override public float getProgress() throws IOException, InterruptedException {
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
     // TODO : Implement it based on total number of rows it is going to retrive.
     return 0;
   }
@@ -206,44 +214,44 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
    */
 
   private void initBatch(MemoryMode memMode) {
-    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
-    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+    List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
     StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
     for (int i = 0; i < queryDimension.size(); i++) {
-      QueryDimension dim = queryDimension.get(i);
+      ProjectionDimension dim = queryDimension.get(i);
       if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(dim.getDimension().getDataType());
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
       } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
             null);
       } else if (dim.getDimension().isComplex()) {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
             null);
       } else {
-        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
       }
     }
 
     for (int i = 0; i < queryMeasures.size(); i++) {
-      QueryMeasure msr = queryMeasures.get(i);
+      ProjectionMeasure msr = queryMeasures.get(i);
       DataType dataType = msr.getMeasure().getDataType();
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
           dataType == DataTypes.INT || dataType == DataTypes.LONG) {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
             null);
       } else if (DataTypes.isDecimal(dataType)) {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
             null);
       } else {
-        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
       }
     }
@@ -261,9 +269,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     initBatch(DEFAULT_MEMORY_MODE);
   }
 
-  private ColumnarBatch resultBatch() {
+  private void resultBatch() {
     if (columnarBatch == null) initBatch();
-    return columnarBatch;
   }
 
   /*

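For readers tracking the rename, the hunks above replace QueryDimension/QueryMeasure with ProjectionDimension/ProjectionMeasure and getQueryOrder() with getOrdinal(). A minimal sketch of the renamed accessors as initBatch() now uses them, built only from identifiers visible in the diff; the surrounding reader state is assumed, and the dictionary/complex-type branching shown in the hunk is omitted:

    List<ProjectionDimension> dims = queryModel.getProjectionDimensions();
    List<ProjectionMeasure> msrs = queryModel.getProjectionMeasures();
    StructField[] fields = new StructField[dims.size() + msrs.size()];
    for (ProjectionDimension dim : dims) {
      // getOrdinal() replaces getQueryOrder() and still fixes the column's slot in the output schema
      fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
          CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true, null);
    }
    for (ProjectionMeasure msr : msrs) {
      fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
          CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true, null);
    }
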
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index f51ced3..6a401d8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 /**
  * Executor class for executing the query on the selected segments to be merged.
@@ -70,6 +66,9 @@ public class CarbonCompactionExecutor {
    */
   private boolean restructuredBlockExists;
 
+  // converter for UTF8String and decimal conversion
+  private DataTypeConverter dataTypeConverter;
+
   /**
    * Constructor
    *
@@ -82,13 +81,14 @@ public class CarbonCompactionExecutor {
   public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
       SegmentProperties segmentProperties, CarbonTable carbonTable,
       Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
-      boolean restructuredBlockExists) {
+      boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) {
     this.segmentMapping = segmentMapping;
     this.destinationSegProperties = segmentProperties;
     this.carbonTable = carbonTable;
     this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
     this.restructuredBlockExists = restructuredBlockExists;
-    queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    this.dataTypeConverter = dataTypeConverter;
   }
 
   /**
@@ -100,7 +100,9 @@ public class CarbonCompactionExecutor {
     List<RawResultIterator> resultList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
+    queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter);
+    queryModel.setReadPageByPage(enablePageLevelReaderForCompaction());
+    queryModel.setForcedDetailRawQuery(true);
     // iterate each seg ID
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
       String segmentId = taskMap.getKey();
@@ -156,7 +158,7 @@ public class CarbonCompactionExecutor {
    * @param blockList
    * @return
    */
-  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
     QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -195,48 +197,6 @@ public class CarbonCompactionExecutor {
   }
 
   /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-    model.setConverter(DataTypeUtil.getDataTypeConverter());
-    model.setReadPageByPage(enablePageLevelReaderForCompaction());
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
    * Whether to enable page level reader for compaction or not.
    */
   private boolean enablePageLevelReaderForCompaction() {

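With the private prepareQueryModel() removed, the compaction executor now asks the table for its model. A minimal sketch of the new call pattern, assuming a CarbonTable instance and a caller-supplied DataTypeConverter (all names taken from the hunks above):

    // One factory call projects every dimension and measure of the table and wires in the
    // converter, replacing the manual QueryModel assembly that this commit deletes.
    QueryModel model = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter);
    // Compaction-specific flags stay with the caller, as shown in the hunk above.
    model.setForcedDetailRawQuery(true);
    model.setReadPageByPage(enablePageLevelReaderForCompaction());
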
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
index 79e9e5a..b6f12a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.processing.partition.DataPartitioner;
 import org.apache.carbondata.processing.partition.Partition;
 
@@ -46,9 +45,8 @@ public final class QueryPartitionHelper {
   /**
    * Get partitions applicable for query based on filters applied in query
    */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName =
-        CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName());
+  public List<Partition> getPartitionsForQuery(String databaseName, String tableName) {
+    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index 36e022b..01db4f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.partition.spliter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 public abstract class AbstractCarbonQueryExecutor {
@@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor {
       LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
   protected CarbonTable carbonTable;
   protected QueryModel queryModel;
-  protected QueryExecutor queryExecutor;
-  protected Map<String, TaskBlockInfo> segmentMapping;
+  private QueryExecutor queryExecutor;
+  Map<String, TaskBlockInfo> segmentMapping;
 
   /**
    * get executor and execute the query model.
@@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor {
    * @param blockList
    * @return
    */
-  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+  CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
     this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor {
   }
 
   /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
    * Below method will be used
    * for cleanup
    */

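The only changes to executeBlockList() here are the result type rename from BatchResult to RowBatch and the narrowed visibility. A hedged caller sketch, treating CarbonIterator as a plain iterator and leaving the RowBatch API abstract since it is not shown in this part of the commit:

    CarbonIterator<RowBatch> batches = executeBlockList(blockList);
    while (batches.hasNext()) {
      RowBatch batch = batches.next();
      // consume the batch (the RowBatch API itself is not shown in this diff)
    }
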
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index 6afec0b..b18207d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 
 /**
  * Used to read carbon blocks when add/split partition
@@ -48,7 +49,8 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
   public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
       throws QueryExecutionException, IOException {
     List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
+    queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl());
+    queryModel.setForcedDetailRawQuery(true);
     List<PartitionSpliterRawResultIterator> resultList
         = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
index ec91472..4abdf3c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryProjection;
 import org.apache.carbondata.processing.partition.Partition;
 import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
 import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
@@ -46,7 +46,7 @@ public class CarbonQueryUtil {
    * It creates the one split for each region server.
    */
   public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) {
+      QueryProjection queryPlan) {
 
     //Just create splits depends on locations of region servers
     List<Partition> allPartitions = null;
@@ -55,7 +55,7 @@ public class CarbonQueryUtil {
           QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
     } else {
       allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
     }
     TableSplit[] splits = new TableSplit[allPartitions.size()];
     for (int i = 0; i < splits.length; i++) {

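QueryPartitionHelper now resolves partitions from the database and table names directly, so callers no longer construct a CarbonQueryPlan just to pass names through. A one-line sketch using the signature from the hunk above:

    List<Partition> partitions =
        QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
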
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 36a5a15..197cb14 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -150,7 +150,7 @@ class StreamHandoffRDD[K, V](
     CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
     val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     val format = new CarbonTableInputFormat[Array[Object]]()
-    val model = format.getQueryModel(inputSplit, attemptContext)
+    val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
     val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
       .asInstanceOf[CarbonStreamRecordReader]

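The Scala caller above reflects the rename of CarbonTableInputFormat.getQueryModel to createQueryModel; an equivalent Java-side sketch, assuming the input split and attempt context come from the usual Hadoop task setup:

    CarbonTableInputFormat<Object[]> format = new CarbonTableInputFormat<>();
    QueryModel model = format.createQueryModel(inputSplit, attemptContext);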
