carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2297] Support SEARCH_MODE for basic filter query
Date Thu, 05 Apr 2018 15:20:36 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 0311c439a -> 638ed1fa7


[CARBONDATA-2297] Support SEARCH_MODE for basic filter query

1. Add a new Spark schedule type.
2. Add a new query executor.

This closes #2123


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/638ed1fa
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/638ed1fa
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/638ed1fa

Branch: refs/heads/master
Commit: 638ed1fa7094d4a139a9a1cb08b123dfde31dd87
Parents: 0311c43
Author: Manhua <kevinjmh@qq.com>
Authored: Thu Mar 29 19:50:39 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Apr 5 23:20:24 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 ++
 .../executor/impl/AbstractQueryExecutor.java    |   8 +-
 .../scan/executor/impl/DetailQueryExecutor.java |   2 +
 .../SearchModeVectorDetailQueryExecutor.java    |  67 ++++++++++
 .../impl/VectorDetailQueryExecutor.java         |   2 +
 .../core/scan/processor/BlockScan.java          |  98 ++++++++++++++
 .../AbstractDetailQueryResultIterator.java      |   6 +-
 .../iterator/SearchModeResultIterator.java      | 134 +++++++++++++++++++
 .../carbondata/core/util/SessionParams.java     |   2 +
 .../detailquery/SearchModeTestCase.scala        |  53 ++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  11 +-
 .../VectorizedCarbonRecordReader.java           |  11 +-
 12 files changed, 398 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 001ee72..aca317a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1629,6 +1629,19 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_SYSTEM_FOLDER_LOCATION = "carbon.system.folder.location";
 
+  @CarbonProperty
+  public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.mode.enable";
+
+  public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
+
+  /**
+   * Num of threads used in query executor when using search mode.
+   */
+  @CarbonProperty
+  public static final String CARBON_SEARCH_MODE_THREAD = "carbon.search.mode.thread";
+
+  public static final String CARBON_SEARCH_MODE_THREAD_DEFAULT = "3";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 676976a..f0f5bce 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -26,7 +26,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -100,6 +100,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
     queryProperties = new QueryExecutorProperties();
   }
 
