carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [04/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported Spark 1.6 in Carbondata (#670)
Date Thu, 23 Jun 2016 14:15:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
new file mode 100644
index 0000000..b901878
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = 623750056131364540L;
+
+  /**
+   * aggregate value
+   */
+  private BigDecimal aggVal;
+
+  public SumBigDecimalAggregator() {
+    aggVal = new BigDecimal(0);
+    firstTime = false;
+  }
+
+  /**
+   * This method will update the aggVal it will add new value to aggVal
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    if (firstTime) {
+      aggVal = (BigDecimal) newVal;
+      firstTime = false;
+    } else {
+      aggVal = aggVal.add((BigDecimal) newVal);
+    }
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      BigDecimal value = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+      aggVal = aggVal.add(value);
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    if (firstTime) {
+      return new byte[0];
+    }
+    byte[] bytes = DataTypeUtil.bigDecimalToByte(aggVal);
+    ByteBuffer allocate = ByteBuffer.allocate(4 + bytes.length);
+
+    allocate.putInt(bytes.length);
+    allocate.put(bytes);
+    allocate.rewind();
+    return allocate.array();
+  }
+
+  /**
+   * This method will return aggVal
+   *
+   * @return sum value
+   */
+  @Override public BigDecimal getBigDecimalValue() {
+    return aggVal;
+  }
+
+  /* Merge the value, it will update the sum aggregate value it will add new
+   * value to aggVal
+   *
+   * @param aggregator
+   *            SumAggregator
+   *
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    if (!aggregator.isFirstTime()) {
+      agg(aggregator.getBigDecimalValue());
+    }
+  }
+
+  /**
+   * This method return the sum value as an object
+   *
+   * @return sum value as an object
+   */
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (BigDecimal) newValue;
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    firstTime = inPut.readBoolean();
+    aggVal = new BigDecimal(inPut.readUTF());
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+    output.writeBoolean(firstTime);
+    output.writeUTF(aggVal.toString());
+
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumBigDecimalAggregator aggr = new SumBigDecimalAggregator();
+    aggr.aggVal = aggVal;
+    aggr.firstTime = firstTime;
+    return aggr;
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    byte[] valueByte = new byte[buffer.getInt()];
+    buffer.get(valueByte);
+    BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
+    aggVal = aggVal.add(valueBigDecimal);
+    firstTime = false;
+  }
+
+  public String toString() {
+    return aggVal + "";
+  }
+
+  @Override public int compareTo(MeasureAggregator o) {
+    BigDecimal value = getBigDecimalValue();
+    BigDecimal otherVal = o.getBigDecimalValue();
+    return value.compareTo(otherVal);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumBigDecimalAggregator)) {
+      return false;
+    }
+    SumBigDecimalAggregator o = (SumBigDecimalAggregator) obj;
+    return getBigDecimalValue().equals(o.getBigDecimalValue());
+  }
+
+  @Override public int hashCode() {
+    return getBigDecimalValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new SumBigDecimalAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
new file mode 100644
index 0000000..777318d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumDoubleAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = 623750056131364540L;
+
+  /**
+   * aggregate value
+   */
+  private double aggVal;
+
+  /**
+   * This method will update the aggVal it will add new value to aggVal
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(double newVal) {
+    aggVal += newVal;
+    firstTime = false;
+  }
+
+  /**
+   * This method will update the aggVal it will add new value to aggVal
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    aggVal += ((Number) newVal).doubleValue();
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal+= dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    if (firstTime) {
+      return new byte[0];
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+    buffer.putDouble(aggVal);
+    return buffer.array();
+  }
+
+  /**
+   * This method will return aggVal
+   *
+   * @return sum value
+   */
+
+  @Override public Double getDoubleValue() {
+    return aggVal;
+  }
+
+  /* Merge the value, it will update the sum aggregate value it will add new
+   * value to aggVal
+   *
+   * @param aggregator  SumAggregator
+   *
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    if (!aggregator.isFirstTime()) {
+      agg(aggregator.getDoubleValue());
+    }
+  }
+
+  /**
+   * This method return the sum value as an object
+   *
+   * @return sum value as an object
+   */
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (Double) newValue;
+  }
+
+  @Override public boolean isFirstTime() {
+    return firstTime;
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    firstTime = inPut.readBoolean();
+    aggVal = inPut.readDouble();
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+    output.writeBoolean(firstTime);
+    output.writeDouble(aggVal);
+
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumDoubleAggregator aggr = new SumDoubleAggregator();
+    aggr.aggVal = aggVal;
+    aggr.firstTime = firstTime;
+    return aggr;
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    aggVal += ByteBuffer.wrap(value).getDouble();
+    firstTime = false;
+  }
+
+  public String toString() {
+    return aggVal + "";
+  }
+
+  @Override public int compareTo(MeasureAggregator o) {
+    double value = getDoubleValue();
+    double otherVal = o.getDoubleValue();
+    if (value > otherVal) {
+      return 1;
+    }
+    if (value < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if(!(obj instanceof SumDoubleAggregator)) {
+      return false;
+    }
+    SumDoubleAggregator o = (SumDoubleAggregator) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new SumDoubleAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
new file mode 100644
index 0000000..7c245d9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumLongAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = 623750056131364540L;
+
+  /**
+   * aggregate value
+   */
+  private long aggVal;
+
+  /**
+   * This method will update the aggVal it will add new value to aggVal
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    aggVal += (long) newVal;
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal+= dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    if (firstTime) {
+      return new byte[0];
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+    buffer.putLong(aggVal);
+    return buffer.array();
+  }
+
+  /**
+   * This method will return aggVal
+   *
+   * @return sum value
+   */
+  @Override public Long getLongValue() {
+    return aggVal;
+  }
+
+  /* Merge the value, it will update the sum aggregate value it will add new
+   * value to aggVal
+   *
+   * @param aggregator SumAggregator
+   *
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    if (!aggregator.isFirstTime()) {
+      agg(aggregator.getLongValue());
+    }
+  }
+
+  /**
+   * This method return the sum value as an object
+   *
+   * @return sum long value as an object
+   */
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (long) newValue;
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    firstTime = inPut.readBoolean();
+    aggVal = inPut.readLong();
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+    output.writeBoolean(firstTime);
+    output.writeLong(aggVal);
+
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumLongAggregator aggr = new SumLongAggregator();
+    aggr.aggVal = aggVal;
+    aggr.firstTime = firstTime;
+    return aggr;
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    aggVal += ByteBuffer.wrap(value).getLong();
+    firstTime = false;
+  }
+
+  public String toString() {
+    return aggVal + "";
+  }
+
+  @Override public int compareTo(MeasureAggregator o) {
+    Long value = getLongValue();
+    Long otherVal = o.getLongValue();
+    if (value > otherVal) {
+      return 1;
+    }
+    if (value < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if(!(obj instanceof SumLongAggregator)) {
+      return false;
+    }
+    SumLongAggregator o = (SumLongAggregator) obj;
+    return getLongValue().equals(o.getLongValue());
+  }
+
+  @Override public int hashCode() {
+    return getLongValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new SumLongAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
index 483392e..b46c4de 100644
--- a/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
+++ b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
@@ -24,28 +24,28 @@ import org.carbondata.core.carbon.metadata.datatype.DataType;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.query.aggregator.CustomMeasureAggregator;
 import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.AvgBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.AvgDoubleAggregator;
-import org.carbondata.query.aggregator.impl.AvgLongAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctCountAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DistinctCountBigDecimalAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DistinctCountLongAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DummyBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.DummyDoubleAggregator;
-import org.carbondata.query.aggregator.impl.DummyLongAggregator;
-import org.carbondata.query.aggregator.impl.MaxAggregator;
-import org.carbondata.query.aggregator.impl.MaxBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.MaxLongAggregator;
-import org.carbondata.query.aggregator.impl.MinAggregator;
-import org.carbondata.query.aggregator.impl.MinBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.MinLongAggregator;
-import org.carbondata.query.aggregator.impl.SumBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctDoubleAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctLongAggregator;
-import org.carbondata.query.aggregator.impl.SumDoubleAggregator;
-import org.carbondata.query.aggregator.impl.SumLongAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgDoubleAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgLongAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountBigDecimalAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountLongAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctDoubleAggregator;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctLongAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyDoubleAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyLongAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxLongAggregator;
+import org.carbondata.query.aggregator.impl.min.MinAggregator;
+import org.carbondata.query.aggregator.impl.min.MinBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.min.MinLongAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumDoubleAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumLongAggregator;
 import org.carbondata.query.carbon.model.CustomAggregateExpression;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
new file mode 100644
index 0000000..971e4cc
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.aggregator.dimension.impl;
+
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator;
+import org.carbondata.query.carbon.executor.util.QueryUtil;
+import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+
+/**
+ * Class which will be used to aggregate the direct dictionary dimension data
+ */
+public class DirectDictionaryDimensionAggregator implements DimensionDataAggregator {
+
+  /**
+   * info object which store information about dimension to be aggregated
+   */
+  private DimensionAggregatorInfo dimensionAggeragtorInfo;
+
+  /**
+   * start index of the aggregator for current dimension column
+   */
+  private int aggregatorStartIndex;
+
+  /**
+   * buffer used to convert mdkey to surrogate key
+   */
+  private ByteBuffer buffer;
+
+  /**
+   * data index in the file
+   */
+  private int blockIndex;
+
+  /**
+   * to store index which will be used to aggregate
+   * number type value like sum avg
+   */
+  private int[] numberTypeAggregatorIndex;
+
+  /**
+   * DirectDictionaryGenerator
+   */
+  private DirectDictionaryGenerator directDictionaryGenerator;
+
+  /**
+   * to store index which will be used to aggregate
+   * actual type value like max, min, dictinct count
+   */
+  private int[] actualTypeAggregatorIndex;
+
+  public DirectDictionaryDimensionAggregator(DimensionAggregatorInfo dimensionAggeragtorInfo,
+      int aggregatorStartIndex, int blockIndex) {
+    this.dimensionAggeragtorInfo = dimensionAggeragtorInfo;
+    this.aggregatorStartIndex = aggregatorStartIndex;
+    this.blockIndex = blockIndex;
+    buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    numberTypeAggregatorIndex =
+        QueryUtil.getNumberTypeIndex(this.dimensionAggeragtorInfo.getAggList());
+    actualTypeAggregatorIndex =
+        QueryUtil.getActualTypeIndex(this.dimensionAggeragtorInfo.getAggList());
+    directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(this.dimensionAggeragtorInfo.getDim().getDataType());
+  }
+
+  /**
+   * Below method will be used to aggregate the dimension data
+   *
+   * @param scannedResult scanned result
+   * @param aggeragtor    aggregator used to aggregate the data
+   */
+  @Override public void aggregateDimensionData(AbstractScannedResult scannedResult,
+      MeasureAggregator[] aggeragtor) {
+    byte[] dimensionData = scannedResult.getDimensionKey(blockIndex);
+    int surrogateKey = CarbonUtil.getSurrogateKey(dimensionData, buffer);
+    Object dataBasedOnDataType =
+        (long) directDictionaryGenerator.getValueFromSurrogate(surrogateKey) / 1000;
+
+    if (actualTypeAggregatorIndex.length > 0) {
+      for (int j = 0; j < actualTypeAggregatorIndex.length; j++) {
+        aggeragtor[aggregatorStartIndex + actualTypeAggregatorIndex[j]].agg(dataBasedOnDataType);
+      }
+    }
+    if (numberTypeAggregatorIndex.length > 0) {
+      for (int j = 0; j < numberTypeAggregatorIndex.length; j++) {
+        aggeragtor[aggregatorStartIndex + numberTypeAggregatorIndex[j]].agg(dataBasedOnDataType);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
index e13aee2..14c336d 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
 import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
index ebd90f9..789f77e 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
@@ -53,6 +53,7 @@ import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.util.CarbonUtilException;
 import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator;
 import org.carbondata.query.carbon.aggregator.dimension.impl.ColumnGroupDimensionsAggregator;
+import org.carbondata.query.carbon.aggregator.dimension.impl.DirectDictionaryDimensionAggregator;
 import org.carbondata.query.carbon.aggregator.dimension.impl.FixedLengthDimensionAggregator;
 import org.carbondata.query.carbon.aggregator.dimension.impl.VariableLengthDimensionAggregator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
@@ -724,9 +725,15 @@ public class QueryUtil {
         aggregatorStartIndex += numberOfAggregatorForColumnGroup;
         continue;
       } else {
+        if(CarbonUtil.hasEncoding(dim.getEncoder(), Encoding.DIRECT_DICTIONARY)){
+          dimensionDataAggregators.add(
+              new DirectDictionaryDimensionAggregator(entry.getValue().get(0),
+                  aggregatorStartIndex,
+                  dimensionToBlockIndexMapping.get(dim.getOrdinal())));
+        }
         // if it is a dictionary column than create a fixed length
         // aggeragtor
-        if (CarbonUtil
+        else if (CarbonUtil
             .hasEncoding(dim.getEncoder(), Encoding.DICTIONARY)) {
           dimensionDataAggregators.add(
               new FixedLengthDimensionAggregator(entry.getValue().get(0), null,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
index 0957e7a..5604ecd 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
@@ -32,9 +32,9 @@ import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerat
 import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctCountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctStringCountAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator;
 import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
 import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
 import org.carbondata.query.carbon.model.QueryDimension;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
index 69e1d2f..a549409 100644
--- a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
@@ -63,7 +63,7 @@ object GenerateDictionaryExample {
     val tableName = carbonTableIdentifier.getTableName
     val carbonRelation = CarbonEnv.getInstance(carbonContext).carbonCatalog.
       lookupRelation1(Option(dataBaseName),
-        tableName, None) (carbonContext).asInstanceOf[CarbonRelation]
+        tableName) (carbonContext).asInstanceOf[CarbonRelation]
     val carbonTable = carbonRelation.cubeMeta.carbonTable
     val dimensions = carbonTable.getDimensionByTableName(tableName)
       .toArray.map(_.asInstanceOf[CarbonDimension])

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 1cd4be4..c4f09cc 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.common.util
 
 import java.util.{Locale, TimeZone}
 
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import scala.collection.JavaConversions._
+
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
-import scala.collection.JavaConversions._
 
 class QueryTest extends PlanTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
index ec23b80..bd7b596 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
@@ -81,6 +81,34 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
         "Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," +
         "Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," +
         "gamePointId,gamePointDescription')")
+
+      sql(
+        "create table if not exists Carbon_automation_hive (imei string,deviceInformationId int," +
+        "MAC string,deviceColor string,device_backColor string,modelId string,marketName " +
+        "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," +
+        "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " +
+        "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " +
+        "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " +
+        "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " +
+        "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" +
+        " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " +
+        "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " +
+        "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," +
+        "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " +
+        "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " +
+        "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " +
+        "Latest_province string, Latest_city string, Latest_district string, Latest_street " +
+        "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " +
+        "string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " +
+        "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " +
+        "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " +
+        "Latest_operatorId string, , gamePointId int, gamePointDescription string" +
+        ") row format delimited fields terminated by ','"
+      )
+
+      sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " +
+          "table Carbon_automation_hive ")
+
     } catch {
       case e: Exception => print("ERROR: DROP Carbon_automation_test ")
     }
@@ -88,6 +116,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll {
     sql("drop cube Carbon_automation_test")
+    sql("drop table Carbon_automation_hive")
 
   }
 
@@ -853,7 +882,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
   test("select variance(deviceInformationId) as a   from Carbon_automation_test")({
     checkAnswer(
       sql("select variance(deviceInformationId) as a   from Carbon_automation_test"),
-      Seq(Row(9.31041555963636E9))
+      sql("select variance(deviceInformationId) as a   from Carbon_automation_hive")
     )
   }
   )

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
index ab9121a..88ba722 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
@@ -81,6 +81,32 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
         "Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," +
         "Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," +
         "gamePointId,gamePointDescription')")
+
+      sql(
+        "create table if not exists Carbon_automation_hive2(imei string,deviceInformationId int," +
+        "MAC string,deviceColor string,device_backColor string,modelId string,marketName " +
+        "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," +
+        "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " +
+        "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " +
+        "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " +
+        "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " +
+        "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" +
+        " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " +
+        "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " +
+        "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," +
+        "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " +
+        "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " +
+        "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " +
+        "Latest_province string, Latest_city string, Latest_district string, Latest_street " +
+        "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " +
+        "string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " +
+        "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " +
+        "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " +
+        "Latest_operatorId string, gamePointId int,gamePointDescription string" +
+        ") row format delimited fields terminated by ','"
+      )
+      sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " +
+          "table Carbon_automation_hive2 ")
     } catch {
       case e: Exception => print("ERROR: DROP Carbon_automation_test2 ")
     }
@@ -89,6 +115,7 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
   override def afterAll {
     try {
       sql("drop cube Carbon_automation_test2")
+      sql("drop table Carbon_automation_hive2")
     } catch {
       case e: Exception => print("ERROR: DROP Carbon_automation_test2 ")
     }
@@ -7902,7 +7929,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
       sql(
         "select variance(deviceInformationId), var_pop(imei)  from Carbon_automation_test2 where activeareaid>3"
       ),
-      Seq(Row(1.477644655616972E10, null))
+      sql(
+        "select variance(deviceInformationId), var_pop(imei)  from Carbon_automation_hive2 where activeareaid>3"
+      )
     )
   }
   )
@@ -7915,7 +7944,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
       sql(
         "select variance(contractNumber), var_pop(contractNumber)  from Carbon_automation_test2 where deliveryareaid>5"
       ),
-      Seq(Row(8.508651970169495E12, 8.508651970169495E12))
+      sql(
+        "select variance(contractNumber), var_pop(contractNumber)  from Carbon_automation_hive2 where deliveryareaid>5"
+      )
     )
   }
   )
@@ -7928,7 +7959,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
       sql(
         "select variance(AMSize), var_pop(channelsid)  from Carbon_automation_test2 where channelsid>2"
       ),
-      Seq(Row(null, 2.148423005565863))
+      sql(
+        "select variance(AMSize), var_pop(channelsid)  from Carbon_automation_hive2 where channelsid>2"
+      )
     )
   }
   )
