carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [22/52] [partial] incubator-carbondata git commit: move core package
Date Mon, 16 Jan 2017 14:52:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
new file mode 100644
index 0000000..00116c5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * It reads the data vector batch format
+ */
+public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> {
+
+  private final Object lock = new Object();
+
+  public VectorDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
+  }
+
+  @Override public Object next() {
+    throw new UnsupportedOperationException("call processNextBatch instaed");
+  }
+
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    synchronized (lock) {
+      updateDataBlockIterator();
+      if (dataBlockIterator != null) {
+        dataBlockIterator.processNextBatch(columnarBatch);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
new file mode 100644
index 0000000..7d29b0f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import org.apache.spark.sql.types.Decimal;
+
/**
 * Contract for a writable column vector: one value slot per row id, plus a null
 * mask. Implementations back a single projected column inside a
 * {@code CarbonColumnarBatch}.
 */
public interface CarbonColumnVector {

  /** Stores a short value at the given row position. */
  void putShort(int rowId, short value);

  /** Stores an int value at the given row position. */
  void putInt(int rowId, int value);

  /** Stores a long value at the given row position. */
  void putLong(int rowId, long value);

  /** Stores a decimal value at the given row position with the given precision. */
  void putDecimal(int rowId, Decimal value, int precision);

  /** Stores a double value at the given row position. */
  void putDouble(int rowId, double value);

  /** Stores the given byte array at the given row position. */
  void putBytes(int rowId, byte[] value);

  /** Stores the sub-range value[offset..offset+length) at the given row position. */
  void putBytes(int rowId, int offset, int length, byte[] value);

  /** Marks the given row position as null. */
  void putNull(int rowId);

  /** Returns true if the given row position has been marked null. */
  boolean isNull(int rowId);

  /** Stores an arbitrary object at the given row position (fallback for other types). */
  void putObject(int rowId, Object obj);

  /** Reads back the value at the given row position, or null if the row is null. */
  Object getData(int rowId);

  /** Clears all stored values and the null mask so the vector can be refilled. */
  void reset();

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
new file mode 100644
index 0000000..faeffde
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector;
+
+public class CarbonColumnarBatch {
+
+  public CarbonColumnVector[] columnVectors;
+
+  private int batchSize;
+
+  private int actualSize;
+
+  private int rowCounter;
+
+  public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) {
+    this.columnVectors = columnVectors;
+    this.batchSize = batchSize;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public int getActualSize() {
+    return actualSize;
+  }
+
+  public void setActualSize(int actualSize) {
+    this.actualSize = actualSize;
+  }
+
+  public void reset() {
+    actualSize = 0;
+    rowCounter = 0;
+    for (int i = 0; i < columnVectors.length; i++) {
+      columnVectors[i].reset();
+    }
+  }
+
+  public int getRowCounter() {
+    return rowCounter;
+  }
+
+  public void setRowCounter(int rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
new file mode 100644
index 0000000..852abe9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+
+public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
+  public int offset;
+  public int size;
+  public CarbonColumnVector vector;
+  public int vectorOffset;
+  public QueryDimension dimension;
+  public QueryMeasure measure;
+  public int ordinal;
+  public DirectDictionaryGenerator directDictionaryGenerator;
+  public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
+  public GenericQueryType genericQueryType;
+
+  @Override public int compareTo(ColumnVectorInfo o) {
+    return ordinal - o.ordinal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
new file mode 100644
index 0000000..9a19fb7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import org.apache.spark.sql.types.Decimal;
+
/**
 * Fills {@code CarbonColumnVector}s with measure (numeric) column data taken from
 * a {@link MeasureColumnDataChunk}, honouring the chunk's null bitmap and the
 * measure's data type. One filler strategy exists per supported type; pick one
 * via {@link MeasureVectorFillerFactory}.
 */
public class MeasureDataVectorProcessor {

  /**
   * Strategy for copying a measure chunk into a column vector.
   */
  public interface MeasureVectorFiller {

    // Fills the contiguous row range [info.offset, info.offset + info.size).
    void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info);

    // Fills rows selected by a filter: row ids are looked up through rowMapping.
    void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info);
  }

  /**
   * Filler for INT measures. Values are stored as longs in the chunk and
   * narrowed to int on the way out.
   */
  public static class IntegralMeasureVectorFiller implements MeasureVectorFiller {

    @Override
    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        if (nullBitSet.get(i)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putInt(vectorOffset,
              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
        }
        vectorOffset++;
      }
    }

    @Override
    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        // indirect through the filter's surviving-row mapping
        int currentRow = rowMapping[i];
        if (nullBitSet.get(currentRow)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putInt(vectorOffset,
              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
        }
        vectorOffset++;
      }
    }
  }

  /**
   * Filler for SHORT measures. Values are stored as longs in the chunk and
   * narrowed to short on the way out.
   */
  public static class ShortMeasureVectorFiller implements MeasureVectorFiller {

    @Override
    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        if (nullBitSet.get(i)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putShort(vectorOffset,
              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
        }
        vectorOffset++;
      }
    }

    @Override
    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        // indirect through the filter's surviving-row mapping
        int currentRow = rowMapping[i];
        if (nullBitSet.get(currentRow)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putShort(vectorOffset,
              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
        }
        vectorOffset++;
      }
    }
  }

  /**
   * Filler for LONG measures; values are copied through without narrowing.
   */
  public static class LongMeasureVectorFiller implements MeasureVectorFiller {

    @Override
    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        if (nullBitSet.get(i)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putLong(vectorOffset,
              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
        }
        vectorOffset++;
      }
    }

    @Override
    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        // indirect through the filter's surviving-row mapping
        int currentRow = rowMapping[i];
        if (nullBitSet.get(currentRow)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putLong(vectorOffset,
              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
        }
        vectorOffset++;
      }
    }
  }

  /**
   * Filler for DECIMAL measures. Chunk values are {@link BigDecimal}; they are
   * converted to Spark {@link Decimal} with the measure's declared precision.
   */
  public static class DecimalMeasureVectorFiller implements MeasureVectorFiller {

    @Override
    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      int precision = info.measure.getMeasure().getPrecision();
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        if (nullBitSet.get(i)) {
          vector.putNull(vectorOffset);
        } else {
          BigDecimal decimal =
              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(i);
          Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
          vector.putDecimal(vectorOffset, toDecimal, precision);
        }
        vectorOffset++;
      }
    }

    @Override
    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      int precision = info.measure.getMeasure().getPrecision();
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        // indirect through the filter's surviving-row mapping
        int currentRow = rowMapping[i];
        if (nullBitSet.get(currentRow)) {
          vector.putNull(vectorOffset);
        } else {
          BigDecimal decimal =
              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(currentRow);
          Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
          vector.putDecimal(vectorOffset, toDecimal, precision);
        }
        vectorOffset++;
      }
    }
  }

  /**
   * Fallback filler used for all other measure types; reads values as doubles.
   */
  public static class DefaultMeasureVectorFiller implements MeasureVectorFiller {

    @Override
    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        if (nullBitSet.get(i)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putDouble(vectorOffset,
              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(i));
        }
        vectorOffset++;
      }
    }

    @Override
    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
        ColumnVectorInfo info) {
      int offset = info.offset;
      int len = offset + info.size;
      int vectorOffset = info.vectorOffset;
      CarbonColumnVector vector = info.vector;
      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
      for (int i = offset; i < len; i++) {
        // indirect through the filter's surviving-row mapping
        int currentRow = rowMapping[i];
        if (nullBitSet.get(currentRow)) {
          vector.putNull(vectorOffset);
        } else {
          vector.putDouble(vectorOffset,
              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(currentRow));
        }
        vectorOffset++;
      }
    }
  }

  /**
   * Maps a measure {@link DataType} to the matching filler strategy.
   */
  public static class MeasureVectorFillerFactory {

    public static MeasureVectorFiller getMeasureVectorFiller(DataType dataType) {
      switch (dataType) {
        case SHORT:
          return new ShortMeasureVectorFiller();
        case INT:
          return new IntegralMeasureVectorFiller();
        case LONG:
          return new LongMeasureVectorFiller();
        case DECIMAL:
          return new DecimalMeasureVectorFiller();
        default:
          return new DefaultMeasureVectorFiller();
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
new file mode 100644
index 0000000..f1e7565
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector.impl;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class CarbonColumnVectorImpl implements CarbonColumnVector {
+
+  private Object[] data;
+
+  private int[] ints;
+
+  private long[] longs;
+
+  private Decimal[] decimals;
+
+  private byte[][] bytes;
+
+  private double[] doubles;
+
+  private BitSet nullBytes;
+
+  private DataType dataType;
+
+  public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
+    nullBytes = new BitSet(batchSize);
+    this.dataType = dataType;
+    switch (dataType) {
+      case INT:
+        ints = new int[batchSize];
+        break;
+      case LONG:
+        longs = new long[batchSize];
+        break;
+      case DOUBLE:
+        doubles = new double[batchSize];
+        break;
+      case STRING:
+        bytes = new byte[batchSize][];
+        break;
+      case DECIMAL:
+        decimals = new Decimal[batchSize];
+        break;
+      default:
+        data = new Object[batchSize];
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    ints[rowId] = value;
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    longs[rowId] = value;
+  }
+
+  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+    decimals[rowId] = value;
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    doubles[rowId] = value;
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    bytes[rowId] = value;
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+
+  }
+
+  @Override public void putNull(int rowId) {
+    nullBytes.set(rowId);
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return nullBytes.get(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    data[rowId] = obj;
+  }
+
+  @Override public Object getData(int rowId) {
+    if (nullBytes.get(rowId)) {
+      return null;
+    }
+    switch (dataType) {
+      case INT:
+        return ints[rowId];
+      case LONG:
+        return longs[rowId];
+      case DOUBLE:
+        return doubles[rowId];
+      case STRING:
+        return UTF8String.fromBytes(bytes[rowId]);
+      case DECIMAL:
+        return decimals[rowId];
+      default:
+        return data[rowId];
+    }
+  }
+
+  @Override public void reset() {
+    nullBytes.clear();
+    switch (dataType) {
+      case INT:
+        Arrays.fill(ints, 0);
+        break;
+      case LONG:
+        Arrays.fill(longs, 0);
+        break;
+      case DOUBLE:
+        Arrays.fill(doubles, 0);
+        break;
+      case STRING:
+        Arrays.fill(bytes, null);
+        break;
+      case DECIMAL:
+        Arrays.fill(decimals, null);
+        break;
+      default:
+        Arrays.fill(data, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
new file mode 100644
index 0000000..68f75f0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
+import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+
/**
 * Base blocklet scanner: records scan statistics, reads the selected dimension
 * and measure chunks for one blocklet, and populates the shared scanned result.
 * Concrete subclasses supply the {@code scannedResult} instance and add
 * filter-specific or non-filter behaviour.
 */
public abstract class AbstractBlockletScanner implements BlockletScanner {

  /**
   * scanner result, refilled for each blocklet; expected to be created by the
   * concrete subclass before scanBlocklet is invoked
   */
  protected AbstractScannedResult scannedResult;

  /**
   * block execution info
   */
  protected BlockExecutionInfo blockExecutionInfo;

  // statistics model used to record blocklet counters; NOTE(review): assigned
  // externally (public field) — must be set before scanBlocklet is called
  public QueryStatisticsModel queryStatisticsModel;

  public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
    this.blockExecutionInfo = tableBlockExecutionInfos;
  }

  /**
   * Scans one blocklet: loads its key/value chunks into the shared result and
   * returns that result.
   *
   * @param blocksChunkHolder holder carrying the data block and file reader
   * @return the populated scanned result
   */
  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
      throws IOException, FilterUnsupportedException {
    fillKeyValue(blocksChunkHolder);
    return scannedResult;
  }

  /**
   * Loads the blocklet's data into {@code scannedResult}: bumps the total and
   * valid blocklet counters, resets the result, then sets row count, blocklet id,
   * dimension chunks, measure chunks, and the delete-delta cache.
   */
  protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException {

    // count every blocklet seen by this scanner
    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
            .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
            totalBlockletStatistic.getCount() + 1);
    queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
    // this base implementation scans every blocklet, so each one also counts as valid
    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel
            .getStatisticsTypeAndObjMap().get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
    validScannedBlockletStatistic
            .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
                    validScannedBlockletStatistic.getCount() + 1);
    queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
    scannedResult.reset();
    scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
    // blocklet id = block id + separator + node number within the block
    scannedResult.setBlockletId(
              blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
                      + blocksChunkHolder.getDataBlock().nodeNumber());
    scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
        .getDimensionChunks(blocksChunkHolder.getFileReader(),
            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
            .getMeasureChunks(blocksChunkHolder.getFileReader(),
                blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
    // loading delete data cache in blockexecutioninfo instance
    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
    scannedResult
        .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
new file mode 100644
index 0000000..9484318
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+
/**
 * Interface for processing one blocklet of a block.
 * Processing can be filter based processing or non filter based processing.
 */
public interface BlockletScanner {

  /**
   * Processes the blocklet held by the given chunk holder and returns the
   * scanned result.
   *
   * @param blocksChunkHolder block chunk which holds the block data
   * @return result after processing
   * @throws IOException if reading the block data fails
   * @throws FilterUnsupportedException if a filter cannot be applied to this block
   */
  AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
      throws IOException, FilterUnsupportedException;
}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
new file mode 100644
index 0000000..1c02ddb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
+import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Below class will be used for filter query processing
+ * this class will be first apply the filter then it will read the block if
+ * required and return the scanned result
+ */
+public class FilterScanner extends AbstractBlockletScanner {
+
+  /**
+   * executer for the filter tree of the query
+   */
+  private FilterExecuter filterExecuter;
+  /**
+   * this will be used to apply min max
+   * this will be useful for dimension column which is on the right side
+   * as node finder will always give tentative blocks, if column data stored individually
+   * and data is in sorted order then we can check whether filter is in the range of min max or not
+   * if it present then only we can apply filter on complete data.
+   * this will be very useful in case of sparse data when rows are
+   * repeating.
+   */
+  private boolean isMinMaxEnabled;
+
+  private QueryStatisticsModel queryStatisticsModel;
+
+  public FilterScanner(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo);
+    scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    // to check whether min max pruning is enabled or not
+    String minMaxEnableValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
+            CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
+    if (null != minMaxEnableValue) {
+      isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
+    }
+    // get the filter tree
+    this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
+
+  /**
+   * Below method will be used to process the block
+   *
+   * @param blocksChunkHolder block chunk holder which holds the data
+   * @return scanned result with the filtered row indexes set
+   * @throws IOException if reading the column chunks from the file fails
+   * @throws FilterUnsupportedException if a filter expression cannot be evaluated
+   */
+  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+      throws IOException, FilterUnsupportedException {
+    fillScannedResult(blocksChunkHolder);
+    return scannedResult;
+  }
+
+  /**
+   * Marks the scanned result as empty and frees the chunk memory that was
+   * already read while evaluating the filter, since no row of this blocklet
+   * matched the filter (or min/max pruned the whole blocklet).
+   *
+   * @param blocksChunkHolder holder whose already-read chunks are freed
+   */
+  private void setEmptyScannedResult(BlocksChunkHolder blocksChunkHolder) {
+    scannedResult.setNumberOfRows(0);
+    scannedResult.setIndexes(new int[0]);
+    CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
+        blocksChunkHolder.getMeasureDataChunk());
+  }
+
+  /**
+   * This method will process the data in below order
+   * 1. first apply min max on the filter tree and check whether any of the filter
+   * is fall on the range of min max, if not then return empty result
+   * 2. If filter falls on min max range then apply filter on actual
+   * data and get the filtered row index
+   * 3. if row index is empty then return the empty result
+   * 4. if row indexes is not empty then read only those blocks(measure or dimension)
+   * which was present in the query but not present in the filter, as while applying filter
+   * some of the blocks were already read and present in chunk holder so no need to
+   * read those blocks again, this is to avoid reading of same blocks which were already read
+   * 5. Set the blocks and filter indexes to result
+   *
+   * @param blocksChunkHolder block chunk holder which holds the data
+   * @throws FilterUnsupportedException if a filter expression cannot be evaluated
+   * @throws IOException if reading the column chunks from the file fails
+   */
+  private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    scannedResult.reset();
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
+    // apply min max: if no filter value can fall inside the blocklet's
+    // min/max range, skip reading and filtering this blocklet entirely
+    if (isMinMaxEnabled) {
+      BitSet bitSet = this.filterExecuter
+          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+              blocksChunkHolder.getDataBlock().getColumnsMinValue());
+      if (bitSet.isEmpty()) {
+        setEmptyScannedResult(blocksChunkHolder);
+        return;
+      }
+    }
+    // apply filter on actual data
+    BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+    // if indexes is empty then return with empty result
+    if (bitSet.isEmpty()) {
+      setEmptyScannedResult(blocksChunkHolder);
+      return;
+    }
+    // record that this blocklet produced at least one valid (matching) row
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    validScannedBlockletStatistic
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
+    queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
+    // get the row indexes from the bit set
+    int[] indexes = new int[bitSet.cardinality()];
+    int index = 0;
+    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+      indexes[index++] = i;
+    }
+    // loading delete data cache in blockexecutioninfo instance
+    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
+        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
+            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
+    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
+    scannedResult
+        .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+    FileHolder fileReader = blocksChunkHolder.getFileReader();
+    int[][] allSelectedDimensionBlocksIndexes =
+        blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
+    DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
+        .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
+
+    DimensionColumnDataChunk[] dimensionColumnDataChunk =
+        new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+    // reuse the dimension chunks that the filter pass already read
+    for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
+        dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i];
+      }
+    }
+    // copy the projection dimension chunks read above into their slots;
+    // each entry is an inclusive [start, end] block index range
+    for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
+      System.arraycopy(projectionListDimensionChunk, allSelectedDimensionBlocksIndexes[i][0],
+          dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0],
+          allSelectedDimensionBlocksIndexes[i][1] + 1 - allSelectedDimensionBlocksIndexes[i][0]);
+    }
+    MeasureColumnDataChunk[] measureColumnDataChunk =
+        new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+    int[][] allSelectedMeasureBlocksIndexes =
+        blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
+    MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
+        .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
+    // reuse the measure chunks that the filter pass already read
+    for (int i = 0; i < measureColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
+        measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+      }
+    }
+    // copy the projection measure chunks into their slots (inclusive ranges)
+    for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
+      System.arraycopy(projectionListMeasureChunk, allSelectedMeasureBlocksIndexes[i][0],
+          measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0],
+          allSelectedMeasureBlocksIndexes[i][1] + 1 - allSelectedMeasureBlocksIndexes[i][0]);
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunk);
+    scannedResult.setIndexes(indexes);
+    scannedResult.setMeasureChunks(measureColumnDataChunk);
+    scannedResult.setNumberOfRows(indexes.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
new file mode 100644
index 0000000..cc0838f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+
+/**
+ * Blocklet scanner used for queries without a filter condition.
+ * Every block requested by the query is read as-is and handed straight
+ * to the scanned result; no row-level filtering happens here.
+ */
+public class NonFilterScanner extends AbstractBlockletScanner {
+
+  public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo);
+    super.queryStatisticsModel = queryStatisticsModel;
+    // non filter query, so use the non filter variant of the scanned result
+    this.scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
new file mode 100644
index 0000000..68eb946
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.wrappers;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * This class will store the dimension column data when query is executed
+ * This can be used as a key for aggregation: it implements hashCode/equals
+ * over the dictionary, no-dictionary and complex type keys, and a
+ * compareTo over the same keys.
+ * NOTE(review): implicitColumnByteArray is deliberately excluded from
+ * hashCode/equals/compareTo below — confirm callers rely on that.
+ */
+public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> {
+
+  /**
+   * to store key which is generated using
+   * key generator (dictionary encoded columns)
+   */
+  protected byte[] dictionaryKey;
+
+  /**
+   * to store complex type column data
+   */
+  protected byte[][] complexTypesKeys;
+
+  /**
+   * to store no dictionary column data
+   */
+  protected byte[][] noDictionaryKeys;
+
+  /**
+   * contains value of implicit columns in byte array format
+   */
+  protected byte[] implicitColumnByteArray;
+
+  public ByteArrayWrapper() {
+  }
+
+  /**
+   * @return the dictionaryKey
+   */
+  public byte[] getDictionaryKey() {
+    return dictionaryKey;
+  }
+
+  /**
+   * @param dictionaryKey the dictionaryKey to set
+   */
+  public void setDictionaryKey(byte[] dictionaryKey) {
+    this.dictionaryKey = dictionaryKey;
+  }
+
+  /**
+   * @param noDictionaryKeys the noDictionaryKeys to set
+   */
+  public void setNoDictionaryKeys(byte[][] noDictionaryKeys) {
+    this.noDictionaryKeys = noDictionaryKeys;
+  }
+
+  /**
+   * to get the no dictionary column data
+   *
+   * @param index of the no dictionary key
+   * @return no dictionary key for the index
+   */
+  public byte[] getNoDictionaryKeyByIndex(int index) {
+    return this.noDictionaryKeys[index];
+  }
+
+  /**
+   * to get the complex type column data
+   *
+   * @param index of the complex type key
+   * @return complex type key for the index
+   */
+  public byte[] getComplexTypeByIndex(int index) {
+    return this.complexTypesKeys[index];
+  }
+
+  /**
+   * to generate the hash code
+   * (same accumulation as Arrays.hashCode, chained over the three key
+   * arrays; assumes all three arrays are non-null — TODO confirm callers
+   * always set them before use)
+   */
+  @Override public int hashCode() {
+    // first generate the hash code of the dictionary column
+    int len = dictionaryKey.length;
+    int result = 1;
+    for (int j = 0; j < len; j++) {
+      result = 31 * result + dictionaryKey[j];
+    }
+    // then no dictionary column
+    for (byte[] directSurrogateValue : noDictionaryKeys) {
+      for (int i = 0; i < directSurrogateValue.length; i++) {
+        result = 31 * result + directSurrogateValue[i];
+      }
+    }
+    // then for complex type
+    for (byte[] complexTypeKey : complexTypesKeys) {
+      for (int i = 0; i < complexTypeKey.length; i++) {
+        result = 31 * result + complexTypeKey[i];
+      }
+    }
+    return result;
+  }
+
+  /**
+   * to validate the two objects for equality
+   *
+   * @param other object
+   */
+  @Override public boolean equals(Object other) {
+    if (null == other || !(other instanceof ByteArrayWrapper)) {
+      return false;
+    }
+    boolean result = false;
+    // Comparison will be as follows
+    // first compare the no dictionary column
+    // if it is not equal then return false
+    // if it is equal then compare the complex column
+    // if it is also equal then compare dictionary column
+    byte[][] noDictionaryKeysOther = ((ByteArrayWrapper) other).noDictionaryKeys;
+    if (noDictionaryKeysOther.length != noDictionaryKeys.length) {
+      return false;
+    } else {
+      for (int i = 0; i < noDictionaryKeys.length; i++) {
+        result = UnsafeComparer.INSTANCE.equals(noDictionaryKeys[i], noDictionaryKeysOther[i]);
+        if (!result) {
+          return false;
+        }
+      }
+    }
+
+    byte[][] complexTypesKeysOther = ((ByteArrayWrapper) other).complexTypesKeys;
+    if (complexTypesKeysOther.length != complexTypesKeys.length) {
+      return false;
+    } else {
+      for (int i = 0; i < complexTypesKeys.length; i++) {
+        result = UnsafeComparer.INSTANCE.equals(complexTypesKeys[i], complexTypesKeysOther[i]);
+        if (!result) {
+          return false;
+        }
+      }
+    }
+
+    return UnsafeComparer.INSTANCE.equals(dictionaryKey, ((ByteArrayWrapper) other).dictionaryKey);
+  }
+
+  /**
+   * Compare method for ByteArrayWrapper class this will used to compare Two
+   * ByteArrayWrapper data object, basically it will compare two byte array.
+   * NOTE(review): comparison order (dictionary, then no-dictionary, then
+   * complex) differs from the equals order, but both agree on equality.
+   *
+   * @param other ArrayWrapper Object
+   */
+  @Override public int compareTo(ByteArrayWrapper other) {
+    // compare will be as follows
+    //compare dictionary column
+    // then no dictionary column
+    // then complex type column data
+    int compareTo = UnsafeComparer.INSTANCE.compareTo(dictionaryKey, other.dictionaryKey);
+    if (compareTo == 0) {
+      for (int i = 0; i < noDictionaryKeys.length; i++) {
+        compareTo =
+            UnsafeComparer.INSTANCE.compareTo(noDictionaryKeys[i], other.noDictionaryKeys[i]);
+        if (compareTo != 0) {
+          return compareTo;
+        }
+      }
+    }
+    if (compareTo == 0) {
+      for (int i = 0; i < complexTypesKeys.length; i++) {
+        compareTo =
+            UnsafeComparer.INSTANCE.compareTo(complexTypesKeys[i], other.complexTypesKeys[i]);
+        if (compareTo != 0) {
+          return compareTo;
+        }
+      }
+    }
+    return compareTo;
+  }
+
+  /**
+   * @return the complexTypesKeys
+   */
+  public byte[][] getComplexTypesKeys() {
+    return complexTypesKeys;
+  }
+
+  /**
+   * @param complexTypesKeys the complexTypesKeys to set
+   */
+  public void setComplexTypesKeys(byte[][] complexTypesKeys) {
+    this.complexTypesKeys = complexTypesKeys;
+  }
+
+  /**
+   * @return implicit column values in byte array format
+   */
+  public byte[] getImplicitColumnByteArray() {
+    return implicitColumnByteArray;
+  }
+
+  /**
+   * @param implicitColumnByteArray implicit column values to set
+   */
+  public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
+    this.implicitColumnByteArray = implicitColumnByteArray;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
new file mode 100644
index 0000000..bc63d01
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator;
+import org.apache.carbondata.core.service.impl.DictionaryFactory;
+import org.apache.carbondata.core.service.impl.PathFactory;
+
+/**
+ * Static factory to get the common core services
+ * (dictionary, path and column-unique-id services).
+ */
+public class CarbonCommonFactory {
+
+  // utility class with only static factory methods: prevent instantiation
+  private CarbonCommonFactory() {
+  }
+
+  /**
+   * @return dictionary service
+   */
+  public static DictionaryService getDictionaryService() {
+    return DictionaryFactory.getInstance();
+  }
+
+  /**
+   * @return path service
+   */
+  public static PathService getPathService() {
+    return PathFactory.getInstance();
+  }
+
+  /**
+   * @return unique id generator
+   */
+  public static ColumnUniqueIdService getColumnUniqueIdGenerator() {
+    return ColumnUniqueIdGenerator.getInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
index cc974f0..fb4b981 100644
--- a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
+++ b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
@@ -18,7 +18,7 @@
  */
 package org.apache.carbondata.core.service;
 
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * Column Unique id generator

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
index 14ec459..eabdf82 100644
--- a/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
+++ b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
@@ -18,8 +18,8 @@
  */
 package org.apache.carbondata.core.service;
 
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
 import org.apache.carbondata.core.reader.CarbonDictionaryReader;
 import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/PathService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/PathService.java b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
index 0a27dd4..0f8f97e 100644
--- a/core/src/main/java/org/apache/carbondata/core/service/PathService.java
+++ b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
@@ -18,8 +18,8 @@
  */
 package org.apache.carbondata.core.service;
 
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * Create helper to get path details

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
new file mode 100644
index 0000000..b79f9db
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import java.util.UUID;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.service.ColumnUniqueIdService;
+
+/**
+ * Generates a unique id for a given column.
+ * The id is a random UUID; the database name and column schema arguments
+ * are not used in the generated value.
+ */
+public class ColumnUniqueIdGenerator implements ColumnUniqueIdService {
+
+  private static final ColumnUniqueIdService INSTANCE = new ColumnUniqueIdGenerator();
+
+  public static ColumnUniqueIdService getInstance() {
+    return INSTANCE;
+  }
+
+  @Override public String generateUniqueId(String databaseName, ColumnSchema columnSchema) {
+    // a random type-4 UUID serves as the unique column id
+    return UUID.randomUUID().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
new file mode 100644
index 0000000..114cb50
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.reader.CarbonDictionaryReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl;
+import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
+import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl;
+import org.apache.carbondata.core.service.DictionaryService;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+
+/**
+ * Singleton service to get dictionary readers and writers
+ * (value dictionary plus sort index variants) for a given column.
+ */
+public class DictionaryFactory implements DictionaryService {
+
+  private static DictionaryService dictService = new DictionaryFactory();
+
+  /**
+   * get dictionary writer
+   *
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param columnIdentifier identifier of the dictionary column
+   * @param carbonStorePath base store location
+   * @return writer for the column's dictionary file
+   */
+  @Override public CarbonDictionaryWriter getDictionaryWriter(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryWriterImpl(carbonStorePath, carbonTableIdentifier, columnIdentifier);
+  }
+
+  /**
+   * get dictionary sort index writer
+   *
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param columnIdentifier identifier of the dictionary column
+   * @param carbonStorePath base store location
+   * @return writer for the column's dictionary sort index file
+   */
+  @Override public CarbonDictionarySortIndexWriter getDictionarySortIndexWriter(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier,
+        carbonStorePath);
+  }
+
+  /**
+   * get dictionary metadata reader
+   *
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param columnIdentifier identifier of the dictionary column
+   * @param carbonStorePath base store location
+   * @return reader for the column's dictionary metadata file
+   */
+  @Override public CarbonDictionaryMetadataReader getDictionaryMetadataReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, carbonTableIdentifier,
+        columnIdentifier);
+  }
+
+  /**
+   * get dictionary reader
+   *
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param columnIdentifier identifier of the dictionary column
+   * @param carbonStorePath base store location
+   * @return reader for the column's dictionary file
+   */
+  @Override public CarbonDictionaryReader getDictionaryReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryReaderImpl(carbonStorePath, carbonTableIdentifier, columnIdentifier);
+  }
+
+  /**
+   * get dictionary sort index reader
+   *
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param columnIdentifier identifier of the dictionary column
+   * @param carbonStorePath base store location
+   * @return reader for the column's dictionary sort index file
+   */
+  @Override public CarbonDictionarySortIndexReader getDictionarySortIndexReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier,
+        carbonStorePath);
+  }
+
+  /**
+   * @return the shared DictionaryService instance
+   */
+  public static DictionaryService getInstance() {
+    return dictService;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
new file mode 100644
index 0000000..f159da7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Create helper to get path details
+ */
+public class PathFactory implements PathService {
+
+  private static PathService pathService = new PathFactory();
+
+  /**
+   * @param storeLocation
+   * @param tableIdentifier
+   * @return store path related to tables
+   */
+  @Override public CarbonTablePath getCarbonTablePath(
+      String storeLocation, CarbonTableIdentifier tableIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(storeLocation, tableIdentifier);
+  }
+
+  public static PathService getInstance() {
+    return pathService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
new file mode 100644
index 0000000..f3c02df
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+/**
+ * Class will be used to record and log the query statistics
+ */
+public class DriverQueryStatisticsRecorderDummy implements QueryStatisticsRecorder{
+
+  private DriverQueryStatisticsRecorderDummy() {
+
+  }
+
+  private static DriverQueryStatisticsRecorderDummy carbonLoadStatisticsImplInstance =
+      new DriverQueryStatisticsRecorderDummy();
+
+  public static DriverQueryStatisticsRecorderDummy getInstance() {
+    return carbonLoadStatisticsImplInstance;
+  }
+
+  public void recordStatistics(QueryStatistic statistic) {
+
+  }
+
+  public void logStatistics() {
+
+  }
+
+  public void logStatisticsAsTableExecutor() {
+
+  }
+
+  /**
+   * Below method will be used to add the statistics
+   *
+   * @param statistic
+   */
+  public synchronized void recordStatisticsForDriver(QueryStatistic statistic, String queryId) {
+
+  }
+
+  /**
+   * Below method will be used to show statistic log as table
+   */
+  public void logStatisticsAsTableDriver() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
new file mode 100644
index 0000000..35812dd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import static org.apache.carbondata.core.util.CarbonUtil.printLine;
+
+import org.apache.commons.lang3.StringUtils;
+
/**
 * Driver-side recorder that accumulates {@link QueryStatistic} entries per
 * query id and, once enough phases are present, logs them as an ASCII table.
 */
public class DriverQueryStatisticsRecorderImpl implements QueryStatisticsRecorder{

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(DriverQueryStatisticsRecorderImpl.class.getName());

  /**
   * map from query id to the statistics collected so far for that query
   */
  private Map<String, List<QueryStatistic>> queryStatisticsMap;

  /**
   * lock for log statistics table
   */
  private static final Object lock = new Object();

  private DriverQueryStatisticsRecorderImpl() {
    // use ConcurrentHashMap, it is thread-safe
    queryStatisticsMap = new ConcurrentHashMap<String, List<QueryStatistic>>();
  }

  // eagerly-created singleton; obtain via getInstance()
  private static DriverQueryStatisticsRecorderImpl carbonLoadStatisticsImplInstance =
      new DriverQueryStatisticsRecorderImpl();

  public static DriverQueryStatisticsRecorderImpl getInstance() {
    return carbonLoadStatisticsImplInstance;
  }

  // Executor-side hooks of the interface: intentionally no-ops on the driver.
  public void recordStatistics(QueryStatistic statistic) {

  }

  public void logStatistics() {

  }

  public void logStatisticsAsTableExecutor() {

  }

  /**
   * Below method will be used to add the statistics
   *
   * @param statistic statistic to record
   * @param queryId   id of the query the statistic belongs to
   */
  public void recordStatisticsForDriver(QueryStatistic statistic, String queryId) {
    synchronized (lock) {
      // refresh query Statistics Map
      if (queryStatisticsMap.get(queryId) != null) {
        queryStatisticsMap.get(queryId).add(statistic);
      } else {
        List<QueryStatistic> newQueryStatistics = new ArrayList<QueryStatistic>();
        newQueryStatistics.add(statistic);
        queryStatisticsMap.put(queryId, newQueryStatistics);
      }
    }
  }

  /**
   * Below method will be used to show statistic log as table.
   * Entries are removed from the map once printed, when stale, or when the
   * query id is unknown/empty.
   */
  public void logStatisticsAsTableDriver() {
    synchronized (lock) {
      Iterator<Map.Entry<String, List<QueryStatistic>>> entries =
              queryStatisticsMap.entrySet().iterator();
      while (entries.hasNext()) {
        Map.Entry<String, List<QueryStatistic>> entry = entries.next();
        String queryId = entry.getKey();
        // clear the unknown query statistics
        if(StringUtils.isEmpty(queryId)) {
          entries.remove();
        } else {
          // clear the timeout query statistics
          // NOTE(review): assumes queryId parses as the query start time in
          // nanoseconds — a non-numeric id would throw NumberFormatException
          // out of this method; confirm against how query ids are generated.
          long interval = System.nanoTime() - Long.parseLong(queryId);
          if (interval > QueryStatisticsConstants.CLEAR_STATISTICS_TIMEOUT) {
            entries.remove();
          } else {
            // print sql_parse_t,load_meta_t,block_allocation_t,block_identification_t
            // or just print block_allocation_t,block_identification_t
            if (entry.getValue().size() >= 2) {
              String tableInfo = collectDriverStatistics(entry.getValue(), queryId);
              if (null != tableInfo) {
                LOGGER.statistic(tableInfo);
                // clear the statistics that has been printed
                entries.remove();
              }
            }
          }
        }
      }
    }
  }

  /**
   * Below method will parse queryStatisticsMap and put time into table.
   *
   * @param statisticsList statistics recorded for one query
   * @param queryId        id of the query, printed in the table header
   * @return the formatted table, or null when the expected phases are missing
   */
  public String collectDriverStatistics(List<QueryStatistic> statisticsList, String queryId) {
    String sql_parse_time = "";
    String load_meta_time = "";
    String load_blocks_time = "";
    String block_allocation_time = "";
    String block_identification_time = "";
    long driver_part_time_tmp = 0L;
    long driver_part_time_tmp2 = 0L;
    long load_blocks_time_tmp = 0L;
    String splitChar = " ";
    try {
      // get statistic time from the QueryStatistic
      // (dispatch on the exact message strings from QueryStatisticsConstants)
      for (QueryStatistic statistic : statisticsList) {
        switch (statistic.getMessage()) {
          case QueryStatisticsConstants.SQL_PARSE:
            sql_parse_time += statistic.getTimeTaken() + splitChar;
            driver_part_time_tmp += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.LOAD_META:
            load_meta_time += statistic.getTimeTaken() + splitChar;
            driver_part_time_tmp += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.LOAD_BLOCKS_DRIVER:
            // multi segments will generate multi load_blocks_time
            load_blocks_time_tmp += statistic.getTimeTaken();
            driver_part_time_tmp += statistic.getTimeTaken();
            driver_part_time_tmp2 += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.BLOCK_ALLOCATION:
            block_allocation_time += statistic.getTimeTaken() + splitChar;
            driver_part_time_tmp += statistic.getTimeTaken();
            driver_part_time_tmp2 += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.BLOCK_IDENTIFICATION:
            block_identification_time += statistic.getTimeTaken() + splitChar;
            driver_part_time_tmp += statistic.getTimeTaken();
            driver_part_time_tmp2 += statistic.getTimeTaken();
            break;
          default:
            break;
        }
      }
      load_blocks_time = load_blocks_time_tmp + splitChar;
      String driver_part_time = driver_part_time_tmp + splitChar;
      // structure the query statistics info table
      StringBuilder tableInfo = new StringBuilder();
      // column widths of the four table columns
      int len1 = 8;
      int len2 = 20;
      int len3 = 21;
      int len4 = 24;
      String line = "+" + printLine("-", len1) + "+" + printLine("-", len2) + "+" +
          printLine("-", len3) + "+" + printLine("-", len4) + "+";
      String line2 = "|" + printLine(" ", len1) + "+" + printLine("-", len2) + "+" +
          printLine(" ", len3) + "+" + printLine("-", len4) + "+";
      // table header
      tableInfo.append(line).append("\n");
      tableInfo.append("|" + printLine(" ", (len1 - "Module".length())) + "Module" + "|" +
          printLine(" ", (len2 - "Operation Step".length())) + "Operation Step" + "|" +
          printLine(" ", (len3 - "Total Query Cost".length())) + "Total Query Cost" + "|" +
          printLine(" ", (len4 - "Query Cost".length())) + "Query Cost" + "|" + "\n");
      tableInfo.append(line).append("\n");
      // print sql_parse_t,load_meta_t,block_allocation_t,block_identification_t
      if (!StringUtils.isEmpty(sql_parse_time) &&
          !StringUtils.isEmpty(load_meta_time) &&
          !StringUtils.isEmpty(block_allocation_time) &&
          !StringUtils.isEmpty(block_identification_time)) {
        tableInfo.append("|" + printLine(" ", len1) + "|" +
            printLine(" ", (len2 - "SQL parse".length())) + "SQL parse" + "|" +
            printLine(" ", len3) + "|" +
            printLine(" ", (len4 - sql_parse_time.length())) + sql_parse_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" + printLine(" ", (len1 - "Driver".length())) + "Driver" + "|" +
            printLine(" ", (len2 - "Load meta data".length())) + "Load meta data" + "|" +
            printLine(" ", (len3 - driver_part_time.length())) + driver_part_time + "|" +
            printLine(" ", (len4 - load_meta_time.length())) +
            load_meta_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" + printLine(" ", (len1 - "Part".length())) + "Part" + "|" +
                printLine(" ", (len2 - "Load blocks driver".length())) +
                "Load blocks driver" + "|" +
                printLine(" ", len3) + "|" +
                printLine(" ", (len4 - load_blocks_time.length())) +
                load_blocks_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" + printLine(" ", len1 ) + "|" +
            printLine(" ", (len2 - "Block allocation".length())) + "Block allocation" + "|" +
            printLine(" ", len3) + "|" +
            printLine(" ", (len4 - block_allocation_time.length())) +
            block_allocation_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" +
            printLine(" ", len1) + "|" +
            printLine(" ", (len2 - "Block identification".length())) +
            "Block identification" + "|" +
            printLine(" ", len3) + "|" +
            printLine(" ", (len4 - block_identification_time.length())) +
            block_identification_time + "|" + "\n");
        tableInfo.append(line).append("\n");

        // show query statistic as "query id" + "table"
        return "Print query statistic for query id: " + queryId + "\n" + tableInfo.toString();
      } else if (!StringUtils.isEmpty(block_allocation_time) &&
          !StringUtils.isEmpty(block_identification_time)) {
        // when we can't get sql parse time, we only print the last two
        driver_part_time = driver_part_time_tmp2 + splitChar;
        tableInfo.append("|" + printLine(" ", (len1 - "Driver".length())) + "Driver" + "|" +
                printLine(" ", (len2 - "Load blocks driver".length())) +
                "Load blocks driver" + "|" +
                printLine(" ", len3) + "|" +
                printLine(" ", (len4 - load_blocks_time.length())) +
                load_blocks_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" + printLine(" ", (len1 - "Part".length())) + "Part" + "|" +
            printLine(" ", (len2 - "Block allocation".length())) + "Block allocation" + "|" +
            printLine(" ", (len3 - driver_part_time.length())) + driver_part_time + "|" +
            printLine(" ", (len4 - block_allocation_time.length())) +
            block_allocation_time + "|" + "\n");
        tableInfo.append(line2).append("\n");
        tableInfo.append("|" +
            printLine(" ", len1) + "|" +
            printLine(" ", (len2 - "Block identification".length())) +
            "Block identification" + "|" +
            printLine(" ", len3) + "|" +
            printLine(" ", (len4 - block_identification_time.length())) +
            block_identification_time + "|" + "\n");
        tableInfo.append(line).append("\n");

        // show query statistic as "query id" + "table"
        return "Print query statistic for query id: " + queryId + "\n" + tableInfo.toString();
      }

      return null;
    } catch (Exception ex) {
      // best-effort logging path: report the failure in the returned string
      return "Put statistics into table failed, catch exception: " + ex.getMessage();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
new file mode 100644
index 0000000..ccdc48b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.StringUtils;
+
/**
 * Holds a single statistic captured during a query: either a timed phase
 * (message + elapsed time) or a counter (message + count, timeTaken == -1).
 */
public class QueryStatistic implements Serializable {

  /** serialization id */
  private static final long serialVersionUID = -5667106646135905848L;

  /** human-readable description of the recorded phase */
  private String message;

  /** elapsed time of the phase; -1 for pure count statistics */
  private long timeTaken;

  /** wall-clock time (ms) at which this statistic object was created */
  private long startTime;

  /** recorded value for count-type statistics */
  private long count;

  public QueryStatistic() {
    this.startTime = System.currentTimeMillis();
  }

  /**
   * Records an elapsed-time statistic measured from object creation.
   *
   * @param message     statistic message
   * @param currentTime current wall-clock time in milliseconds
   */
  public void addStatistics(String message, long currentTime) {
    this.message = message;
    this.timeTaken = currentTime - startTime;
  }

  /**
   * Records a statistic whose duration was measured externally,
   * e.g. total time taken for scan or result preparation.
   *
   * @param message   statistic message
   * @param timetaken pre-computed duration
   */
  public void addFixedTimeStatistic(String message, long timetaken) {
    this.message = message;
    this.timeTaken = timetaken;
  }

  /**
   * Records a count-type statistic; timeTaken is set to -1 as a sentinel.
   *
   * @param message statistic message
   * @param count   recorded count
   */
  public void addCountStatistic(String message, long count) {
    this.message = message;
    this.count = count;
    this.timeTaken = -1;
  }

  /**
   * Renders the statistic as a loggable string.
   *
   * @param queryWithTaskId query/task id to embed; may be null or empty
   * @return statistic message, with the task id included when one was given
   */
  public String getStatistics(String queryWithTaskId) {
    boolean noTaskId = (queryWithTaskId == null || queryWithTaskId.isEmpty());
    if (noTaskId) {
      return message + timeTaken;
    }
    return message + " for the taskid : " + queryWithTaskId + " Is : " + timeTaken;
  }

  public String getMessage() {
    return message;
  }

  public long getTimeTaken() {
    return timeTaken;
  }

  public long getCount() {
    return count;
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
new file mode 100644
index 0000000..1360abc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.stats;
+
/**
 * Message keys used to tag {@link QueryStatistic} entries; recorders match
 * statistics by comparing against these exact strings, so the values must
 * not change.
 */
public interface QueryStatisticsConstants {

  // driver side
  String SQL_PARSE = "Time taken to parse sql In Driver Side";

  String LOAD_META = "Time taken to load meta data In Driver Side";

  String LOAD_BLOCKS_DRIVER = "Time taken to load the Block(s) In Driver Side";

  String BLOCK_ALLOCATION = "Total Time taken in block(s) allocation";

  String BLOCK_IDENTIFICATION = "Time taken to identify Block(s) to scan";

  // executor side
  String EXECUTOR_PART =
      "Total Time taken to execute the query in executor Side";

  String LOAD_BLOCKS_EXECUTOR = "Time taken to load the Block(s) In Executor";

  String SCAN_BLOCKS_NUM = "The num of blocks scanned";

  String SCAN_BLOCKS_TIME = "Time taken to scan blocks";

  String LOAD_DICTIONARY = "Time taken to load the Dictionary In Executor";

  String PREPARE_RESULT = "Total Time taken to prepare query result";

  String RESULT_SIZE = "The size of query result";

  String TOTAL_BLOCKLET_NUM = "The num of total blocklet";

  String VALID_SCAN_BLOCKLET_NUM = "The num of valid scanned blocklet";

  // clear no-use statistics timeout: 60 seconds expressed in nanoseconds
  // (60 * 1000 ms * 1,000,000 ns/ms), compared against System.nanoTime() deltas
  long CLEAR_STATISTICS_TIMEOUT = 60 * 1000 * 1000000L;

}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
new file mode 100644
index 0000000..d80ca50
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryStatisticsModel {
+  private QueryStatisticsRecorder recorder;
+  private Map<String, QueryStatistic> statisticsTypeAndObjMap =
+      new HashMap<String, QueryStatistic>();
+
+  public QueryStatisticsRecorder getRecorder() {
+    return recorder;
+  }
+
+  public void setRecorder(QueryStatisticsRecorder recorder) {
+    this.recorder = recorder;
+  }
+
+  public Map<String, QueryStatistic> getStatisticsTypeAndObjMap() {
+    return statisticsTypeAndObjMap;
+  }
+
+  public void setStatisticsTypeAndObjMap(Map<String, QueryStatistic> statisticsTypeAndObjMap) {
+    this.statisticsTypeAndObjMap = statisticsTypeAndObjMap;
+  }
+}



Mime
View raw message