+  public void setExecutorService(ExecutorService executorService) {
+    // add executor service for query execution
+    queryProperties.executorService = executorService;
+  }
   /**
    * Below method will be used to fill the executor properties based on query
    * model it will parse the query model and get the detail and fill it in
@@ -113,8 +117,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
         queryModel.getQueryId());
     LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
         .getCarbonTableIdentifier().getTableName());
-    // add executor service for query execution
-    queryProperties.executorService = Executors.newCachedThreadPool();
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 93d696b..46ef43d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.executor.impl;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -36,6 +37,7 @@ public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch>
{
   @Override
   public CarbonIterator<RowBatch> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
+    this.setExecutorService(Executors.newCachedThreadPool());
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
     this.queryIterator = new DetailQueryResultIterator(
         blockExecutionInfoList,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
new file mode 100644
index 0000000..a729966
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.executor.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Below class will be used to execute the detail query and returns columnar vectors.
+ */
+public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object>
{
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
+  private static ExecutorService executorService;
+
+  static {
+    int nThread;
+    try {
+      nThread = Integer.parseInt(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD,
+                      CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD_DEFAULT));
+    } catch (NumberFormatException e) {
+      nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD_DEFAULT);
+      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " +
nThread);
+    }
+    executorService = Executors.newFixedThreadPool(nThread);
+  }
+
+  @Override
+  public CarbonIterator<Object> execute(QueryModel queryModel)
+      throws QueryExecutionException, IOException {
+    List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    this.queryIterator = new SearchModeResultIterator(
+        blockExecutionInfoList,
+        queryModel,
+        executorService
+    );
+    return this.queryIterator;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index ad1a558..7787e4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.executor.impl;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -33,6 +34,7 @@ public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object>
{
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
+    this.setExecutorService(Executors.newCachedThreadPool());
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
     this.queryIterator = new VectorDetailQueryResultIterator(
         blockExecutionInfoList,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
new file mode 100644
index 0000000..eb41071
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+
+public class BlockScan {
+  private BlockExecutionInfo blockExecutionInfo;
+  private FileReader fileReader;
+  private BlockletScanner blockletScanner;
+  private BlockletIterator blockletIterator;
+  private ScannedResultCollector scannerResultAggregator;
+
+  private List<BlockletScannedResult> scannedResults = new LinkedList<>();
+  private int nextResultIndex = 0;
+  private BlockletScannedResult curResult;
+
+  public BlockScan(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+      QueryStatisticsModel queryStatisticsModel) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    this.fileReader = fileReader;
+    this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+        blockExecutionInfo.getNumberOfBlockToScan());
+    if (blockExecutionInfo.getFilterExecuterTree() != null) {
+      blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel);
+    } else {
+      blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+    }
+    this.scannerResultAggregator =
+        ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+  }
+
+  public void scan() throws Exception {
+    BlockletScannedResult blockletScannedResult = null;
+    while (blockletIterator.hasNext()) {
+      DataRefNode dataBlock = blockletIterator.next();
+      if (dataBlock.getColumnsMaxValue() == null || blockletScanner.isScanRequired(dataBlock))
{
+        RawBlockletColumnChunks rawBlockletColumnChunks =  RawBlockletColumnChunks.newInstance(
+            blockExecutionInfo.getTotalNumberDimensionToRead(),
+            blockExecutionInfo.getTotalNumberOfMeasureToRead(), fileReader, dataBlock);
+        blockletScanner.readBlocklet(rawBlockletColumnChunks);
+        blockletScannedResult = blockletScanner.scanBlocklet(rawBlockletColumnChunks);
+        if (blockletScannedResult != null && blockletScannedResult.hasNext()) {
+          scannedResults.add(blockletScannedResult);
+        }
+      }
+    }
+    fileReader.finish();
+  }
+
+  public boolean hasNext() {
+    if (curResult != null && curResult.hasNext()) {
+      return true;
+    } else {
+      if (null != curResult) {
+        curResult.freeMemory();
+      }
+      if (nextResultIndex < scannedResults.size()) {
+        curResult = scannedResults.get(nextResultIndex++);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    this.scannerResultAggregator.collectResultInColumnarBatch(curResult, columnarBatch);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 4e628fe..e8a61fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -62,11 +62,11 @@ public abstract class AbstractDetailQueryResultIterator<E> extends
CarbonIterato
   private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap =
       new ConcurrentHashMap<>();
 
-  private ExecutorService execService;
+  protected ExecutorService execService;
   /**
    * execution info of the block
    */
-  private List<BlockExecutionInfo> blockExecutionInfos;
+  protected List<BlockExecutionInfo> blockExecutionInfos;
 
   /**
    * file reader which will be used to execute the query
@@ -78,7 +78,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends
CarbonIterato
   /**
    * QueryStatisticsRecorder
    */
-  private QueryStatisticsRecorder recorder;
+  protected QueryStatisticsRecorder recorder;
   /**
    * number of cores which can be used
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java
new file mode 100644
index 0000000..ae46242
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.processor.BlockScan;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+
+public class SearchModeResultIterator extends AbstractDetailQueryResultIterator<Object>
{
+
+  private final Object lock = new Object();
+
+  private FileFactory.FileType fileType;
+  private List<Future<BlockScan>> taskSubmitList;
+  private BlockScan curBlockScan;
+  private int nextBlockScanIndex = 0;
+
+  public SearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
+    this.fileType = FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath());
+    scanAll();
+  }
+
+  private void scanAll() {
+    taskSubmitList = new ArrayList<>(blockExecutionInfos.size());
+    for (final BlockExecutionInfo info: blockExecutionInfos) {
+      taskSubmitList.add(execService.submit(new Callable<BlockScan>() {
+
+        @Override
+        public BlockScan call() throws Exception {
+          BlockScan blockScan = new BlockScan(info, FileFactory.getFileHolder(fileType),
+              buildQueryStatiticsModel(recorder));
+          blockScan.scan();
+          return blockScan;
+        }
+      }));
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      while ((curBlockScan == null || !curBlockScan.hasNext()) &&
+              nextBlockScanIndex < taskSubmitList.size()) {
+        curBlockScan = taskSubmitList.get(nextBlockScanIndex++).get();
+      }
+      return curBlockScan != null && curBlockScan.hasNext();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object next() {
+    throw new UnsupportedOperationException("call processNextBatch instead");
+  }
+
+  @Override
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    synchronized (lock) {
+      if (curBlockScan.hasNext()) {
+        curBlockScan.processNextBatch(columnarBatch);
+      }
+    }
+  }
+
+  private QueryStatisticsModel buildQueryStatiticsModel(QueryStatisticsRecorder recorder)
{
+    QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel();
+    queryStatisticsModel.setRecorder(recorder);
+    QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet);
+    queryStatisticsModel.getRecorder().recordStatistics(queryStatisticTotalBlocklet);
+
+    QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet);
+    queryStatisticsModel.getRecorder().recordStatistics(queryStatisticValidScanBlocklet);
+
+    QueryStatistic totalNumberOfPages = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, totalNumberOfPages);
+    queryStatisticsModel.getRecorder().recordStatistics(totalNumberOfPages);
+
+    QueryStatistic validPages = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.VALID_PAGE_SCANNED, validPages);
+    queryStatisticsModel.getRecorder().recordStatistics(validPages);
+
+    QueryStatistic scannedPages = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.PAGE_SCANNED, scannedPages);
+    queryStatisticsModel.getRecorder().recordStatistics(scannedPages);
+
+    QueryStatistic scanTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, scanTime);
+    queryStatisticsModel.getRecorder().recordStatistics(scanTime);
+
+    QueryStatistic readTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime);
+    queryStatisticsModel.getRecorder().recordStatistics(readTime);
+    return queryStatisticsModel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 63af23a..58dc218 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION;
 import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE;
@@ -147,6 +148,7 @@ public class SessionParams implements Serializable, Cloneable {
       case CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE:
       case CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD:
       case CARBON_OPTIONS_SINGLE_PASS:
+      case CARBON_SEARCH_MODE_ENABLE:
         isValid = CarbonUtil.validateBoolean(value);
         if (!isValid) {
           throw new InvalidConfigurationException("Invalid value " + value + " for key "
+ key);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
new file mode 100644
index 0000000..0b8f92b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.detailquery
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test Class for detailed query on multiple datatypes
+ */
+
+class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("CREATE TABLE alldatatypestable (empno int, empname String, designation String, doj
Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE alldatatypestable
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+
+    sql("CREATE TABLE alldatatypestable_hive (empno int, empname String, designation String,
doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int)row format delimited fields terminated by ','")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO TABLE alldatatypestable_hive""")
+
+  }
+
+  test("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'")
{
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
"true")
+    checkAnswer(
+      sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"),
+      sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
+      CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT)
+  }
+
+  override def afterAll {
+    sql("drop table alldatatypestable")
+    sql("drop table alldatatypestable_hive")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d34d009..256e43d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -206,7 +206,7 @@ class CarbonScanRDD(
 
     var statistic = new QueryStatistic()
     val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val parallelism = sparkContext.defaultParallelism
+    var parallelism = sparkContext.defaultParallelism
     val result = new ArrayList[Partition](parallelism)
     var noOfBlocks = 0
     var noOfNodes = 0
@@ -241,7 +241,14 @@ class CarbonScanRDD(
             CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
             "false").toBoolean ||
           carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
-        if (useCustomDistribution) {
+        val enableSearchMode = CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
+          CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).toBoolean
+        if (useCustomDistribution || enableSearchMode) {
+          if (enableSearchMode) {
+            // force to assign only one task contains multiple splits each node
+            parallelism = 0
+          }
           // create a list of block based on split
           val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 903bf44..eb71daa 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -35,12 +36,14 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -131,7 +134,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object>
{
     queryModel.setTableBlockInfos(tableBlockInfoList);
     queryModel.setVectorReader(true);
     try {
-      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      if (CarbonProperties.getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) {
+        queryExecutor = new SearchModeVectorDetailQueryExecutor();
+      } else {
+        queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      }
       iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
     } catch (QueryExecutionException e) {
       Throwable ext = e;


Mime
View raw message