@@ -7941,7 +7974,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
       sql(
         "select variance(deviceInformationId), var_pop(deviceInformationId)  from Carbon_automation_test2 where activeareaid>3"
       ),
-      Seq(Row(1.477644655616972E10, 1.477644655616972E10))
+      sql(
+        "select variance(deviceInformationId), var_pop(deviceInformationId)  from Carbon_automation_hive2 where activeareaid>3"
+      )
     )
   }
   )

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
index 1884e5f..12f55b7 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
@@ -53,7 +53,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
           "bomCode string,internalModels string, deliveryTime string, channelsId string, " +
           "channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince " +
           "string, deliveryCity string,deliveryDistrict string, deliveryStreet string, " +
-          "oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " +
+          "oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " +
           "string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet " +
           "string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, " +
           "Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, " +
@@ -65,8 +65,8 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
           "Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, " +
           "Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string," +
           " Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, " +
-          "Latest_phonePADPartitionedVersions string, Latest_operatorId string, " +
-          "gamePointDescription string, gamePointId int,contractNumber int) row format " +
+          "Latest_phonePADPartitionedVersions string, Latest_operatorId string,gamePointId int," +
+          "gamePointDescription string) row format " +
           "delimited fields terminated by ','"
       )
 
@@ -211,7 +211,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("select sum( DISTINCT channelsId) a  from Carbon_automation_test6")({
     checkAnswer(
       sql("select sum( DISTINCT channelsId) a  from Carbon_automation_test6"),
-      Seq(Row(428.0)))
+      sql("select sum( DISTINCT channelsId) a  from hivetable"))
   })
 
   //TC_083
