carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [33/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors
Date Mon, 15 Aug 2016 07:09:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
new file mode 100644
index 0000000..77bcef5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.util.List;
+
+/**
+ * Model to hold the sortIndex and sortIndexInverted data
+ */
+public class CarbonDictionarySortInfo {
+  /**
+   * Sort index after members are sorted
+   */
+  private final List<Integer> sortIndex;
+  /**
+   * inverted sort index to get the member
+   */
+  private final List<Integer> sortIndexInverted;
+
+  /**
+   * Instantiates the CarbonDictionarySortInfo object. Both lists are stored
+   * by reference, no copy is made.
+   *
+   * @param sortIndex         sort index data
+   * @param sortIndexInverted inverted sort index data
+   */
+  public CarbonDictionarySortInfo(List<Integer> sortIndex, List<Integer> sortIndexInverted) {
+    this.sortIndex = sortIndex;
+    this.sortIndexInverted = sortIndexInverted;
+  }
+
+  /**
+   * Returns the sort index list handed to the constructor.
+   *
+   * @return list of sortIndex
+   */
+  public List<Integer> getSortIndex() {
+    return sortIndex;
+  }
+
+  /**
+   * Returns the inverted sort index list handed to the constructor.
+   *
+   * @return list of sortIndexInverted
+   */
+  public List<Integer> getSortIndexInverted() {
+    return sortIndexInverted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
new file mode 100644
index 0000000..3a7f0f1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryChunksWrapper;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * The class prepares the column sort info ie sortIndex
+ * and inverted sort index info
+ */
+public class CarbonDictionarySortInfoPreparator {
+
+  /**
+   * Builds the sort info over the old dictionary members plus the new distinct values.
+   *
+   * @param newDistinctValues new distinct values, numbered after the old members
+   * @param dictionary        old distinct values, may be null when there are none
+   * @param dataType          DataType of the column, handed to every sort model
+   * @return CarbonDictionarySortInfo holding the sort index and the inverted sort index
+   * @throws CarbonUtilException kept in the signature; nothing in this class throws it directly
+   */
+  public CarbonDictionarySortInfo getDictionarySortInfo(List<String> newDistinctValues,
+      Dictionary dictionary, DataType dataType) throws CarbonUtilException {
+    CarbonDictionarySortModel[] sortModels =
+        prepareDictionarySortModels(newDistinctValues, dictionary, dataType);
+    return createColumnSortInfo(sortModels);
+  }
+
+  /**
+   * Sorts the models and derives the sort index and the inverted sort index from them.
+   * The inverted index is addressed by surrogate key - 1, so the keys must lie in 1..n
+   * for an array of n models, as produced by prepareDictionarySortModels.
+   *
+   * @param dictionarySortModels one model per member; the array is sorted in place
+   * @return holder with both indexes converted to lists
+   */
+  private CarbonDictionarySortInfo createColumnSortInfo(
+      CarbonDictionarySortModel[] dictionarySortModels) {
+    Arrays.sort(dictionarySortModels);
+
+    int memberCount = dictionarySortModels.length;
+    // sortIndex[i] holds the surrogate key of the member at sorted position i
+    int[] sortIndex = new int[memberCount];
+    // sortIndexInverted[key - 1] holds the 1-based sorted position of that key
+    int[] sortIndexInverted = new int[memberCount];
+
+    for (int i = 0; i < memberCount; i++) {
+      int surrogateKey = dictionarySortModels[i].getKey();
+      sortIndex[i] = surrogateKey;
+      // surrogate keys start from 1 while array indexes start from 0, so the key is
+      // shifted down by one to pick the slot and the loop counter is shifted up by one
+      sortIndexInverted[surrogateKey - 1] = i + 1;
+    }
+
+    List<Integer> sortIndexList = convertToList(sortIndex);
+    List<Integer> sortIndexInvertedList = convertToList(sortIndexInverted);
+    return new CarbonDictionarySortInfo(sortIndexList, sortIndexInvertedList);
+  }
+
+  /**
+   * Boxes an int[] into a fixed-size {@code List<Integer>} backed by a new Integer[].
+   *
+   * @param data values to box
+   * @return list view over the boxed copy of data
+   */
+  private List<Integer> convertToList(int[] data) {
+    Integer[] boxed = ArrayUtils.toObject(data);
+    return Arrays.asList(boxed);
+  }
+
+  /**
+   * Creates one sort model per member: first the members of the existing dictionary,
+   * then the new distinct values, with surrogates assigned consecutively from 1.
+   *
+   * @param distinctValues new distinct values
+   * @param dictionary     existing dictionary whose chunks are iterated, may be null
+   * @param dataType       DataType of the column
+   * @return array of models, each holding a member value and its surrogate
+   */
+  private CarbonDictionarySortModel[] prepareDictionarySortModels(List<String> distinctValues,
+      Dictionary dictionary, DataType dataType) {
+    CarbonDictionarySortModel[] dictionarySortModels;
+    // surrogate keys start from 1 and are handed out in iteration order
+    int surrogate = 1;
+    if (null == dictionary) {
+      dictionarySortModels = new CarbonDictionarySortModel[distinctValues.size()];
+    } else {
+      // the wrapper provides the iterator over the members of the dictionary chunks
+      DictionaryChunksWrapper dictionaryChunksWrapper = dictionary.getDictionaryChunks();
+      dictionarySortModels =
+          new CarbonDictionarySortModel[dictionaryChunksWrapper.getSize() + distinctValues.size()];
+      while (dictionaryChunksWrapper.hasNext()) {
+        dictionarySortModels[surrogate - 1] =
+            createDictionarySortModel(surrogate, dataType, dictionaryChunksWrapper.next());
+        surrogate++;
+      }
+    }
+    // new distinct values continue the numbering; they are encoded with the same charset
+    // that createDictionarySortModel decodes with, not with the platform default charset
+    Charset charset = Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET);
+    Iterator<String> distinctValue = distinctValues.iterator();
+    while (distinctValue.hasNext()) {
+      dictionarySortModels[surrogate - 1] =
+          createDictionarySortModel(surrogate, dataType, distinctValue.next().getBytes(charset));
+      surrogate++;
+    }
+    return dictionarySortModels;
+  }
+
+  /**
+   * Decodes the member bytes with CarbonCommonConstants.DEFAULT_CHARSET and wraps them.
+   * @param surrogate surrogate key of the member
+   * @param dataType  DataType of the column
+   * @param value     member value as bytes
+   * @return CarbonDictionarySortModel for the member
+   */
+  private CarbonDictionarySortModel createDictionarySortModel(int surrogate, DataType dataType,
+      byte[] value) {
+    String memberValue = new String(value, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    return new CarbonDictionarySortModel(surrogate, dataType, memberValue);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
new file mode 100644
index 0000000..0d3040a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Dictionary sort model class holds the member value as a string and its surrogate key.
+ */
+public class CarbonDictionarySortModel implements Comparable<CarbonDictionarySortModel> {
+
+  /**
+   * Surrogate key
+   */
+  private int key;
+
+  /**
+   * member value in bytes
+   */
+  private String memberValue;
+
+  /**
+   * member dataType
+   */
+  private DataType dataType;
+
+  /**
+   * Constructor to init the dictionary sort model
+   *
+   * @param key
+   * @param dataType
+   * @param memberValue
+   */
+  public CarbonDictionarySortModel(int key, DataType dataType, String memberValue) {
+    this.key = key;
+    this.dataType = dataType;
+    this.memberValue = memberValue;
+  }
+
+  /**
+   * Compare
+   */
+  @Override public int compareTo(CarbonDictionarySortModel o) {
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+      case DOUBLE:
+
+        Double d1 = null;
+        Double d2 = null;
+        try {
+          d1 = new Double(memberValue);
+        } catch (NumberFormatException e) {
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(o.memberValue)) {
+            return -1;
+          }
+          return 1;
+        }
+        try {
+          d2 = new Double(o.memberValue);
+        } catch (NumberFormatException e) {
+          return -1;
+        }
+        return d1.compareTo(d2);
+      case DECIMAL:
+        java.math.BigDecimal val1 = null;
+        java.math.BigDecimal val2 = null;
+        try {
+          val1 = new java.math.BigDecimal(memberValue);
+        } catch (NumberFormatException e) {
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(o.memberValue)) {
+            return -1;
+          }
+          return 1;
+        }
+        try {
+          val2 = new java.math.BigDecimal(o.memberValue);
+        } catch (NumberFormatException e) {
+          return -1;
+        }
+        return val1.compareTo(val2);
+      case TIMESTAMP:
+        SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+        Date date1 = null;
+        Date date2 = null;
+        try {
+          date1 = parser.parse(memberValue);
+        } catch (ParseException e) {
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(o.memberValue)) {
+            return -1;
+          }
+          return 1;
+        }
+        try {
+          date2 = parser.parse(o.memberValue);
+        } catch (ParseException e) {
+          return -1;
+        }
+        return date1.compareTo(date2);
+      case STRING:
+      default:
+        return this.memberValue.compareTo(o.memberValue);
+    }
+  }
+
+  /**
+   * @see Object#hashCode()
+   */
+  @Override public int hashCode() {
+    int result = ((memberValue == null) ? 0 : memberValue.hashCode());
+    return result;
+  }
+
+  /**
+   * @see Object#equals(Object)
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj instanceof CarbonDictionarySortModel) {
+      if (this == obj) {
+        return true;
+      }
+      CarbonDictionarySortModel other = (CarbonDictionarySortModel) obj;
+      if (memberValue == null) {
+        if (other.memberValue != null) {
+          return false;
+        }
+      } else if (!this.memberValue.equals(other.memberValue)) {
+        return false;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * return the surrogate of the member
+   *
+   * @return
+   */
+  public int getKey() {
+    return key;
+  }
+
+  /**
+   * Returns member buye
+   *
+   * @return
+   */
+  public String getMemberValue() {
+    return memberValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
new file mode 100644
index 0000000..dce6ae5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector;
+
+import java.util.List;
+
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Interface which will be used to aggregate the scan result
+ */
+public interface ScannedResultCollector {
+
+  /**
+   * Below method will be used to collect rows from the scanned result
+   * @param scannedResult scanned result to read the rows from
+   * @param batchSize     batch size; DictionaryBasedResultCollector uses it as the row limit
+   * @return the collected rows, one Object[] per row
+   */
+  List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
new file mode 100644
index 0000000..387b54f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public abstract class AbstractScannedResultCollector implements ScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractScannedResultCollector.class.getName());
+
+  /** restructuring info, source of the key generator, max key and mask byte ranges */
+  private final KeyStructureInfo restructureInfos;
+
+  /** table block execution infos */
+  protected BlockExecutionInfo tableBlockExecutionInfos;
+
+  /** Measure ordinals */
+  protected int[] measuresOrdinal;
+
+  /**
+   * to check whether measure exists in current table block or not this to
+   * handle restructuring scenario
+   */
+  protected boolean[] isMeasureExistsInCurrentBlock;
+
+  /**
+   * default values of the measures, used for a measure that is not present in
+   * the current table block (restructuring scenario)
+   */
+  private final Object[] measureDefaultValue;
+
+  /** measure datatypes. */
+  protected DataType[] measureDatatypes;
+
+  /**
+   * Caches the key structure info and the measure details of the aggregator info.
+   * @param blockExecutionInfos execution infos of the table block
+   */
+  public AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    this.tableBlockExecutionInfos = blockExecutionInfos;
+    restructureInfos = blockExecutionInfos.getKeyStructureInfo();
+    measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
+    isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
+    measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
+    this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
+  }
+
+  /**
+   * Fills one value per measure into msrValues: the value of the current row when the
+   * measure exists in this block, otherwise the default value of the measure.
+   * @param msrValues     array to fill
+   * @param offset        index in msrValues that receives the first measure
+   * @param scannedResult scanned result providing the measure chunks and the current row id
+   */
+  protected void fillMeasureData(Object[] msrValues, int offset,
+      AbstractScannedResult scannedResult) {
+    // an int counter is used, a short one would wrap around past Short.MAX_VALUE measures
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      if (isMeasureExistsInCurrentBlock[i]) {
+        // measure is stored in this block, read it from its measure column data chunk
+        msrValues[i + offset] = getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
+            scannedResult.getCurrenrRowId(), measureDatatypes[i]);
+      } else {
+        // measure is missing from this block, fall back to its default value
+        msrValues[i + offset] = measureDefaultValue[i];
+      }
+    }
+  }
+
+  /**
+   * Reads the measure at index, or returns null when the chunk's null bitset has index set.
+   */
+  private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
+    if (dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      return null;
+    }
+    Object msrVal;
+    switch (dataType) {
+      case INT:
+      case LONG:
+        msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+        break;
+      case DECIMAL:
+        msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+        break;
+      default:
+        msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+    }
+    return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
+  }
+
+  /**
+   * Updates the keys of the given rows when the block requires a fixed key update
+   */
+  protected void updateData(List<Object[]> listBasedResult) {
+    if (tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
+      updateKeyWithLatestBlockKeygenerator(listBasedResult);
+    }
+  }
+
+  /**
+   * Re-packs the dictionary key held in element 0 of every row with the latest block
+   * key generator. A KeyGenException is only logged; it ends the loop, so the rows
+   * after the failing one keep their old key.
+   */
+  private void updateKeyWithLatestBlockKeygenerator(List<Object[]> listBasedResult) {
+    try {
+      for (Object[] row : listBasedResult) {
+        ByteArrayWrapper key = (ByteArrayWrapper) row[0];
+        // unpack the key with table block key generator
+        long[] data = tableBlockExecutionInfos.getBlockKeyGenerator()
+            .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
+        // pack it with the latest block key generator and mask the generated key
+        key.setDictionaryKey(QueryUtil
+            .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
+                restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
+                restructureInfos.getMaskByteRanges().length));
+      }
+    } catch (KeyGenException e) {
+      LOGGER.error(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
new file mode 100644
index 0000000..108677f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Collects scanned rows as Object[] rows holding one value per queried dimension and measure.
+ */
+public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedResultCollector.class.getName());
+
+  public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Collects up to batchSize rows from the scanned result. Every row holds one value
+   * per queried dimension and measure, stored at the query order of that column.
+   *
+   * @param scannedResult scanned result to read the rows from
+   * @param batchSize     maximum number of rows to collect in this call
+   * @return the collected rows
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
+    boolean isMsrsPresent = measureDatatypes.length > 0;
+    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+        tableBlockExecutionInfos.getComlexDimensionInfoMap();
+    boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+    boolean[] directDictionaryEncodingArray =
+        CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+    boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
+    int dimSize = queryDimensions.length;
+    boolean isDimensionsExist = dimSize > 0;
+    // output position of every column: the dimensions first, then the measures
+    int[] order = new int[dimSize + queryMeasures.length];
+    for (int i = 0; i < dimSize; i++) {
+      order[i] = queryDimensions[i].getQueryOrder();
+    }
+    for (int i = 0; i < queryMeasures.length; i++) {
+      order[i + dimSize] = queryMeasures[i].getQueryOrder();
+    }
+    // scan the record and add to list
+    int rowCounter = 0;
+    while (scannedResult.hasNext() && rowCounter < batchSize) {
+      Object[] row = new Object[dimSize + queryMeasures.length];
+      if (isDimensionsExist) {
+        int[] surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
+        String[] noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
+        byte[][] complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+        int dictionaryColumnIndex = 0;
+        int noDictionaryColumnIndex = 0;
+        int complexTypeColumnIndex = 0;
+        for (int i = 0; i < dimSize; i++) {
+          if (!dictionaryEncodingArray[i]) {
+            row[order[i]] = DataTypeUtil
+                .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
+                    queryDimensions[i].getDimension().getDataType());
+          } else if (directDictionaryEncodingArray[i]) {
+            // consume the surrogate even when no generator is available, otherwise every
+            // following dictionary column of this row would read a shifted surrogate
+            int surrogate = surrogateResult[dictionaryColumnIndex++];
+            DirectDictionaryGenerator directDictionaryGenerator =
+                DirectDictionaryKeyGeneratorFactory
+                    .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+            if (directDictionaryGenerator != null) {
+              row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(surrogate);
+            }
+          } else if (complexDataTypeArray[i]) {
+            row[order[i]] = comlexDimensionInfoMap
+                .get(queryDimensions[i].getDimension().getOrdinal())
+                .getDataBasedOnDataTypeFromSurrogates(
+                    ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+          } else {
+            row[order[i]] = surrogateResult[dictionaryColumnIndex++];
+          }
+        }
+      } else {
+        scannedResult.incrementCounter();
+      }
+      if (isMsrsPresent) {
+        Object[] msrValues = new Object[measureDatatypes.length];
+        fillMeasureData(msrValues, 0, scannedResult);
+        for (int i = 0; i < msrValues.length; i++) {
+          row[order[i + dimSize]] = msrValues[i];
+        }
+      }
+      listBasedResult.add(row);
+      rowCounter++;
+    }
+    return listBasedResult;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
new file mode 100644
index 0000000..74d4170
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * Result collector that returns scanned rows without decoding them: every row
+ * holds a {@link ByteArrayWrapper} with the row's keys, followed by the measures.
+ */
+public class RawBasedResultCollector extends AbstractScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RawBasedResultCollector.class.getName());
+
+  public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Collects at most {@code batchSize} rows from the scanned result.
+   * Slot 0 of each row is a wrapper around the dictionary, no-dictionary and
+   * complex-type keys; the measure values are filled in starting at slot 1.
+   * The number of collected rows is tracked so the batch limit is honoured.
+   *
+   * @param scannedResult result to read rows from
+   * @param batchSize     maximum number of rows to return
+   * @return the collected rows, after {@code updateData} has been applied to them
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    List<Object[]> rows = new ArrayList<>(batchSize);
+    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    for (int collected = 0; scannedResult.hasNext() && collected < batchSize; collected++) {
+      Object[] row = new Object[1 + queryMeasures.length];
+      ByteArrayWrapper keys = new ByteArrayWrapper();
+      keys.setDictionaryKey(scannedResult.getDictionaryKeyArray());
+      keys.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
+      keys.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
+      row[0] = keys;
+      // measures follow the key wrapper, hence the offset of 1
+      fillMeasureData(row, 1, scannedResult);
+      rows.add(row);
+    }
+    updateData(rows);
+    return rows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
new file mode 100644
index 0000000..8dd6749
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.util.*;
+import org.apache.spark.sql.types.*;
+
+/**
+ * Query type for an array column of a complex dimension. An array has exactly
+ * one child type, which describes its elements.
+ */
+public class ArrayQueryType extends ComplexQueryType implements GenericQueryType {
+
+  // NOTE(review): this declaration hides the protected field of the same name in
+  // ComplexQueryType; all methods of this class use this private copy.
+  private GenericQueryType children;
+  private int keyOrdinalForQuery;
+
+  public ArrayQueryType(String name, String parentname, int blockIndex) {
+    super(name, parentname, blockIndex);
+  }
+
+  /**
+   * Registers the given type as the element type when its parent name matches
+   * this array's name; otherwise the request is passed down to the current child.
+   */
+  @Override public void addChildren(GenericQueryType children) {
+    boolean isDirectChild = this.getName().equals(children.getParentname());
+    if (!isDirectChild) {
+      this.children.addChildren(children);
+      return;
+    }
+    this.children = children;
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+  }
+
+  /**
+   * Adds the primitive leaf types below this array to the given list.
+   */
+  @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+    if (!(children instanceof PrimitiveQueryType)) {
+      children.getAllPrimitiveChildren(primitiveChild);
+      return;
+    }
+    primitiveChild.add(children);
+  }
+
+  /**
+   * Reads this array's 8-byte entry for the given row (element count followed by
+   * the child row number of the first element), writes the element count and then
+   * lets the child write each element, one child row per element.
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] entry = new byte[8];
+    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, entry);
+    ByteBuffer entryBuffer = ByteBuffer.wrap(entry);
+    int elementCount = entryBuffer.getInt();
+    dataOutputStream.writeInt(elementCount);
+    if (elementCount == 0) {
+      // empty array: only the count is written
+      return;
+    }
+    int childRow = entryBuffer.getInt();
+    for (int remaining = elementCount; remaining > 0; remaining--, childRow++) {
+      children.parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, childRow,
+          dataOutputStream);
+    }
+  }
+
+  @Override public int getSurrogateIndex() {
+    return 0;
+  }
+
+  @Override public void setSurrogateIndex(int surrIndex) {
+    // nothing to store for an array
+  }
+
+  @Override public int getBlockIndex() {
+    return blockIndex;
+  }
+
+  @Override public void setBlockIndex(int blockIndex) {
+    this.blockIndex = blockIndex;
+  }
+
+  /**
+   * @return the child's column count plus one for the array itself
+   */
+  @Override public int getColsCount() {
+    return 1 + children.getColsCount();
+  }
+
+  /**
+   * Copies the element count from the buffer to the output and lets the child
+   * process that many elements from the same buffer.
+   */
+  @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+      throws IOException {
+    int elementCount = complexData.getInt();
+    dataOutput.writeInt(elementCount);
+    int parsed = 0;
+    while (parsed < elementCount) {
+      children.parseAndGetResultBytes(complexData, dataOutput);
+      parsed++;
+    }
+  }
+
+  @Override public void setKeySize(int[] keyBlockSize) {
+    children.setKeySize(keyBlockSize);
+  }
+
+  /**
+   * @return an array type created with a null element type and containsNull = true
+   */
+  @Override public DataType getSchemaType() {
+    return new ArrayType(null, true);
+  }
+
+  @Override public int getKeyOrdinalForQuery() {
+    return keyOrdinalForQuery;
+  }
+
+  @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+    this.keyOrdinalForQuery = keyOrdinalForQuery;
+  }
+
+  /**
+   * Loads this array's own block and then the blocks needed by the child.
+   */
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+    readBlockDataChunk(blockChunkHolder);
+    children.fillRequiredBlockData(blockChunkHolder);
+  }
+
+  /**
+   * Decodes an array from the surrogate buffer: an element count followed by the
+   * elements, each decoded by the child type.
+   *
+   * @return the decoded elements, or null when the stored element count is -1
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    final int elementCount = surrogateData.getInt();
+    if (elementCount == -1) {
+      return null;
+    }
+    Object[] elements = new Object[elementCount];
+    for (int idx = 0; idx < elements.length; idx++) {
+      elements[idx] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+    }
+    return new GenericArrayData(elements);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
new file mode 100644
index 0000000..0a4c999
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
@@ -0,0 +1,80 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.complextypes;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+/**
+ * Common state and block-reading helpers for the complex query types
+ * (array, struct and their primitive leaves).
+ */
+public class ComplexQueryType {
+  protected GenericQueryType children;
+
+  protected String name;
+
+  protected String parentname;
+
+  protected int blockIndex;
+
+  public ComplexQueryType(String name, String parentname, int blockIndex) {
+    this.name = name;
+    this.parentname = parentname;
+    this.blockIndex = blockIndex;
+  }
+
+  /**
+   * Makes sure the dimension chunk of this type's block is loaded and then asks
+   * the child to load the blocks it needs.
+   *
+   * @param blockChunkHolder holder of the chunks read so far
+   */
+  public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+    readBlockDataChunk(blockChunkHolder);
+    children.fillRequiredBlockData(blockChunkHolder);
+  }
+
+  /**
+   * Copies the value stored for one row of this type's block into the passed
+   * byte[]; this method is also used by the child types.
+   *
+   * @param dimensionColumnDataChunks chunks indexed by block index
+   * @param rowNumber                 row whose value is wanted
+   * @param input                     destination; one column value is written from offset 0
+   */
+  protected void copyBlockDataChunk(DimensionColumnDataChunk[] dimensionColumnDataChunks,
+      int rowNumber, byte[] input) {
+    DimensionColumnDataChunk chunk = dimensionColumnDataChunks[blockIndex];
+    byte[] data = (byte[]) chunk.getCompleteDataChunk();
+    int valueSize = chunk.getAttributes().getColumnValueSize();
+    int sourceRow = rowNumber;
+    if (chunk.getAttributes().getInvertedIndexes() != null) {
+      // the chunk carries an inverted index: translate the row through the reverse index
+      sourceRow = chunk.getAttributes().getInvertedIndexesReverse()[rowNumber];
+    }
+    System.arraycopy(data, sourceRow * valueSize, input, 0, valueSize);
+  }
+
+  /*
+   * Reads the dimension chunk of this type's block from the data block, unless the
+   * holder already has it.
+   */
+  protected void readBlockDataChunk(BlocksChunkHolder blockChunkHolder) {
+    if (blockChunkHolder.getDimensionDataChunk()[blockIndex] == null) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
new file mode 100644
index 0000000..11f4651
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.Bits;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.types.*;
+
+/**
+ * Leaf of a complex column's type tree: a single primitive value that is stored
+ * as a dictionary or direct-dictionary surrogate.
+ *
+ * <p>Name, parent name and block index are the protected fields inherited from
+ * {@link ComplexQueryType}. They are intentionally not redeclared here: a private
+ * copy would hide the inherited field, so {@link #setBlockIndex(int)} would no
+ * longer affect the inherited {@code copyBlockDataChunk} / {@code readBlockDataChunk}.
+ */
+public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType {
+
+  private int index;
+
+  private int keySize;
+
+  private Dictionary dictionary;
+
+  private org.apache.carbondata.core.carbon.metadata.datatype.DataType dataType;
+
+  private boolean isDirectDictionary;
+
+  /**
+   * @param name               column name
+   * @param parentname         name of the enclosing complex type
+   * @param blockIndex         index of the block holding this column
+   * @param dataType           carbon data type of the value
+   * @param keySize            size in bytes of one stored surrogate key
+   * @param dictionary         dictionary used when the column is not direct-dictionary
+   * @param isDirectDictionary whether values are decoded by a direct-dictionary generator
+   */
+  public PrimitiveQueryType(String name, String parentname, int blockIndex,
+      org.apache.carbondata.core.carbon.metadata.datatype.DataType dataType, int keySize,
+      Dictionary dictionary, boolean isDirectDictionary) {
+    super(name, parentname, blockIndex);
+    this.dataType = dataType;
+    this.keySize = keySize;
+    this.dictionary = dictionary;
+    this.isDirectDictionary = isDirectDictionary;
+  }
+
+  @Override public void addChildren(GenericQueryType children) {
+    // a primitive has no children
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+  }
+
+  @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+    // a primitive has no children
+  }
+
+  @Override public int getSurrogateIndex() {
+    return index;
+  }
+
+  @Override public void setSurrogateIndex(int surrIndex) {
+    index = surrIndex;
+  }
+
+  @Override public int getBlockIndex() {
+    return blockIndex;
+  }
+
+  @Override public void setBlockIndex(int blockIndex) {
+    this.blockIndex = blockIndex;
+  }
+
+  @Override public int getColsCount() {
+    return 1;
+  }
+
+  /**
+   * Writes the stored value of the given row to the output stream.
+   *
+   * @param dimensionDataChunks chunks indexed by block index
+   * @param rowNumber           row whose value is written
+   * @param dataOutputStream    destination
+   * @throws IOException if writing to the stream fails
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] currentVal =
+        new byte[dimensionDataChunks[blockIndex].getAttributes().getColumnValueSize()];
+    copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal);
+    dataOutputStream.write(currentVal);
+  }
+
+  /**
+   * Takes this column's key size from the entry at its block index.
+   */
+  @Override public void setKeySize(int[] keyBlockSize) {
+    this.keySize = keyBlockSize[this.blockIndex];
+  }
+
+  @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+      throws IOException {
+    // intentionally empty in this implementation
+  }
+
+  /**
+   * Maps the carbon data type to the Spark SQL type. Types without an explicit
+   * case fall back to the integer type.
+   */
+  @Override public DataType getSchemaType() {
+    switch (dataType) {
+      case INT:
+        return IntegerType$.MODULE$;
+      case DOUBLE:
+        return DoubleType$.MODULE$;
+      case LONG:
+        return LongType$.MODULE$;
+      case BOOLEAN:
+        return BooleanType$.MODULE$;
+      case TIMESTAMP:
+        return TimestampType$.MODULE$;
+      default:
+        return IntegerType$.MODULE$;
+    }
+  }
+
+  @Override public int getKeyOrdinalForQuery() {
+    return 0;
+  }
+
+  @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+    // not tracked for primitives
+  }
+
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+    readBlockDataChunk(blockChunkHolder);
+  }
+
+  /**
+   * Reads one surrogate key of {@code keySize} bytes from the buffer and converts
+   * it to the actual value, either through the direct-dictionary generator of the
+   * data type or through the dictionary.
+   *
+   * @param surrogateData buffer positioned at this column's surrogate key
+   * @return the decoded value; null when no direct-dictionary generator is
+   *         available for the data type
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    byte[] data = new byte[keySize];
+    surrogateData.get(data);
+    Bits bit = new Bits(new int[]{keySize * 8});
+    int surrogateValue = (int)bit.getKeyArray(data, 0)[0];
+    if (isDirectDictionary) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataType);
+      // The list-based result collector null-checks this factory result as well,
+      // so treat a missing generator as "no value" instead of failing with an NPE.
+      if (directDictionaryGenerator == null) {
+        return null;
+      }
+      return directDictionaryGenerator.getValueFromSurrogate(surrogateValue);
+    }
+    String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrogateValue);
+    return DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
new file mode 100644
index 0000000..a8b188b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRowWithSchema;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Query type for a struct column of a complex dimension; it keeps one child
+ * type per struct field, in the order the fields were added.
+ *
+ * <p>Name, parent name and block index are the protected fields inherited from
+ * {@link ComplexQueryType}. They are intentionally not redeclared here: a private
+ * copy would hide the inherited field, so {@link #setBlockIndex(int)} would no
+ * longer affect the inherited {@code copyBlockDataChunk} / {@code readBlockDataChunk}.
+ */
+public class StructQueryType extends ComplexQueryType implements GenericQueryType {
+
+  // Hides the single-child field of ComplexQueryType: a struct needs a list of fields.
+  private List<GenericQueryType> children = new ArrayList<GenericQueryType>();
+  private int keyOrdinalForQuery;
+
+  public StructQueryType(String name, String parentname, int blockIndex) {
+    super(name, parentname, blockIndex);
+  }
+
+  /**
+   * Adds the given type as a field when its parent name matches this struct's
+   * name; otherwise offers it to every existing field.
+   */
+  @Override public void addChildren(GenericQueryType newChild) {
+    if (this.getName().equals(newChild.getParentname())) {
+      this.children.add(newChild);
+    } else {
+      for (GenericQueryType child : this.children) {
+        child.addChildren(newChild);
+      }
+    }
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+  }
+
+  /**
+   * Adds the primitive leaf types below this struct to the given list.
+   */
+  @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+    for (GenericQueryType child : children) {
+      if (child instanceof PrimitiveQueryType) {
+        primitiveChild.add(child);
+      } else {
+        child.getAllPrimitiveChildren(primitiveChild);
+      }
+    }
+  }
+
+  @Override public int getSurrogateIndex() {
+    return 0;
+  }
+
+  @Override public void setSurrogateIndex(int surrIndex) {
+    // nothing to store for a struct
+  }
+
+  @Override public int getBlockIndex() {
+    return blockIndex;
+  }
+
+  @Override public void setBlockIndex(int blockIndex) {
+    this.blockIndex = blockIndex;
+  }
+
+  /**
+   * @return one column for the struct itself plus the columns of all fields
+   */
+  @Override public int getColsCount() {
+    int colsCount = 1;
+    for (GenericQueryType child : children) {
+      colsCount += child.getColsCount();
+    }
+    return colsCount;
+  }
+
+  /**
+   * Reads this struct's entry for the given row, writes the field count stored
+   * in its first four bytes and then lets that many fields write their value
+   * for the same row.
+   *
+   * @throws IOException if writing to the stream fails
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] input = new byte[8];
+    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+    int childElement = ByteBuffer.wrap(input).getInt();
+    dataOutputStream.writeInt(childElement);
+    // a count of 0 simply writes no field data
+    for (int i = 0; i < childElement; i++) {
+      children.get(i)
+          .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber,
+              dataOutputStream);
+    }
+  }
+
+  /**
+   * Copies the field count from the buffer to the output and lets that many
+   * fields process their part of the same buffer.
+   */
+  @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+      throws IOException {
+    int childElement = complexData.getInt();
+    dataOutput.writeInt(childElement);
+    for (int i = 0; i < childElement; i++) {
+      children.get(i).parseAndGetResultBytes(complexData, dataOutput);
+    }
+  }
+
+  @Override public void setKeySize(int[] keyBlockSize) {
+    for (GenericQueryType child : children) {
+      child.setKeySize(keyBlockSize);
+    }
+  }
+
+  /**
+   * Builds a struct type with one nullable field per child. Only the field name
+   * is filled in; the field data type is passed as null.
+   */
+  @Override public DataType getSchemaType() {
+    StructField[] fields = new StructField[children.size()];
+    for (int i = 0; i < children.size(); i++) {
+      fields[i] = new StructField(children.get(i).getName(), null, true,
+          Metadata.empty());
+    }
+    return new StructType(fields);
+  }
+
+  @Override public int getKeyOrdinalForQuery() {
+    return keyOrdinalForQuery;
+  }
+
+  @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+    this.keyOrdinalForQuery = keyOrdinalForQuery;
+  }
+
+  /**
+   * Loads this struct's own block and then the blocks needed by every field.
+   */
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+    readBlockDataChunk(blockChunkHolder);
+
+    for (GenericQueryType child : children) {
+      child.fillRequiredBlockData(blockChunkHolder);
+    }
+  }
+
+  /**
+   * Decodes a struct from the surrogate buffer: a field count followed by the
+   * field values, each decoded by the corresponding child type.
+   *
+   * @return a row holding the decoded field values together with this struct's schema
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    int childLength = surrogateData.getInt();
+    Object[] fields = new Object[childLength];
+    for (int i = 0; i < childLength; i++) {
+      fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
+    }
+
+    return new GenericInternalRowWithSchema(fields, (StructType) getSchemaType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
new file mode 100644
index 0000000..53bf8ca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryModel;
+
+/**
+ * Interface for carbon query executor.
+ * An implementation executes the query described by a {@link QueryModel}
+ * and returns an iterator over the query result.
+ *
+ * @param <E> type of the elements produced by the returned iterator
+ */
+public interface QueryExecutor<E> {
+
+  /**
+   * Below method will be used to execute the query based on query model passed from driver
+   *
+   * @param queryModel query details
+   * @return query result iterator
+   * @throws QueryExecutionException if any failure while executing the query
+   */
+  CarbonIterator<E> execute(QueryModel queryModel) throws QueryExecutionException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
new file mode 100644
index 0000000..ab75231
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor;
+
+import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
+import org.apache.carbondata.scan.model.QueryModel;
+
+/**
+ * Factory class to get the query executor from RDD.
+ * At present every query is served by a {@link DetailQueryExecutor}.
+ */
+public class QueryExecutorFactory {
+
+  /**
+   * Creates the executor for a query.
+   *
+   * @param queryModel query details; not inspected by the current implementation
+   * @return a new {@link DetailQueryExecutor}, returned as a raw {@code QueryExecutor}
+   */
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+    return new DetailQueryExecutor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java b/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
new file mode 100644
index 0000000..7003184
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.exception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception raised when the execution of a query fails.
+ */
+public class QueryExecutionException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message returned by {@link #getMessage()}.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public QueryExecutionException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public QueryExecutionException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t The cause of this exception; its description becomes the message.
+   */
+  public QueryExecutionException(Throwable t) {
+    super(t);
+    // Exception(Throwable) derives the detail message from the cause. Keep it,
+    // otherwise getMessage() would report an empty string and hide the failure.
+    String derived = super.getMessage();
+    if (derived != null) {
+      this.msg = derived;
+    }
+  }
+
+  /**
+   * This method is used to get the localized message.
+   * No locale specific translations exist, so the message is the same for
+   * every locale.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return getLocalizedMessage();
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   *
+   * @return the message passed to the constructor, or the description of the
+   *         cause when only a cause was given
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
new file mode 100644
index 0000000..e204572
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.common.logging.impl.StandardLogService;
+import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
+import org.apache.carbondata.core.carbon.datastore.IndexKey;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.AggregatorInfo;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * This class provides a skeletal implementation of the {@link QueryExecutor}
+ * interface to minimize the effort required to implement this interface. This
+ * will be used to prepare all the properties required for query execution
+ */
+public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractQueryExecutor.class.getName());
+  /**
+   * holder for query properties which will be used to execute the query
+   */
+  protected QueryExecutorProperties queryProperties;
+
+  /**
+   * Creates the executor with an empty properties holder; the holder is filled
+   * when the query is initialized.
+   */
+  public AbstractQueryExecutor() {
+    this.queryProperties = new QueryExecutorProperties();
+  }
+
+  /**
+   * Fills the executor properties from the query model: creates the statistics
+   * recorder, calls {@link QueryUtil#resolveQueryModel(QueryModel)}, loads the table
+   * blocks and the dictionaries and derives the key structure info, the measure data
+   * types and the aggregation start indexes.
+   *
+   * @param queryModel query model to read the query details from; the statistics
+   *                   recorder and the dictionary mapping are also set on it
+   * @throws QueryExecutionException if the block indexes cannot be built or no block
+   *                                 was loaded for the query
+   */
+  protected void initQuery(QueryModel queryModel) throws QueryExecutionException {
+    StandardLogService.setThreadName(StandardLogService.getPartitionID(
+        queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName()),
+        queryModel.getQueryId());
+    LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
+        .getCarbonTableIdentifier().getTableName());
+    // one statistics recorder per query, shared between the executor and the query model
+    queryProperties.queryStatisticsRecorder = new QueryStatisticsRecorder(queryModel.getQueryId());
+    queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    QueryUtil.resolveQueryModel(queryModel);
+    QueryStatistic queryStatistic = new QueryStatistic();
+    // get the table blocks
+    try {
+      queryProperties.dataBlocks = BlockIndexStore.getInstance()
+          .loadAndGetBlocks(queryModel.getTableBlockInfos(),
+              queryModel.getAbsoluteTableIdentifier());
+    } catch (IndexBuilderException e) {
+      throw new QueryExecutionException(e);
+    }
+    queryStatistic
+        .addStatistics("Time taken to load the Block(s) In Executor", System.currentTimeMillis());
+    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+    // the key structure is taken from the last block, so fail with a clear message
+    // instead of an index error when there is no block at all
+    if (null == queryProperties.dataBlocks || queryProperties.dataBlocks.isEmpty()) {
+      throw new QueryExecutionException("No blocks are available to execute the query on table: "
+          + queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName());
+    }
+    // updating the restructuring infos for the query, based on the key generator
+    // of the last loaded block
+    queryProperties.keyStructureInfo = getKeyStructureInfo(queryModel,
+        queryProperties.dataBlocks.get(queryProperties.dataBlocks.size() - 1).getSegmentProperties()
+            .getDimensionKeyGenerator());
+
+    // calculating the total number of aggregated columns
+    int aggTypeCount = queryModel.getQueryMeasures().size();
+
+    // data type of every query measure, in query order
+    int currentIndex = 0;
+    DataType[] dataTypes = new DataType[aggTypeCount];
+    for (QueryMeasure carbonMeasure : queryModel.getQueryMeasures()) {
+      dataTypes[currentIndex] = carbonMeasure.getMeasure().getDataType();
+      currentIndex++;
+    }
+    queryProperties.measureDataTypes = dataTypes;
+    // as aggregation will be executed in following order
+    // 1.aggregate dimension expression
+    // 2. expression
+    // 3. query measure
+    // so calculating the index of the expression start index
+    // and measure column start index
+    queryProperties.aggExpressionStartIndex = queryModel.getQueryMeasures().size();
+    queryProperties.measureStartIndex = aggTypeCount - queryModel.getQueryMeasures().size();
+
+    queryProperties.complexFilterDimension =
+        QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree());
+    queryStatistic = new QueryStatistic();
+    // dictionary column unique column id to dictionary mapping
+    // which will be used to get column actual data
+    queryProperties.columnToDictionayMapping = QueryUtil
+        .getDimensionDictionaryDetail(queryModel.getQueryDimension(),
+            queryProperties.complexFilterDimension, queryModel.getAbsoluteTableIdentifier());
+    queryStatistic
+        .addStatistics("Time taken to load the Dictionary In Executor", System.currentTimeMillis());
+    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+    queryModel.setColumnToDictionaryMapping(queryProperties.columnToDictionayMapping);
+    // setting the sort dimension index. as it will be updated while getting the sort info
+    // so currently setting it to default 0 means sort is not present in any dimension
+    queryProperties.sortDimIndexes = new byte[queryModel.getQueryDimension().size()];
+  }
+
+  /**
+   * Builds the key structure info for the query from the query dimensions and the
+   * given key generator.
+   *
+   * @param queryModel   query model providing the query dimensions
+   * @param keyGenerator key generator used to compute the masked ranges and the max key
+   * @return key structure info; its max key is null when the max key generation failed
+   */
+  private KeyStructureInfo getKeyStructureInfo(QueryModel queryModel, KeyGenerator keyGenerator) {
+    // masked byte range of the dictionary columns selected in the query
+    int[] byteRanges = QueryUtil.getMaskedByteRange(queryModel.getQueryDimension(), keyGenerator);
+
+    // masked bytes derived from the key size and the masked byte range
+    int[] bytesAfterMasking = QueryUtil.getMaskedByte(keyGenerator.getKeySizeInBytes(), byteRanges);
+
+    // max key of the query dimensions, used to mask a key and get the masked key
+    byte[] dictionaryMaxKey;
+    try {
+      dictionaryMaxKey =
+          QueryUtil.getMaxKeyBasedOnDimensions(queryModel.getQueryDimension(), keyGenerator);
+    } catch (KeyGenException e) {
+      // the failure is only logged; the info is still built, without a max key
+      LOGGER.error(e, "problem while getting the max key");
+      dictionaryMaxKey = null;
+    }
+
+    KeyStructureInfo keyStructureInfo = new KeyStructureInfo();
+    keyStructureInfo.setKeyGenerator(keyGenerator);
+    keyStructureInfo.setMaskByteRanges(byteRanges);
+    keyStructureInfo.setMaskedBytes(bytesAfterMasking);
+    keyStructureInfo.setMaxKey(dictionaryMaxKey);
+    return keyStructureInfo;
+  }
+
+  /**
+   * Initializes the query and creates one block execution info for every loaded
+   * data block; the query is executed based on these infos.
+   *
+   * @param queryModel query model from user query
+   * @return block execution infos, in the order of the loaded data blocks
+   * @throws QueryExecutionException any failure during query initialization or
+   *                                 block info creation
+   */
+  protected List<BlockExecutionInfo> getBlockExecutionInfos(QueryModel queryModel)
+      throws QueryExecutionException {
+    initQuery(queryModel);
+    List<BlockExecutionInfo> executionInfos = new ArrayList<BlockExecutionInfo>();
+    for (AbstractIndex dataBlock : queryProperties.dataBlocks) {
+      executionInfos.add(getBlockExecutionInfoForBlock(queryModel, dataBlock));
+    }
+    // the complex dimension info of the last block is kept in the query properties
+    BlockExecutionInfo lastExecutionInfo = executionInfos.get(executionInfos.size() - 1);
+    queryProperties.complexDimensionInfoMap = lastExecutionInfo.getComlexDimensionInfoMap();
+    return executionInfos;
+  }
+
+  /**
+   * Below method will be used to get the block execution info which is
+   * required to execute any block based on query model. The info is filled from
+   * the query model, the segment properties of the given block and the query
+   * properties prepared by {@link #initQuery(QueryModel)}.
+   *
+   * @param queryModel query model from user query
+   * @param blockIndex index of the block the execution info is created for
+   * @return block execution info
+   * @throws QueryExecutionException any failure during block info creation
+   */
+  protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
+      AbstractIndex blockIndex) throws QueryExecutionException {
+    BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
+    SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
+    List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
+    KeyGenerator blockKeyGenerator = segmentProperties.getDimensionKeyGenerator();
+
+    // below is to get only those dimensions of the query which are present in the
+    // table block
+    List<QueryDimension> updatedQueryDimension = RestructureUtil
+        .getUpdatedQueryDimension(queryModel.getQueryDimension(), tableBlockDimensions,
+            segmentProperties.getComplexDimensions());
+    // TODO add complex dimension children
+    // masked byte range and masked bytes computed with the key generator of this block
+    int[] maskByteRangesForBlock =
+        QueryUtil.getMaskedByteRange(updatedQueryDimension, blockKeyGenerator);
+    int[] maksedByte =
+        QueryUtil.getMaskedByte(blockKeyGenerator.getKeySizeInBytes(), maskByteRangesForBlock);
+    blockExecutionInfo.setQueryDimensions(
+        updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
+    blockExecutionInfo.setQueryMeasures(queryModel.getQueryMeasures()
+        .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
+    blockExecutionInfo.setDataBlock(blockIndex);
+    blockExecutionInfo.setBlockKeyGenerator(blockKeyGenerator);
+    // adding aggregation info for query
+    blockExecutionInfo.setAggregatorInfo(getAggregatorInfoForBlock(queryModel, blockIndex));
+    // adding the query statistics recorder to record the statistics
+    blockExecutionInfo.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    // setting the limit
+    blockExecutionInfo.setLimit(queryModel.getLimit());
+    // setting whether detail query or not
+    blockExecutionInfo.setDetailQuery(queryModel.isDetailQuery());
+    // setting whether raw record query or not
+    blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
+    // setting the masked byte of the block which will be
+    // used to unpack the older block keys
+    blockExecutionInfo.setMaskedByteForBlock(maksedByte);
+    // total number of dimension blocks
+    blockExecutionInfo
+        .setTotalNumberDimensionBlock(segmentProperties.getDimensionOrdinalToBlockMapping().size());
+    // total number of measure blocks
+    blockExecutionInfo
+        .setTotalNumberOfMeasureBlock(segmentProperties.getMeasuresOrdinalToBlockMapping().size());
+    blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
+        .getComplexDimensionsMap(updatedQueryDimension,
+            segmentProperties.getDimensionOrdinalToBlockMapping(),
+            segmentProperties.getEachComplexDimColumnValueSize(),
+            queryProperties.columnToDictionayMapping, queryProperties.complexFilterDimension));
+    // to check whether older block key update is required or not: it is required
+    // when the key generator of this block differs from the one of the query
+    blockExecutionInfo.setFixedKeyUpdateRequired(
+        !blockKeyGenerator.equals(queryProperties.keyStructureInfo.getKeyGenerator()));
+    IndexKey startIndexKey = null;
+    IndexKey endIndexKey = null;
+    if (null != queryModel.getFilterExpressionResolverTree()) {
+      // loading the filter executer tree for filter evaluation
+      blockExecutionInfo.setFilterExecuterTree(FilterUtil
+          .getFilterExecuterTree(queryModel.getFilterExpressionResolverTree(), segmentProperties,
+              blockExecutionInfo.getComlexDimensionInfoMap()));
+      // the list is filled by the traversal below; the start key is read from
+      // index 0 and the end key from index 1
+      List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+      FilterUtil.traverseResolverTreeAndGetStartAndEndKey(segmentProperties,
+          queryModel.getAbsoluteTableIdentifier(), queryModel.getFilterExpressionResolverTree(),
+          listOfStartEndKeys);
+      startIndexKey = listOfStartEndKeys.get(0);
+      endIndexKey = listOfStartEndKeys.get(1);
+    } else {
+      // no filter in the query, so the default start and end keys are used
+      try {
+        startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        throw new QueryExecutionException(e);
+      }
+    }
+    // file type is derived from the store path of the table
+    blockExecutionInfo.setFileType(
+        FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+    //setting the start index key of the block node
+    blockExecutionInfo.setStartKey(startIndexKey);
+    //setting the end index key of the block node
+    blockExecutionInfo.setEndKey(endIndexKey);
+    // expression dimensions; created empty here and only handed to QueryUtil below
+    List<CarbonDimension> expressionDimensions =
+        new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // expression measures; created empty here and only handed to QueryUtil below
+    List<CarbonMeasure> expressionMeasures =
+        new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // setting all the dimension chunk indexes to be read from file
+    blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(QueryUtil
+        .getDimensionsBlockIndexes(updatedQueryDimension,
+            segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions));
+    // setting all the measure chunk indexes to be read from file
+    blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(QueryUtil
+        .getMeasureBlockIndexes(queryModel.getQueryMeasures(), expressionMeasures,
+            segmentProperties.getMeasuresOrdinalToBlockMapping()));
+    // setting the key structure info which will be required
+    // to update the older block key with new key generator
+    blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
+    // setting the size of fixed key column (dictionary column)
+    blockExecutionInfo.setFixedLengthKeySize(getKeySize(updatedQueryDimension, segmentProperties));
+    Set<Integer> dictionaryColumnBlockIndex = new HashSet<Integer>();
+    List<Integer> noDictionaryColumnBlockIndex = new ArrayList<Integer>();
+    // get the block index to be read from file for query dimension
+    // for both dictionary columns and no dictionary columns
+    QueryUtil.fillQueryDimensionsBlockIndexes(updatedQueryDimension,
+        segmentProperties.getDimensionOrdinalToBlockMapping(), dictionaryColumnBlockIndex,
+        noDictionaryColumnBlockIndex);
+    int[] queryDictionaryColumnBlockIndexes = ArrayUtils.toPrimitive(
+        dictionaryColumnBlockIndex.toArray(new Integer[dictionaryColumnBlockIndex.size()]));
+    // need to sort the dictionary column block indexes as for all dimension
+    // columns the key will be filled based on key order
+    Arrays.sort(queryDictionaryColumnBlockIndexes);
+    blockExecutionInfo.setDictionaryColumnBlockIndex(queryDictionaryColumnBlockIndexes);
+    // setting the no dictionary column block indexes
+    blockExecutionInfo.setNoDictionaryBlockIndexes(ArrayUtils.toPrimitive(
+        noDictionaryColumnBlockIndex.toArray(new Integer[noDictionaryColumnBlockIndex.size()])));
+    // setting column id to dictionary mapping
+    blockExecutionInfo.setColumnIdToDcitionaryMapping(queryProperties.columnToDictionayMapping);
+    // setting each column value size
+    blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
+    // ordinals of the ARRAY, STRUCT and MAP dimensions of the query
+    blockExecutionInfo.setComplexColumnParentBlockIndexes(
+        getComplexDimensionParentBlockIndexes(updatedQueryDimension));
+    try {
+      // to set column group and its key structure info which will be used
+      // for getting the column group column data in case of final row
+      // and in case of dimension aggregation
+      blockExecutionInfo.setColumnGroupToKeyStructureInfo(
+          QueryUtil.getColumnGroupKeyStructureInfo(updatedQueryDimension, segmentProperties));
+    } catch (KeyGenException e) {
+      throw new QueryExecutionException(e);
+    }
+    return blockExecutionInfo;
+  }
+
+  /**
+   * Returns the size of the fixed length key built from the dictionary encoded
+   * query dimensions; this is used to create a row from column chunk.
+   *
+   * @param queryDimension    query dimension
+   * @param blockMetadataInfo block metadata info
+   * @return key size, 0 when the query has no dictionary encoded dimension
+   */
+  private int getKeySize(List<QueryDimension> queryDimension, SegmentProperties blockMetadataInfo) {
+    List<Integer> dictionaryOrdinals =
+        new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    int position = 0;
+    while (position < queryDimension.size()) {
+      CarbonDimension dimension = queryDimension.get(position).getDimension();
+      int childCount = dimension.numberOfChild();
+      if (childCount > 0) {
+        // a dimension with children advances the position by its number of children
+        position += childCount;
+        continue;
+      }
+      // only dictionary encoded dimensions contribute to the fixed length key
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        dictionaryOrdinals.add(dimension.getKeyOrdinal());
+      }
+      position++;
+    }
+    int[] keyOrdinals = ArrayUtils
+        .toPrimitive(dictionaryOrdinals.toArray(new Integer[dictionaryOrdinals.size()]));
+    if (keyOrdinals.length == 0) {
+      return 0;
+    }
+    return blockMetadataInfo.getFixedLengthKeySplitter().getKeySizeByBlock(keyOrdinals);
+  }
+
+  /**
+   * Creates the aggregator info of a block from the query measures and the measures
+   * of the block, and completes it with the start indexes and the measure data types
+   * held in the query properties.
+   *
+   * @param queryModel query model
+   * @param tableBlock table block
+   * @return aggregator info
+   */
+  private AggregatorInfo getAggregatorInfoForBlock(QueryModel queryModel,
+      AbstractIndex tableBlock) {
+    // aggregate infos which will be used during aggregation
+    AggregatorInfo blockAggregatorInfo = RestructureUtil.getAggregatorInfos(
+        queryModel.getQueryMeasures(), tableBlock.getSegmentProperties().getMeasures());
+    // index of expression in measure aggregators
+    blockAggregatorInfo.setExpressionAggregatorStartIndex(queryProperties.aggExpressionStartIndex);
+    // index of measure columns in measure aggregators
+    blockAggregatorInfo.setMeasureAggregatorStartIndex(queryProperties.measureStartIndex);
+    // data types of the measures selected in query
+    blockAggregatorInfo.setMeasureDataTypes(queryProperties.measureDataTypes);
+    return blockAggregatorInfo;
+  }
+
+  /**
+   * Collects the ordinals of all query dimensions whose data type is ARRAY,
+   * STRUCT or MAP.
+   *
+   * @param queryDimensions query dimensions to inspect
+   * @return ordinals of the matching dimensions, in query dimension order
+   */
+  private int[] getComplexDimensionParentBlockIndexes(List<QueryDimension> queryDimensions) {
+    DataType[] complexTypes = { DataType.ARRAY, DataType.STRUCT, DataType.MAP };
+    List<Integer> parentOrdinals = new ArrayList<Integer>();
+    for (QueryDimension candidate : queryDimensions) {
+      CarbonDimension dimension = candidate.getDimension();
+      if (!CarbonUtil.hasDataType(dimension.getDataType(), complexTypes)) {
+        continue;
+      }
+      parentOrdinals.add(dimension.getOrdinal());
+    }
+    Integer[] boxedOrdinals = parentOrdinals.toArray(new Integer[parentOrdinals.size()]);
+    return ArrayUtils.toPrimitive(boxedOrdinals);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
new file mode 100644
index 0000000..716cdc7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator;
+
+/**
+ * Executor for detail queries. It prepares the block execution infos of the query
+ * and hands them, together with the query model, to a
+ * {@link DetailQueryResultIterator}, which is returned to the caller.
+ */
+public class DetailQueryExecutor extends AbstractQueryExecutor {
+
+  /**
+   * Executes the detail query described by the query model.
+   *
+   * @param queryModel query model from user query
+   * @return iterator over the result of the detail query
+   * @throws QueryExecutionException any failure while preparing the block execution infos
+   */
+  @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
+      throws QueryExecutionException {
+    final List<BlockExecutionInfo> executionInfos = getBlockExecutionInfos(queryModel);
+    return new DetailQueryResultIterator(executionInfos, queryModel);
+  }
+
+}


Mime
View raw message