carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [01/54] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
Date Thu, 08 Mar 2018 16:55:02 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 3fb406618 -> 39fa1eb58


http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index f51ced3..6a401d8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 /**
  * Executor class for executing the query on the selected segments to be merged.
@@ -70,6 +66,9 @@ public class CarbonCompactionExecutor {
    */
   private boolean restructuredBlockExists;
 
+  // converter for UTF8String and decimal conversion
+  private DataTypeConverter dataTypeConverter;
+
   /**
    * Constructor
    *
@@ -82,13 +81,14 @@ public class CarbonCompactionExecutor {
   public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
       SegmentProperties segmentProperties, CarbonTable carbonTable,
       Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
-      boolean restructuredBlockExists) {
+      boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) {
     this.segmentMapping = segmentMapping;
     this.destinationSegProperties = segmentProperties;
     this.carbonTable = carbonTable;
     this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
     this.restructuredBlockExists = restructuredBlockExists;
-    queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    this.dataTypeConverter = dataTypeConverter;
   }
 
   /**
@@ -100,7 +100,9 @@ public class CarbonCompactionExecutor {
     List<RawResultIterator> resultList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
+    queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter);
+    queryModel.setReadPageByPage(enablePageLevelReaderForCompaction());
+    queryModel.setForcedDetailRawQuery(true);
     // iterate each seg ID
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
       String segmentId = taskMap.getKey();
@@ -156,7 +158,7 @@ public class CarbonCompactionExecutor {
    * @param blockList
    * @return
    */
-  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
     QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -195,48 +197,6 @@ public class CarbonCompactionExecutor {
   }
 
   /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-    model.setConverter(DataTypeUtil.getDataTypeConverter());
-    model.setReadPageByPage(enablePageLevelReaderForCompaction());
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
    * Whether to enable page level reader for compaction or not.
    */
   private boolean enablePageLevelReaderForCompaction() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
index 79e9e5a..b6f12a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.processing.partition.DataPartitioner;
 import org.apache.carbondata.processing.partition.Partition;
 
@@ -46,9 +45,8 @@ public final class QueryPartitionHelper {
   /**
    * Get partitions applicable for query based on filters applied in query
    */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName =
-        CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName());
+  public List<Partition> getPartitionsForQuery(String databaseName, String tableName)
{
+    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index 36e022b..01db4f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.partition.spliter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 public abstract class AbstractCarbonQueryExecutor {
@@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor {
       LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
   protected CarbonTable carbonTable;
   protected QueryModel queryModel;
-  protected QueryExecutor queryExecutor;
-  protected Map<String, TaskBlockInfo> segmentMapping;
+  private QueryExecutor queryExecutor;
+  Map<String, TaskBlockInfo> segmentMapping;
 
   /**
    * get executor and execute the query model.
@@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor {
    * @param blockList
    * @return
    */
-  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo>
blockList)
+  CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
     this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor {
   }
 
   /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
    * Below method will be used
    * for cleanup
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index 6afec0b..b18207d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 
 /**
  * Used to read carbon blocks when add/split partition
@@ -48,7 +49,8 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
   public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
       throws QueryExecutionException, IOException {
     List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
+    queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl());
+    queryModel.setForcedDetailRawQuery(true);
     List<PartitionSpliterRawResultIterator> resultList
         = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
index ec91472..4abdf3c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryProjection;
 import org.apache.carbondata.processing.partition.Partition;
 import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
 import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
@@ -46,7 +46,7 @@ public class CarbonQueryUtil {
    * It creates the one split for each region server.
    */
   public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) {
+      QueryProjection queryPlan) {
 
     //Just create splits depends on locations of region servers
     List<Partition> allPartitions = null;
@@ -55,7 +55,7 @@ public class CarbonQueryUtil {
           QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
     } else {
       allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
     }
     TableSplit[] splits = new TableSplit[allPartitions.size()];
     for (int i = 0; i < splits.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa64650/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 5e0fce5..578f0cd 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -157,7 +157,7 @@ class StreamHandoffRDD[K, V](
     CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
     val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     val format = new CarbonTableInputFormat[Array[Object]]()
-    val model = format.getQueryModel(inputSplit, attemptContext)
+    val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
     val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
       .asInstanceOf[CarbonStreamRecordReader]


Mime
View raw message