@@ -263,7 +263,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("select variance(gamePointId) as a   from Carbon_automation_test6")({
     checkAnswer(
       sql("select variance(gamePointId) as a   from Carbon_automation_test6"),
-      Seq(Row(654787.843930927)))
+      sql("select variance(gamePointId) as a   from hivetable"))
   })
 
   //TC_120
@@ -732,14 +732,14 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("select variance(gamepointid), var_pop(gamepointid)  from Carbon_automation_test6 where channelsid>2")({
     checkAnswer(
       sql("select variance(gamepointid), var_pop(gamepointid)  from Carbon_automation_test6 where channelsid>2"),
-      Seq(Row(622630.4599570761, 622630.4599570761)))
+      sql("select variance(gamepointid), var_pop(gamepointid)  from hivetable where channelsid>2"))
   })
 
   //TC_445
   test("select variance(bomcode), var_pop(gamepointid)  from Carbon_automation_test6 where activeareaid>3")({
     checkAnswer(
       sql("select variance(bomcode), var_pop(gamepointid)  from Carbon_automation_test6 where activeareaid>3"),
-      Seq(Row(1.4776446556169722E10, 663683.3954750763)))
+      sql("select variance(bomcode), var_pop(gamepointid)  from hivetable where activeareaid>3"))
   })
 
   //TC_447

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 0682a42..c98c9fb 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -65,17 +65,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>eigenbase</groupId>
-      <artifactId>eigenbase-xom</artifactId>
-      <version>1.3.4</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
       <version>6.5.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
deleted file mode 100644
index 93cf675..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.HashMap
-
-import scala.Array.{canBuildFrom, fallbackCanBuildFrom}
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-
-/**
- * :: DeveloperApi ::
- * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
- * group.
- *
- * @param partial              if true then aggregation is done partially on local data without
- *                             shuffling to
- *                             ensure all values where `groupingExpressions` are equal are present.
- * @param groupingExpressions  expressions that are evaluated to determine grouping.
- * @param aggregateExpressions expressions that are computed for each group.
- * @param child                the input data source.
- */
-@DeveloperApi
-case class CarbonAggregate(
-    partial: Boolean,
-    groupingExpressions: Seq[Expression],
-    aggregateExpressions: Seq[NamedExpression],
-    child: SparkPlan)(@transient sqlContext: SQLContext)
-  extends UnaryNode {
-
-  override def requiredChildDistribution: Seq[Distribution] = {
-    if (partial) {
-      UnspecifiedDistribution :: Nil
-    } else {
-      if (groupingExpressions == Nil) {
-        AllTuples :: Nil
-      } else {
-        ClusteredDistribution(groupingExpressions) :: Nil
-      }
-    }
-  }
-
-  override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
-
-  // HACK: Generators don't correctly preserve their output through serializations so we grab
-  // out child's output attributes statically here.
-  private[this] val childOutput = child.output
-
-  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
-
-  /**
-   * An aggregate that needs to be computed for each row in a group.
-   *
-   * @param unbound         Unbound version of this aggregate, used for result substitution.
-   * @param aggregate       A bound copy of this aggregate used to create a new aggregation buffer.
-   * @param resultAttribute An attribute used to refer to the result of this aggregate in the final
-   *                        output.
-   */
-  case class ComputedAggregate(unbound: AggregateExpression1,
-      aggregate: AggregateExpression1,
-      resultAttribute: AttributeReference)
-
-  /** A list of aggregates that need to be computed for each group. */
-  private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
-    agg.collect {
-      case a: AggregateExpression1 =>
-        ComputedAggregate(
-          a,
-          BindReferences.bindReference(a, childOutput),
-          AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
-    }
-  }.toArray
-
-  /** The schema of the result of all aggregate evaluations */
-  private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
-
-  /** Creates a new aggregate buffer for a group. */
-  private[this] def newAggregateBuffer(): Array[AggregateFunction1] = {
-    val buffer = new Array[AggregateFunction1](computedAggregates.length)
-    var i = 0
-    while (i < computedAggregates.length) {
-      buffer(i) = computedAggregates(i).aggregate.newInstance()
-      i += 1
-    }
-    buffer
-  }
-
-  /** Named attributes used to substitute grouping attributes into the final result. */
-  private[this] val namedGroups = groupingExpressions.map {
-    case ne: NamedExpression => ne -> ne.toAttribute
-    case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
-  }
-
-  /**
-   * A map of substitutions that are used to insert the aggregate expressions and grouping
-   * expression into the final result expression.
-   */
-  private[this] val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
-
-  /**
-   * Substituted version of aggregateExpressions expressions which are used to compute final
-   * output rows given a group and the result of all aggregate computations.
-   */
-  private[this] val resultExpressions = aggregateExpressions.map { agg =>
-    agg.transform {
-      case e: Expression if resultMap.contains(e) => resultMap(e)
-    }
-  }
-
-  override def doExecute(): RDD[InternalRow] = {
-    attachTree(this, "execute") {
-      if (groupingExpressions.isEmpty) {
-        child.execute().mapPartitions { iter =>
-          val buffer = newAggregateBuffer()
-          var currentRow: InternalRow = null
-          while (iter.hasNext) {
-            currentRow = iter.next()
-            var i = 0
-            while (i < buffer.length) {
-              buffer(i).update(currentRow)
-              i += 1
-            }
-          }
-          val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
-          val aggregateResults = new GenericMutableRow(computedAggregates.length)
-
-          var i = 0
-          while (i < buffer.length) {
-            aggregateResults(i) = buffer(i).eval(EmptyRow)
-            i += 1
-          }
-
-          Iterator(resultProjection(aggregateResults))
-        }
-      } else {
-        child.execute().mapPartitions { iter =>
-          val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]]
-          val groupingProjection = new InterpretedMutableProjection(groupingExpressions,
-            childOutput)
-
-          var currentRow: InternalRow = null
-          while (iter.hasNext) {
-            currentRow = iter.next()
-            val currentGroup = groupingProjection(currentRow)
-            var currentBuffer = hashTable.get(currentGroup)
-            if (currentBuffer == null) {
-              currentBuffer = newAggregateBuffer()
-              hashTable.put(currentGroup.copy(), currentBuffer)
-            }
-
-            var i = 0
-            while (i < currentBuffer.length) {
-              currentBuffer(i).update(currentRow)
-              i += 1
-            }
-          }
-
-          new Iterator[InternalRow] {
-            private[this] val hashTableIter = hashTable.entrySet().iterator()
-            private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
-            private[this] val resultProjection =
-              new InterpretedMutableProjection(resultExpressions,
-                computedSchema ++ namedGroups.map(_._2))
-            private[this] val joinedRow = new JoinedRow
-
-            override final def hasNext: Boolean = hashTableIter.hasNext
-
-            override final def next(): InternalRow = {
-              val currentEntry = hashTableIter.next()
-              val currentGroup = currentEntry.getKey
-              val currentBuffer = currentEntry.getValue
-
-              var i = 0
-              while (i < currentBuffer.length) {
-                // Evaluating an aggregate buffer returns the result.  No row is required since we
-                // already added all rows in the group using update.
-                aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
-                i += 1
-              }
-              resultProjection(joinedRow(aggregateResults, currentGroup))
-            }
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index e574348..f728a32 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -19,18 +19,18 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.MutableList
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.execution.command.tableModel
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-import org.apache.spark.sql.types.{BooleanType, DataType, StringType, TimestampType}
-
-import org.carbondata.spark.agg._
+import org.apache.spark.sql.types._
 
 /**
  * Top command
@@ -94,7 +94,7 @@ case class ShowCubeCommand(schemaNameOp: Option[String]) extends LogicalPlan wit
   override def children: Seq[LogicalPlan] = Seq.empty
 
   override def output: Seq[Attribute] = {
-    Seq(AttributeReference("cubeName", StringType, nullable = false)(),
+    Seq(AttributeReference("tableName", StringType, nullable = false)(),
       AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
   }
 }
@@ -107,8 +107,8 @@ case class ShowAllCubeCommand() extends LogicalPlan with Command {
   override def children: Seq[LogicalPlan] = Seq.empty
 
   override def output: Seq[Attribute] = {
-    Seq(AttributeReference("schemaName", StringType, nullable = false)(),
-      AttributeReference("cubeName", StringType, nullable = false)(),
+    Seq(AttributeReference("dbName", StringType, nullable = false)(),
+      AttributeReference("tableName", StringType, nullable = false)(),
       AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
   }
 }
@@ -161,7 +161,7 @@ case class ShowLoadsCommand(schemaNameOp: Option[String], cube: String, limit: O
 /**
  * Describe formatted for hive table
  */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: Seq[String])
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
   extends LogicalPlan with Command {
   override def children: Seq[LogicalPlan] = Seq.empty
 
@@ -181,16 +181,18 @@ case class CarbonDictionaryCatalystDecoder(
   override def output: Seq[Attribute] = child.output
 }
 
-abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable{
+abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
   def isEmpty: Boolean = attributes.isEmpty
 }
+
 case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
 case class FakeCarbonCast(child: Literal, dataType: DataType)
   extends LeafExpression with CodegenFallback {
 
-  override def toString: String = s"FakeCarbonCast($child as ${dataType.simpleString})"
+  override def toString: String = s"FakeCarbonCast($child as ${ dataType.simpleString })"
 
   override def checkInputDataTypes(): TypeCheckResult = {
     TypeCheckResult.TypeCheckSuccess
@@ -255,7 +257,7 @@ object PhysicalOperation1 extends PredicateHelper {
         val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters(
           child)
 
-        var aggExps: Seq[AggregateExpression1] = Nil
+        var aggExps: Seq[AggregateExpression] = Nil
         aggregateExpressions.foreach(v => {
           val list = findAggreagateExpression(v)
           aggExps = aggExps ++ list
@@ -276,12 +278,12 @@ object PhysicalOperation1 extends PredicateHelper {
     }
   }
 
-  def findAggreagateExpression(expr: Expression): Seq[AggregateExpression1] = {
+  def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = {
     val exprList = expr match {
-      case d: AggregateExpression1 => d :: Nil
+      case d: AggregateExpression => d :: Nil
       case Alias(ref, name) => findAggreagateExpression(ref)
       case other =>
-        var listout: Seq[AggregateExpression1] = Nil
+        var listout: Seq[AggregateExpression] = Nil
 
         other.children.foreach(v => {
           val list = findAggreagateExpression(v)
@@ -317,7 +319,7 @@ object PhysicalOperation1 extends PredicateHelper {
           case Alias(ref, name) => ref
           case others => others
         }.filter {
-          case d: AggregateExpression1 => true
+          case d: AggregateExpression => true
           case _ => false
         }
         (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some(
@@ -352,6 +354,28 @@ object PhysicalOperation1 extends PredicateHelper {
   }
 }
 
+case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
+  extends LeafExpression with CodegenFallback {
+  override def dataType: DataType = expr.dataType
+
+  override def nullable: Boolean = false
+
+  type EvaluatedType = Any
+  var position = -1
+
+  def setPosition(pos: Int): Unit = position = pos
+
+  override def toString: String = s"PositionLiteral($position : $expr)"
+
+  override def eval(input: InternalRow): Any = {
+    if (position != -1) {
+      input.get(position, intermediateDataType)
+    } else {
+      expr.eval(input)
+    }
+  }
+}
+
 /**
  * Matches a logical aggregation that can be performed on distributed data in two steps.  The first
  * operates on the data in each partition performing partial aggregation for each group.  The second
@@ -367,85 +391,98 @@ object PhysicalOperation1 extends PredicateHelper {
  * - Partial aggregate expressions.
  * - Input to the aggregation.
  */
-object PartialAggregation {
-  type ReturnType =
-  (Seq[Attribute], Seq[NamedExpression], Seq[Expression], Seq[NamedExpression], LogicalPlan)
+object CarbonAggregation {
+  type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan)
 
   private def convertAggregatesForPushdown(convertUnknown: Boolean,
-      rewrittenAggregateExpressions: Seq[Expression]) = {
-    var counter: Int = 0
-    var updatedExpressions = MutableList[Expression]()
-    rewrittenAggregateExpressions.foreach(v => {
-      val updated = convertAggregate(v, counter, convertUnknown)
-      updatedExpressions += updated
-      counter = counter + 1
-    })
-    updatedExpressions
+      rewrittenAggregateExpressions: Seq[Expression],
+      oneAttr: AttributeReference) = {
+    if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) {
+      var counter: Int = 0
+      var updatedExpressions = MutableList[Expression]()
+      rewrittenAggregateExpressions.foreach(v => {
+        val updated = convertAggregate(v, counter, convertUnknown, oneAttr)
+        updatedExpressions += updated
+        counter = counter + 1
+      })
+      updatedExpressions
+    } else {
+      rewrittenAggregateExpressions
+    }
   }
 
-  def makePositionLiteral(expr: Expression, index: Int): PositionLiteral = {
-    val posLiteral = PositionLiteral(expr, MeasureAggregatorUDT)
+  def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = {
+    val posLiteral = PositionLiteral(expr, dataType)
     posLiteral.setPosition(index)
     posLiteral
   }
 
-  def convertAggregate(current: Expression, index: Int, convertUnknown: Boolean): Expression = {
-    if (convertUnknown) {
+  def convertAggregate(current: Expression,
+      index: Int,
+      convertUnknown: Boolean,
+      oneAttr: AttributeReference): Expression = {
+    if (!convertUnknown && canBeConverted(current)) {
       current.transform {
-        case a@SumCarbon(_, _) => a
-        case a@AverageCarbon(_, _) => a
-        case a@MinCarbon(_, _) => a
-        case a@MaxCarbon(_, _) => a
-        case a@SumDistinctCarbon(_, _) => a
-        case a@CountDistinctCarbon(_) => a
-        case a@CountCarbon(_) => a
-        case anyAggr: AggregateExpression1 => anyAggr
+        case Average(attr: AttributeReference) =>
+          val convertedDataType = transformArrayType(attr)
+          CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
+        case Average(Cast(attr: AttributeReference, dataType)) =>
+          val convertedDataType = transformArrayType(attr)
+          CarbonAverage(
+              makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
+        case Count(Seq(s: Literal)) =>
+          CarbonCount(s, Some(makePositionLiteral(transformLongType(oneAttr), index, LongType)))
+        case Count(Seq(attr: AttributeReference)) =>
+          CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType))
+        case Sum(attr: AttributeReference) =>
+          Sum(makePositionLiteral(attr, index, attr.dataType))
+        case Sum(Cast(attr: AttributeReference, dataType)) =>
+          Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
+        case Min(attr: AttributeReference) => Min(makePositionLiteral(attr, index, attr.dataType))
+        case Min(Cast(attr: AttributeReference, dataType)) =>
+          Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
+        case Max(attr: AttributeReference) =>
+          Max(makePositionLiteral(attr, index, attr.dataType))
+        case Max(Cast(attr: AttributeReference, dataType)) =>
+          Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
       }
     } else {
-      current.transform {
-        case a@Sum(attr: AttributeReference) => SumCarbon(makePositionLiteral(attr, index))
-        case a@Sum(cast@Cast(attr: AttributeReference, _)) => SumCarbon(
-          makePositionLiteral(attr, index), cast.dataType)
-        case a@Average(attr: AttributeReference) => AverageCarbon(makePositionLiteral(attr, index))
-        case a@Average(cast@Cast(attr: AttributeReference, _)) => AverageCarbon(
-          makePositionLiteral(attr, index), cast.dataType)
-        case a@Min(attr: AttributeReference) => MinCarbon(makePositionLiteral(attr, index))
-        case a@Min(cast@Cast(attr: AttributeReference, _)) => MinCarbon(
-          makePositionLiteral(attr, index), cast.dataType)
-        case a@Max(attr: AttributeReference) => MaxCarbon(makePositionLiteral(attr, index))
-        case a@Max(cast@Cast(attr: AttributeReference, _)) => MaxCarbon(
-          makePositionLiteral(attr, index), cast.dataType)
-        case a@SumDistinct(attr: AttributeReference) => SumDistinctCarbon(
-          makePositionLiteral(attr, index))
-        case a@SumDistinct(cast@Cast(attr: AttributeReference, _)) => SumDistinctCarbon(
-          makePositionLiteral(attr, index), cast.dataType)
-        case a@CountDistinct(attr: AttributeReference) => CountDistinctCarbon(
-          makePositionLiteral(attr, index))
-        case a@CountDistinct(childSeq) if childSeq.size == 1 =>
-          childSeq.head match {
-            case attr: AttributeReference => CountDistinctCarbon(makePositionLiteral(attr, index))
-            case _ => a
-          }
-        case a@Count(s@Literal(_, _)) =>
-          CountCarbon(makePositionLiteral(s, index))
-        case a@Count(attr: AttributeReference) =>
-          if (attr.name.equals("*")) {
-            CountCarbon(makePositionLiteral(Literal("*"), index))
-          } else {
-            CountCarbon(makePositionLiteral(attr, index))
-          }
-      }
+      current
     }
   }
 
+  def canBeConverted(current: Expression): Boolean = current match {
+    case Alias(AggregateExpression(Average(attr: AttributeReference), _, false), _) => true
+    case Alias(AggregateExpression(Average(Cast(attr: AttributeReference, _)), _, false), _) => true
+    case Alias(AggregateExpression(Count(Seq(s: Literal)), _, false), _) => true
+    case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, false), _) => true
+    case Alias(AggregateExpression(Sum(attr: AttributeReference), _, false), _) => true
+    case Alias(AggregateExpression(Sum(Cast(attr: AttributeReference, _)), _, false), _) => true
+    case Alias(AggregateExpression(Min(attr: AttributeReference), _, false), _) => true
+    case Alias(AggregateExpression(Min(Cast(attr: AttributeReference, _)), _, false), _) => true
+    case Alias(AggregateExpression(Max(attr: AttributeReference), _, false), _) => true
+    case Alias(AggregateExpression(Max(Cast(attr: AttributeReference, _)), _, false), _) => true
+    case _ => false
+  }
+
+  def transformArrayType(attr: AttributeReference): AttributeReference = {
+    AttributeReference(attr.name, ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId,
+      attr.qualifiers)
+  }
+
+  def transformLongType(attr: AttributeReference): AttributeReference = {
+    AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId,
+      attr.qualifiers)
+  }
+
   /**
    * There should be sync between carbonOperators validation and here. we should not convert to
    * carbon aggregates if the validation does not satisfy.
    */
-  private def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = {
+  def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = {
     val detailQuery = expressions.map {
       case attr@AttributeReference(_, _, _, _) => true
-      case par: Alias if par.children.head.isInstanceOf[AggregateExpression1] => true
+      case Alias(agg: AggregateExpression, name) => true
       case _ => false
     }.exists(!_)
     !detailQuery
@@ -454,6 +491,7 @@ object PartialAggregation {
   def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false))
 
   def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = {
+    val oneAttr = getOneAttribute(combinedPlan._1)
     combinedPlan._1 match {
       case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) =>
 
@@ -463,99 +501,28 @@ object PartialAggregation {
             aggregateExpressionsOrig
           }
           else {
-            // First calculate partialComputation before converting and then check whether it could
-            // be converted or not. This type of checks are necessary for queries like
-            // select sum(col)+10 from table. Here the aggregates are different for
-            // partialComputation and aggregateExpressionsOrig. So first check on partialComputation
-            val preCheckEval = getPartialEvaluation(groupingExpressions, aggregateExpressionsOrig)
-            preCheckEval match {
-              case Some(allExprs) =>
-                if (canBeConvertedToCarbonAggregate(allExprs._1)) {
-                  convertAggregatesForPushdown(false, aggregateExpressionsOrig)
-                } else {
-                  aggregateExpressionsOrig
-                }
-              case _ => aggregateExpressionsOrig
-            }
+            convertAggregatesForPushdown(false, aggregateExpressionsOrig, oneAttr)
           }
-        val evaluation = getPartialEvaluation(groupingExpressions, aggregateExpressions)
-
-        evaluation match {
-          case(Some((partialComputation,
-              rewrittenAggregateExpressions,
-              namedGroupingAttributes))) =>
-            // Convert the other aggregations for push down to Carbon layer.
-            // Here don't touch earlier converted native carbon aggregators.
-            val convertedPartialComputation =
-              if (combinedPlan._2) {
-                partialComputation
-              }
-              else {
-                convertAggregatesForPushdown(true, partialComputation)
-                  .asInstanceOf[Seq[NamedExpression]]
-              }
-
-            Some(
-              (namedGroupingAttributes,
-                rewrittenAggregateExpressions,
-                groupingExpressions,
-                convertedPartialComputation,
-                child))
-          case _ => None
-        }
-
+        Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child))
       case _ => None
     }
   }
 
-  def getPartialEvaluation(groupingExpressions: Seq[Expression],
-      aggregateExpressions: Seq[Expression]):
-      Option[(Seq[NamedExpression], Seq[NamedExpression], Seq[Attribute])] = {
-    // Collect all aggregate expressions.
-    val allAggregates =
-      aggregateExpressions.flatMap(_ collect { case a: AggregateExpression1 => a })
-    // Collect all aggregate expressions that can be computed partially.
-    val partialAggregates =
-      aggregateExpressions.flatMap(_ collect { case p: PartialAggregate1 => p })
-
-    // Only do partial aggregation if supported by all aggregate expressions.
-    if (allAggregates.size == partialAggregates.size) {
-      // Create a map of expressions to their partial evaluations for all aggregate expressions.
-      val partialEvaluations: Map[TreeNodeRef, SplitEvaluation] =
-        partialAggregates.map(a => (new TreeNodeRef(a), a.asPartial)).toMap
-
-      // We need to pass all grouping expressions though so the grouping can happen a second
-      // time. However some of them might be unnamed so we alias them allowing them to be
-      // referenced in the second aggregation.
-      val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {
-        case n: NamedExpression => (n, n)
-        case other => (other, Alias(other, "PartialGroup")())
-      }.toMap
-
-      // Replace aggregations with a new expression that computes the result from the already
-      // computed partial evaluations and grouping values.
-      val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
-        case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) =>
-          partialEvaluations(new TreeNodeRef(e)).finalEvaluation
-
-        case e: Expression =>
-          // Should trim aliases around `GetField`s. These aliases are introduced while
-          // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
-          // (Should we just turn `GetField` into a `NamedExpression`?)
-          namedGroupingExpressions.collectFirst {
-            case (expr, ne) if expr semanticEquals e => ne.toAttribute
-          }.getOrElse(e)
-      }).asInstanceOf[Seq[NamedExpression]]
-
-      val partialComputation =
-        (namedGroupingExpressions.values ++
-         partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq
-      val namedGroupingAttributes = namedGroupingExpressions.values.map(_.toAttribute).toSeq
-      Some(partialComputation, rewrittenAggregateExpressions, namedGroupingAttributes)
+  def getOneAttribute(plan: LogicalPlan): AttributeReference = {
+    var relation: LogicalRelation = null
+    plan collect {
+      case l: LogicalRelation => relation = l
+    }
+    if (relation != null) {
+      relation.output.find { p =>
+        p.dataType match {
+          case n: NumericType => true
+          case _ => false
+        }
+      }.getOrElse(relation.output.head)
     } else {
-      None
+      null
     }
-
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 79d8ffa..2bf50da 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -52,7 +53,7 @@ class CarbonContext(val sc: SparkContext, val storePath: String) extends HiveCon
   override protected[sql] lazy val optimizer: Optimizer =
     new CarbonOptimizer(DefaultOptimizer, conf)
 
-  override protected[sql] def dialectClassName = classOf[CarbonSQLDialect].getCanonicalName
+  protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
 
   experimental.extraStrategies = CarbonStrategy.getStrategy(self)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index f95acf4..94b38a4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -24,6 +24,7 @@ import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -56,7 +57,11 @@ class CarbonSource
       case _ =>
         val options = new CarbonOption(parameters)
         val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
-        CarbonDatasourceRelation(tableIdentifier, None)(sqlContext)
+        val ident = tableIdentifier match {
+          case Seq(name) => TableIdentifier(name)
+          case Seq(db, name) => TableIdentifier(name, Some(db))
+        }
+        CarbonDatasourceRelation(ident, None)(sqlContext)
     }
 
   }
@@ -120,14 +125,14 @@ class CarbonSource
  * This relation is stored to hive metastore
  */
 private[sql] case class CarbonDatasourceRelation(
-    tableIdentifier: Seq[String],
+    tableIdentifier: TableIdentifier,
     alias: Option[String])
   (@transient context: SQLContext)
   extends BaseRelation with Serializable with Logging {
 
   def carbonRelation: CarbonRelation = {
     CarbonEnv.getInstance(context)
-      .carbonCatalog.lookupRelation2(tableIdentifier, None)(sqlContext)
+      .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
       .asInstanceOf[CarbonRelation]
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index a4ac246..600519f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -32,6 +32,7 @@ import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueI
 import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.carbondata.core.carbon.metadata.datatype.DataType
 import org.carbondata.core.carbon.metadata.encoder.Encoding
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.query.carbon.util.DataTypeUtil
 
 /**
@@ -62,7 +63,7 @@ case class CarbonDictionaryDecoder(
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             canBeDecoded(attr)) {
           val newAttr = AttributeReference(a.name,
-            convertCarbonToSparkDataType(carbonDimension.getDataType),
+            convertCarbonToSparkDataType(carbonDimension),
             a.nullable,
             a.metadata)(a.exprId,
             a.qualifiers).asInstanceOf[Attribute]
@@ -88,8 +89,8 @@ case class CarbonDictionaryDecoder(
     }
   }
 
-  def convertCarbonToSparkDataType(dataType: DataType): types.DataType = {
-    dataType match {
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension): types.DataType = {
+    carbonDimension.getDataType match {
       case DataType.STRING => StringType
       case DataType.INT => IntegerType
       case DataType.LONG => LongType
@@ -125,6 +126,9 @@ case class CarbonDictionaryDecoder(
     dictIds
   }
 
+
+  override def outputsUnsafeRows: Boolean = true
+
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
       val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog].storePath
@@ -143,20 +147,21 @@ case class CarbonDictionaryDecoder(
           val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
             forwardDictionaryCache)
           new Iterator[InternalRow] {
+            val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
             override final def hasNext: Boolean = iter.hasNext
 
             override final def next(): InternalRow = {
               val row: InternalRow = iter.next()
               val data = row.toSeq(dataTypes).toArray
               for (i <- data.indices) {
-                if (dicts(i) != null) {
+                if (dicts(i) != null && data(i) != null) {
                   data(i) = toType(DataTypeUtil
                     .getDataBasedOnDataType(dicts(i)
                       .getDictionaryValueForKey(data(i).asInstanceOf[Integer]),
                       getDictionaryColumnIds(i)._3))
                 }
               }
-              new GenericMutableRow(data)
+              unsafeProjection(new GenericMutableRow(data))
             }
           }
         }


Mime
View raw message