carbondata-commits mailing list archives

From jbono...@apache.org
Subject [03/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported Spark 1.6 in Carbondata (#670)
Date Thu, 23 Jun 2016 14:15:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index b8fb295..8796707 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -25,30 +25,35 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, _}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Max, Min, Sum}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.execution.LeafNode
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.{AbsoluteTableIdentifier}
+import org.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.CarbonProperties
 import org.carbondata.hadoop.CarbonInputFormat
-import org.carbondata.query.aggregator.impl.CountAggregator
+import org.carbondata.query.aggregator.MeasureAggregator
+import org.carbondata.query.aggregator.impl.avg.AbstractAvgAggregator
+import org.carbondata.query.aggregator.impl.count.CountAggregator
+import org.carbondata.query.aggregator.impl.max.{MaxAggregator, MaxBigDecimalAggregator, MaxLongAggregator}
+import org.carbondata.query.aggregator.impl.min.{MinAggregator, MinBigDecimalAggregator, MinLongAggregator}
+import org.carbondata.query.aggregator.impl.sum.{SumBigDecimalAggregator, SumDoubleAggregator, SumLongAggregator}
 import org.carbondata.query.carbon.model.{CarbonQueryPlan, QueryDimension, QueryMeasure, QueryModel, SortOrderType}
 import org.carbondata.query.carbon.result.RowResult
-import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression}
-import org.carbondata.query.expression.{Expression => CarbonExpression}
-import org.carbondata.query.expression.{LiteralExpression => CarbonLiteralExpression}
+import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.carbondata.query.expression.arithmetic.{AddExpression, DivideExpression, MultiplyExpression, SubstractExpression}
 import org.carbondata.query.expression.conditional._
 import org.carbondata.query.expression.logical.{AndExpression, OrExpression}
 import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.agg._
-import org.carbondata.spark.KeyVal
-import org.carbondata.spark.KeyValImpl
+import org.carbondata.spark.{KeyVal, KeyValImpl}
 import org.carbondata.spark.rdd.CarbonQueryRDD
 import org.carbondata.spark.util.{CarbonScalaUtil, QueryPlanUtil}
 
@@ -70,14 +75,35 @@ case class CarbonTableScan(
   var outputColumns = scala.collection.mutable.MutableList[Attribute]()
   var extraPreds: Seq[Expression] = Nil
   val allDims = new scala.collection.mutable.HashSet[String]()
-  // val carbonTable = CarbonMetadata.getInstance().getCarbonTable(cubeName)
   @transient val carbonCatalog = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog]
 
-  def processAggregateExpr(plan: CarbonQueryPlan, currentAggregate: AggregateExpression1,
-      queryOrder: Int): Int = {
-
+  def processAggregateExpr(plan: CarbonQueryPlan,
+      currentAggregate: AggregateExpression,
+      queryOrder: Int,
+      aggCount: Int): Int = {
     currentAggregate match {
-      case SumCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _), _) =>
+      case AggregateExpression(Sum(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
+        outputColumns += attr
+        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+        if (msrs.nonEmpty) {
+          val m1 = new QueryMeasure(attr.name)
+          m1.setAggregateFunction(CarbonCommonConstants.SUM)
+          m1.setQueryOrder(queryOrder)
+          plan.addMeasure(m1)
+        } else {
+          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+          if (dims.nonEmpty) {
+            val d1 = new QueryDimension(attr.name)
+            d1.setQueryOrder(queryOrder)
+            plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
+          }
+        }
+        p.setPosition(queryOrder + aggCount)
+        queryOrder + 1
+
+      case AggregateExpression(
+        Sum(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
@@ -92,10 +118,12 @@ case class CarbonTableScan(
             plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case CountCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _)) =>
+      case AggregateExpression(
+        CarbonCount(p@PositionLiteral(attr: AttributeReference, _), None), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
@@ -110,33 +138,28 @@ case class CarbonTableScan(
             plan.addAggDimAggInfo(d1.getColumnName, "count", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case CountCarbon(posLiteral@PositionLiteral(Literal(star, _), _)) if star == "*" =>
+      case AggregateExpression(
+        CarbonCount(lt: Literal, Some(p@PositionLiteral(attr: AttributeReference, _))), _, false)
+        if lt.value == "*" || lt.value == 1 =>
+        outputColumns += attr
         val m1 = new QueryMeasure("count(*)")
         m1.setAggregateFunction(CarbonCommonConstants.COUNT)
         m1.setQueryOrder(queryOrder)
-        posLiteral.setPosition(queryOrder)
         plan.addMeasure(m1)
         plan.setCountStartQuery(true)
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case curr@CountCarbon(posLiteral@PositionLiteral(one, _)) =>
-        val m1 = new QueryMeasure("count(*)")
-        m1.setAggregateFunction(CarbonCommonConstants.COUNT)
-        m1.setQueryOrder(queryOrder)
-        posLiteral.setPosition(queryOrder)
-        plan.addMeasure(m1)
-        plan.setCountStartQuery(true)
-        posLiteral.setPosition(queryOrder)
-        queryOrder + 1
-      case CountDistinctCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _)) =>
+      case AggregateExpression(
+        CarbonAverage(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.DISTINCT_COUNT)
+          m1.setAggregateFunction(CarbonCommonConstants.AVERAGE)
           m1.setQueryOrder(queryOrder)
           plan.addMeasure(m1)
         } else {
@@ -144,13 +167,15 @@ case class CarbonTableScan(
           if (dims.nonEmpty) {
             val d1 = new QueryDimension(attr.name)
             d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "distinct-count", d1.getQueryOrder)
+            plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case AverageCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _), _) =>
+      case AggregateExpression(
+        CarbonAverage(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
@@ -165,10 +190,31 @@ case class CarbonTableScan(
             plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
+        queryOrder + 1
+
+      case AggregateExpression(Min(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
+        outputColumns += attr
+        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+        if (msrs.nonEmpty) {
+          val m1 = new QueryMeasure(attr.name)
+          m1.setAggregateFunction(CarbonCommonConstants.MIN)
+          m1.setQueryOrder(queryOrder)
+          plan.addMeasure(m1)
+        } else {
+          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+          if (dims.nonEmpty) {
+            val d1 = new QueryDimension(attr.name)
+            d1.setQueryOrder(queryOrder)
+            plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
+          }
+        }
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case MinCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _), _) =>
+      case AggregateExpression(
+        Min(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
@@ -183,10 +229,11 @@ case class CarbonTableScan(
             plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case MaxCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _), _) =>
+      case AggregateExpression(Max(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
@@ -201,26 +248,27 @@ case class CarbonTableScan(
             plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
-      case SumDistinctCarbon(posLiteral@PositionLiteral(attr: AttributeReference, _), _) =>
+      case AggregateExpression(
+        Max(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
+        outputColumns += attr
         val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
         if (msrs.nonEmpty) {
           val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.SUM_DISTINCT)
+          m1.setAggregateFunction(CarbonCommonConstants.MAX)
           m1.setQueryOrder(queryOrder)
           plan.addMeasure(m1)
         } else {
           val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims != null) {
-            //            plan.removeDimensionFromDimList(dims(0));
+          if (dims.nonEmpty) {
             val d1 = new QueryDimension(attr.name)
             d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "sum-distinct", queryOrder)
+            plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
           }
         }
-        posLiteral.setPosition(queryOrder)
+        p.setPosition(queryOrder + aggCount)
         queryOrder + 1
 
       case _ => throw new
@@ -239,7 +287,6 @@ case class CarbonTableScan(
         val carbonDimension = carbonTable.getDimensionByName(carbonTable.getFactTableName
           , attr.name)
         if (carbonDimension != null) {
-          // TODO if we can add ordina in carbonDimension, it will be good
           allDims += attr.name
           val dim = new QueryDimension(attr.name)
           dim.setQueryOrder(queryOrder)
@@ -257,50 +304,63 @@ case class CarbonTableScan(
         }
       })
     queryOrder = 0
+
+    // This count is needed because Spark's aggregators use a joined row together with the current aggregates.
+    val aggCount = aggExprs match {
+      case Some(a: Seq[Expression]) =>
+        a.map {
+          case Alias(AggregateExpression(CarbonAverage(_), _, _), name) => 2
+          case Alias(agg: AggregateExpression, name) => 1
+          case _ => 0
+        }.reduceLeftOption((left, right) => left + right).getOrElse(0)
+      case _ => 0
+    }
     // Separately handle group by columns, known or unknown partial aggregations and other
     // expressions. All single column & known aggregate expressions will use native aggregates for
     // measure and dimensions
     // Unknown aggregates & Expressions will use custom aggregator
+
     aggExprs match {
       case Some(a: Seq[Expression]) if !forceDetailedQuery =>
         a.foreach {
           case attr@AttributeReference(_, _, _, _) => // Add all the references to carbon query
-            val carbonDimension = selectedDims
-              .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-            if (carbonDimension.nonEmpty) {
-              val dim = new QueryDimension(attr.name)
-              dim.setQueryOrder(queryOrder)
-              plan.addDimension(dim)
-              queryOrder = queryOrder + 1
-            } else {
-              val carbonMeasure = selectedMsrs
-                .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-              if (carbonMeasure.nonEmpty) {
-                // added by vishal as we are adding for dimension so need to add to measure list
-                // Carbon does not support group by on measure column so throwing exception to
-                // make it detail query
-                throw new
-                    Exception("Some Aggregate functions cannot be pushed, force to detailequery")
-              }
-              else {
-                // Some unknown attribute name is found. this may be a derived column.
-                // So, let's fall back to detailed query flow
-                throw new Exception(
-                  "Some attributes referred looks derived columns. So, force to detailequery " +
-                  attr.name)
-              }
-            }
+            addCarbonColumn(attr)
             outputColumns += attr
-          case par: Alias if par.children.head.isInstanceOf[AggregateExpression1] =>
-            outputColumns += par.toAttribute
-            queryOrder = processAggregateExpr(plan,
-              par.children.head.asInstanceOf[AggregateExpression1], queryOrder)
-
+          case al@ Alias(agg: AggregateExpression, name) =>
+            queryOrder = processAggregateExpr(plan, agg, queryOrder, aggCount)
           case _ => forceDetailedQuery = true
         }
       case _ => forceDetailedQuery = true
     }
 
+    def addCarbonColumn(attr: Attribute): Unit = {
+      val carbonDimension = selectedDims
+        .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+      if (carbonDimension.nonEmpty) {
+        val dim = new QueryDimension(attr.name)
+        dim.setQueryOrder(queryOrder)
+        plan.addDimension(dim)
+        queryOrder = queryOrder + 1
+      } else {
+        val carbonMeasure = selectedMsrs
+          .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+        if (carbonMeasure.nonEmpty) {
+          // added by vishal as we are adding for dimension so need to add to measure list
+          // Carbon does not support group by on measure column so throwing exception to
+          // make it detail query
+          throw new
+              Exception("Some Aggregate functions cannot be pushed, force to detailequery")
+        }
+        else {
+          // Some unknown attribute name is found. this may be a derived column.
+          // So, let's fall back to detailed query flow
+          throw new Exception(
+            "Some attributes referred looks derived columns. So, force to detailequery " +
+            attr.name)
+        }
+      }
+    }
+
     if (forceDetailedQuery) {
       // First clear the model if Msrs, Expressions and AggDimAggInfo filled
       plan.getDimensions.clear()
@@ -313,6 +373,12 @@ case class CarbonTableScan(
       selectedMsrs.foreach(plan.addMeasure)
     }
     else {
+      attributes.foreach { attr =>
+        if (!outputColumns.exists(_.name.equals(attr.name))) {
+          addCarbonColumn(attr)
+          outputColumns += attr
+        }
+      }
       attributes = outputColumns
     }
 
@@ -322,16 +388,13 @@ case class CarbonTableScan(
     sortExprs match {
       case Some(a: Seq[SortOrder]) =>
         a.foreach {
-          case SortOrder(SumCarbon(attr: AttributeReference, _), order) => plan.getMeasures
-            .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
-            .setSortOrder(getSortDirection(order))
-          case SortOrder(CountCarbon(attr: AttributeReference), order) => plan.getMeasures
+          case SortOrder(Sum(attr: AttributeReference), order) => plan.getMeasures
             .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
             .setSortOrder(getSortDirection(order))
-          case SortOrder(CountDistinctCarbon(attr: AttributeReference), order) => plan.getMeasures
+          case SortOrder(CarbonCount(attr: AttributeReference, _), order) => plan.getMeasures
             .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
             .setSortOrder(getSortDirection(order))
-          case SortOrder(AverageCarbon(attr: AttributeReference, _), order) => plan.getMeasures
+          case SortOrder(CarbonAverage(attr: AttributeReference), order) => plan.getMeasures
             .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
             .setSortOrder(getSortDirection(order))
           case SortOrder(attr: AttributeReference, order) =>
@@ -476,7 +539,7 @@ case class CarbonTableScan(
   def inputRdd: CarbonQueryRDD[CarbonKey, CarbonValue] = {
     val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     // Update the FilterExpressions with extra conditions added through join pushdown
-    if (extraPreds.nonEmpty) {
+    if (extraPreds.nonEmpty) {
       val exps = preProcessExpressions(extraPreds)
       val expressionVal = transformExpression(exps.head)
       val oldExpressionVal = buildCarbonPlan.getFilterExpression
@@ -514,13 +577,35 @@ case class CarbonTableScan(
     big
   }
 
+
+  override def outputsUnsafeRows: Boolean = false
+
   def doExecute(): RDD[InternalRow] = {
     def toType(obj: Any): Any = {
       obj match {
         case s: String => UTF8String.fromString(s)
+        case avg: AbstractAvgAggregator =>
+          if (avg.isFirstTime) {
+            null
+          } else {
+            new GenericArrayData(avg.getAvgState.asInstanceOf[Array[Any]])
+          }
+        case c: CountAggregator => c.getLongValue
+        case s: SumDoubleAggregator => s.getDoubleValue
+        case s: SumBigDecimalAggregator => Decimal(s.getBigDecimalValue)
+        case s: SumLongAggregator => s.getLongValue
+        case m: MaxBigDecimalAggregator => Decimal(m.getBigDecimalValue)
+        case m: MaxLongAggregator => m.getLongValue
+        case m: MaxAggregator => toType(m.getValueObject)
+        case m: MinBigDecimalAggregator => Decimal(m.getBigDecimalValue)
+        case m: MinLongAggregator => m.getLongValue
+        case m: MinAggregator => toType(m.getValueObject)
+        case m: MeasureAggregator => toType(m.getValueObject)
         case _ => obj
       }
     }
+
+//    val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
     // count(*) query executed in driver by querying from Btree
     if (isCountQuery) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -531,20 +616,26 @@ case class CarbonTableScan(
       val countAgg = new CountAggregator()
       countAgg.setNewValue(rowCount)
       sparkContext.parallelize(
-        Seq(new GenericMutableRow(Seq(countAgg).toArray.asInstanceOf[Array[Any]]))
+        Seq(new GenericMutableRow(Seq(countAgg.getLongValue).toArray.asInstanceOf[Array[Any]]))
       )
     } else {
       // all the other queries are sent to executor
-      inputRdd.map { row =>
-        val dims = row._1.getKey.map(toType)
-        val values = dims
-        new GenericMutableRow(values.asInstanceOf[Array[Any]])
+      inputRdd.mapPartitions { iter =>
+        new Iterator[InternalRow] {
+//          val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
+          override def hasNext: Boolean = iter.hasNext
+
+          override def next(): InternalRow = {
+            new GenericMutableRow(iter.next()._1.getKey.map(toType))
+          }
+        }
       }
     }
   }
 
   /**
-   * return true if query is count query
+   * return true if the query is a count query
+   *
    * @return
    */
   def isCountQuery: Boolean = {
@@ -560,6 +651,5 @@ case class CarbonTableScan(
   def output: Seq[Attribute] = {
     attributes
   }
-
 }
 

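For readers not familiar with Spark 1.6's partial aggregation, the aggCount value computed in the CarbonTableScan changes above counts how many extra result slots the pushed-down aggregates occupy: an average is carried as two slots (its sum and count), every other aggregate takes one, and plain columns take none. A minimal sketch of that rule in plain Scala, using hypothetical stand-in types rather than the actual Catalyst classes:

object AggSlotCount {
  sealed trait Expr
  case class Avg(column: String) extends Expr
  case class OtherAgg(name: String, column: String) extends Expr
  case class Column(name: String) extends Expr

  def aggCount(exprs: Seq[Expr]): Int =
    exprs.map {
      case _: Avg      => 2   // an average is split into sum and count
      case _: OtherAgg => 1   // sum/min/max/count keep a single slot
      case _           => 0   // plain columns add no aggregate slots
    }.sum

  def main(args: Array[String]): Unit = {
    val plan = Seq(Column("country"), Avg("salary"), OtherAgg("sum", "salary"))
    println(aggCount(plan)) // prints 3
  }
}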
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawAggregate.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawAggregate.scala
deleted file mode 100644
index 27df43f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawAggregate.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.HashMap
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.unsafe.types.UTF8String
-
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper
-
-/**
- * :: DeveloperApi ::
- * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
- * group.
- *
- * @param partial              if true then aggregation is done partially on local data without
- *                             shuffling to
- *                             ensure all values where `groupingExpressions` are equal are present.
- * @param groupingExpressions  expressions that are evaluated to determine grouping.
- * @param aggregateExpressions expressions that are computed for each group.
- * @param child                the input data source.
- */
-@DeveloperApi
-case class CarbonRawAggregate(
-  partial: Boolean,
-  groupingExpressions: Seq[Expression],
-  aggregateExpressions: Seq[NamedExpression],
-  child: SparkPlan)
-  extends UnaryNode {
-
-  override private[sql] lazy val metrics = Map(
-    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  override def requiredChildDistribution: List[Distribution] = {
-    if (partial) {
-      UnspecifiedDistribution :: Nil
-    } else {
-      if (groupingExpressions == Nil) {
-        AllTuples :: Nil
-      } else {
-        ClusteredDistribution(groupingExpressions) :: Nil
-      }
-    }
-  }
-
-  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
-
-  /**
-   * An aggregate that needs to be computed for each row in a group.
-   *
-   * @param unbound         Unbound version of this aggregate, used for result substitution.
-   * @param aggregate       A bound copy of this aggregate used to create a new aggregation buffer.
-   * @param resultAttribute An attribute used to refer to the result of this aggregate in the final
-   *                        output.
-   */
-  case class ComputedAggregate(
-    unbound: AggregateExpression1,
-    aggregate: AggregateExpression1,
-    resultAttribute: AttributeReference)
-
-  /** A list of aggregates that need to be computed for each group. */
-  private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
-    agg.collect {
-      case a: AggregateExpression1 =>
-        ComputedAggregate(
-          a,
-          BindReferences.bindReference(a, child.output),
-          AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
-    }
-  }.toArray
-
-  val order = aggregateExpressions.flatMap { agg =>
-    agg.collect {
-      case a: Expression =>
-        BindReferences.bindReference(a, child.output).collect {
-          case b: BoundReference => b.ordinal
-        }
-    }.flatMap(f => f)
-  }.toArray
-
-  /** The schema of the result of all aggregate evaluations */
-  private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
-
-  /** Creates a new aggregate buffer for a group. */
-  private[this] def newAggregateBuffer(): Array[AggregateFunction1] = {
-    val buffer = new Array[AggregateFunction1](computedAggregates.length)
-    var i = 0
-    while (i < computedAggregates.length) {
-      buffer(i) = computedAggregates(i).aggregate.newInstance()
-      i += 1
-    }
-    buffer
-  }
-
-  /** Named attributes used to substitute grouping attributes into the final result. */
-  private[this] val namedGroups = groupingExpressions.map {
-    case ne: NamedExpression => ne -> ne.toAttribute
-    case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
-  }
-
-  /**
-   * A map of substitutions that are used to insert the aggregate expressions and grouping
-   * expression into the final result expression.
-   */
-  private[this] val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
-
-  /**
-   * Substituted version of aggregateExpressions expressions which are used to compute final
-   * output rows given a group and the result of all aggregate computations.
-   */
-  private[this] val resultExpressions = aggregateExpressions.map { agg =>
-    agg.transform {
-      case e: Expression if resultMap.contains(e) => resultMap(e)
-    }
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    attachTree(this, "execute") {
-      val numInputRows = longMetric("numInputRows")
-      val numOutputRows = longMetric("numOutputRows")
-      val pOrder = order.zipWithIndex.map {f =>
-        if (f._2 > 0 && order(f._2-1) == f._1) {
-          f._1 + 1
-        } else {
-          f._1
-        }
-      }
-      if (groupingExpressions.isEmpty) {
-        child.execute().mapPartitions { iter =>
-          val buffer = newAggregateBuffer()
-          var currentRow: CarbonRawMutableRow = null
-          while (iter.hasNext) {
-            currentRow = iter.next().asInstanceOf[CarbonRawMutableRow]
-            while (currentRow.hasNext) {
-              numInputRows += 1
-              var i = 0
-              while (i < buffer.length) {
-                buffer(i).update(currentRow)
-                i += 1
-              }
-              currentRow.next()
-            }
-          }
-          val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
-          val aggregateResults = new GenericMutableRow(computedAggregates.length)
-
-          var i = 0
-          while (i < buffer.length) {
-            aggregateResults(i) = buffer(i).eval(EmptyRow)
-            i += 1
-          }
-
-          numOutputRows += 1
-          Iterator(resultProjection(aggregateResults))
-        }
-      } else {
-        child.execute().mapPartitions { iter =>
-          val hashTable = new HashMap[ByteArrayWrapper, Array[AggregateFunction1]](10000)
-          var currentRow: CarbonRawMutableRow = null
-          while (iter.hasNext) {
-            currentRow = iter.next().asInstanceOf[CarbonRawMutableRow]
-            while (currentRow.hasNext) {
-              numInputRows += 1
-              val currentGroup = currentRow.getKey
-              var currentBuffer = hashTable.get(currentGroup)
-              if (currentBuffer == null) {
-                currentBuffer = newAggregateBuffer()
-                hashTable.put(currentGroup, currentBuffer)
-              }
-
-              var i = 0
-              while (i < currentBuffer.length) {
-                currentBuffer(i).update(currentRow)
-                i += 1
-              }
-              currentRow.next()
-            }
-          }
-
-          new Iterator[InternalRow] {
-            private[this] val hashTableIter = hashTable.entrySet().iterator()
-            private[this] val aggregateResults = new Array[Any](computedAggregates.length)
-
-            override final def hasNext: Boolean = hashTableIter.hasNext
-
-            override final def next(): InternalRow = {
-              val currentEntry = hashTableIter.next()
-              val currentGroup = currentEntry.getKey
-              val currentBuffer = currentEntry.getValue
-              numOutputRows += 1
-              var i = 0
-              while (i < currentBuffer.length) {
-                // Evaluating an aggregate buffer returns the result.  No row is required since we
-                // already added all rows in the group using update.
-                aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
-                i += 1
-              }
-              new GenericMutableRow(
-                currentRow.parseKey(
-                  currentGroup, aggregateResults.asInstanceOf[Array[Object]], pOrder).map(toType))
-            }
-          }
-        }
-      }
-    }
-  }
-
-  def toType(obj: Any): Any = {
-    obj match {
-      case s: String => UTF8String.fromString(s)
-      case _ => obj
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
index 8912def..2005300 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.util
 import java.util.ArrayList
 
 import scala.collection.JavaConverters._
@@ -27,18 +26,19 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.LeafNode
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.carbondata.core.carbon.{AbsoluteTableIdentifier}
+import org.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.CarbonProperties
 import org.carbondata.query.carbon.model._
 import org.carbondata.query.carbon.result.BatchRawResult
 import org.carbondata.query.carbon.wrappers.ByteArrayWrapper
-import org.carbondata.spark.{CarbonFilters, RawKeyVal, RawKeyValImpl}
+import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl, RawKeyVal, RawKeyValImpl}
 import org.carbondata.spark.rdd.CarbonRawQueryRDD
 
 
@@ -54,7 +54,7 @@ case class CarbonRawTableScan(
   val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
   @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
 
-  val attributesNeedToDecode = new util.HashSet[AttributeReference]()
+  val attributesNeedToDecode = new java.util.HashSet[AttributeReference]()
   val unprocessedExprs = new ArrayBuffer[Expression]()
 
   val buildCarbonPlan: CarbonQueryPlan = {
@@ -64,7 +64,7 @@ case class CarbonRawTableScan(
     val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
     val dimAttr = new Array[Attribute](dimensions.size())
     val msrAttr = new Array[Attribute](measures.size())
-    attributesRaw.map { attr =>
+    attributesRaw.foreach { attr =>
       val carbonDimension =
         carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
       if(carbonDimension != null) {
@@ -78,10 +78,10 @@ case class CarbonRawTableScan(
       }
     }
 
-    attributesRaw = (dimAttr.filter(f => f != null)) ++ (msrAttr.filter(f => f != null))
+    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
 
     var queryOrder: Integer = 0
-    attributesRaw.map { attr =>
+    attributesRaw.foreach { attr =>
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null) {
@@ -104,7 +104,7 @@ case class CarbonRawTableScan(
     aggExprsRaw match {
       case Some(aggExprs) =>
         aggExprs.foreach {
-          case Alias(agg: AggregateExpression1, name) =>
+          case Alias(agg: AggregateExpression, name) =>
             agg.collect {
               case attr: AttributeReference =>
                 val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
@@ -136,8 +136,11 @@ case class CarbonRawTableScan(
 
   def processFilterExpressions(plan: CarbonQueryPlan) {
     if (dimensionPredicatesRaw.nonEmpty) {
-      val expressionVal = CarbonFilters
-        .processExpression(dimensionPredicatesRaw, attributesNeedToDecode, unprocessedExprs)
+      val expressionVal = CarbonFilters.processExpression(
+        dimensionPredicatesRaw,
+        attributesNeedToDecode,
+        unprocessedExprs,
+        carbonTable)
       expressionVal match {
         case Some(ce) =>
           // adding dimension used in expression in querystats
@@ -181,14 +184,14 @@ case class CarbonRawTableScan(
   }
 
 
-  def inputRdd: CarbonRawQueryRDD[BatchRawResult, Any] = {
+  def inputRdd: CarbonRawQueryRDD[Array[Any], Any] = {
 
     val conf = new Configuration()
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     buildCarbonPlan.getDimAggregatorInfos.clear()
     val model = QueryModel.createModel(
       absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: RawKeyVal[BatchRawResult, Any] = new RawKeyValImpl()
+    val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
     // setting queryid
     buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
 
@@ -208,6 +211,9 @@ case class CarbonRawTableScan(
     big
   }
 
+
+  override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+
   override def doExecute(): RDD[InternalRow] = {
     def toType(obj: Any): Any = {
       obj match {
@@ -215,19 +221,18 @@ case class CarbonRawTableScan(
         case _ => obj
       }
     }
-
-    if (useBinaryAggregator) {
-      inputRdd.map { row =>
-        //      val dims = row._1.map(toType)
-        new CarbonRawMutableRow(row._1.getAllRows, row._1.getQuerySchemaInfo)
-      }
-    } else {
-      inputRdd.flatMap { row =>
-        val buffer = new ArrayBuffer[GenericMutableRow]()
-        while (row._1.hasNext) {
-          buffer += new GenericMutableRow(row._1.next().map(toType))
-        }
-        buffer
+    val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+    inputRdd.mapPartitions { iter =>
+      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = iter.hasNext
+
+        override def next(): InternalRow =
+          if (outUnsafeRows) {
+            unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
+          } else {
+            new GenericMutableRow(iter.next()._1.map(toType))
+          }
       }
     }
   }

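The CarbonRawTableScan.doExecute change above replaces the ArrayBuffer-based flatMap with a lazily wrapped iterator per partition (optionally running rows through an UnsafeProjection). A minimal sketch of the lazy-wrapping idea in plain Scala, with hypothetical types standing in for InternalRow and the Carbon result iterator; the projection step is omitted:

object LazyRowIterator {
  def convert(value: Int): String = s"row-$value"

  def wrap(iter: Iterator[Int]): Iterator[String] = new Iterator[String] {
    override def hasNext: Boolean = iter.hasNext
    override def next(): String = convert(iter.next()) // converted on demand, no buffering
  }

  def main(args: Array[String]): Unit = {
    val partition = Iterator(1, 2, 3)
    wrap(partition).foreach(println) // rows are converted one at a time
  }
}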
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index ecaa711..a72aaa1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -1231,12 +1231,12 @@ class CarbonSqlParser()
       case ef ~ db ~ tbl =>
         val tblIdentifier = db match {
           case Some(dbName) =>
-            Seq(dbName, tbl.toLowerCase())
+            TableIdentifier(tbl.toLowerCase, Some(dbName))
           case None =>
-            Seq(tbl.toLowerCase())
+            TableIdentifier(tbl.toLowerCase)
         }
         if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) {
-          new DescribeFormattedCommand("describe formatted " + tblIdentifier.mkString("."),
+          new DescribeFormattedCommand("describe formatted " + tblIdentifier,
             tblIdentifier)
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownCarbonAggregator.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownCarbonAggregator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownCarbonAggregator.scala
deleted file mode 100644
index d5d8bee..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownCarbonAggregator.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.io.{DataInput, DataOutput}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.expressions.{AggregateExpression1, AggregateFunction1, GenericMutableRow}
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk
-import org.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder
-import org.carbondata.query.aggregator.{CustomMeasureAggregator, MeasureAggregator}
-import org.carbondata.query.carbonfilterinterface.RowIntf
-import org.carbondata.query.expression.ColumnExpression
-
-/**
- * Custom Aggregator serialized and used to pushdown all aggregate functions from spark layer with
- * expressions to Carbon layer
- */
-@SerialVersionUID(-3787749110799088697L)
-class SparkUnknownCarbonAggregator(partialAggregate: AggregateExpression1)
-  extends CustomMeasureAggregator {
-
-  val result = scala.collection.mutable.MutableList[GenericMutableRow]()
-  @transient var partialFunction: AggregateFunction1 = null
-
-  @transient var allColumns: java.util.List[ColumnExpression] = null
-  var isRowsAggregated: Boolean = false
-
-  def this() = this(null) // For serializattion
-
-  override def agg(newVal: Double): Unit = {
-
-    throw new UnsupportedOperationException("agg(double) is not a valid method for aggregation")
-  }
-
-  override def agg(newVal: Any): Unit = {
-    throw new UnsupportedOperationException("agg(Object) is not a valid method for aggregation")
-  }
-
-  override def agg(newVal: MeasureColumnDataChunk, index: Int): Unit = {
-    throw new UnsupportedOperationException(
-      "agg(CarbonReadDataHolder, int) is not a valid method for aggregation"
-    )
-  }
-
-  override def getByteArray: Array[Byte] = {
-    throw new UnsupportedOperationException("getByteArray  is not implemented yet")
-  }
-
-  override def getDoubleValue: java.lang.Double = {
-    throw new UnsupportedOperationException("getValue() is not a valid method for result")
-  }
-
-  override def getLongValue: java.lang.Long = {
-    throw new UnsupportedOperationException("getLongValue() is not a valid method for result")
-  }
-
-  override def getBigDecimalValue: java.math.BigDecimal = {
-    throw new
-        UnsupportedOperationException("getBigDecimalValue() is not a valid method for result")
-  }
-
-  override def getValueObject: Object = {
-
-    result.iterator.foreach(v => getPartialFunction.update(v))
-
-    val output = getPartialFunction.eval(null)
-
-    output.asInstanceOf[Object]
-  }
-
-  override def merge(aggregator: MeasureAggregator): Unit = {
-    if (result.nonEmpty) {
-      result.iterator.foreach(v => {
-        getPartialFunction.update(v)
-      }
-      )
-
-      // clear result after submitting to partial function
-      result.clear
-    }
-
-    aggregator match {
-      case s: SparkUnknownCarbonAggregator =>
-        s.result.iterator.foreach(v => getPartialFunction.update(v))
-        s.result.clear
-      case _ => throw new Exception("Invalid merge expected type is" + this.getClass.getName);
-    }
-  }
-
-  private def getPartialFunction = {
-    if (partialFunction == null) {
-      partialFunction = partialAggregate.newInstance
-    }
-    partialFunction
-  }
-
-  override def isFirstTime: Boolean = {
-    isRowsAggregated
-  }
-
-  override def writeData(output: DataOutput): Unit = {
-    throw new UnsupportedOperationException()
-  }
-
-  override def readData(inPut: DataInput): Unit = {
-    throw new UnsupportedOperationException()
-  }
-
-  override def merge(value: Array[Byte]): Unit = {
-
-    throw new UnsupportedOperationException()
-  }
-
-  override def get(): MeasureAggregator = {
-    // Get means, Partition level aggregation is done and pending for merge with other or getValue
-    // So evaluate and store the temporary result here
-
-    this
-  }
-
-  override def compareTo(aggre: MeasureAggregator): Int = {
-    0
-  }
-
-  override def getCopy: MeasureAggregator = {
-    new SparkUnknownCarbonAggregator(partialAggregate)
-  }
-
-  override def setNewValue(newVal: Object): Unit = {
-
-  }
-
-  override def getColumns: java.util.List[ColumnExpression] = {
-    if (allColumns == null) {
-      allColumns = partialAggregate.flatMap(_ collect { case a: CarbonBoundReference => a.colExp })
-        .asJava
-    }
-    allColumns
-  }
-
-  override def agg(row: RowIntf): Unit = {
-    isRowsAggregated = true
-    val values = row.getValues.toSeq.map {
-      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
-      // solve: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal
-      case d: java.math.BigDecimal =>
-        val javaDecVal = new java.math.BigDecimal(d.toString)
-        val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
-        val decConverter = new org.apache.spark.sql.types.Decimal()
-
-        decConverter.set(scalaDecVal)
-      case others => others
-    }
-    result += new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray)
-  }
-
-  override def getNew: MeasureAggregator = {
-    new SparkUnknownCarbonAggregator()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
new file mode 100644
index 0000000..64930c7
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.agg
+
+import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+case class CarbonAvgHelperExpression(
+    left: Expression,
+    right: Expression,
+    index: Int,
+    dataType: DataType)
+  extends BinaryExpression with CodegenFallback {
+
+  protected override def nullSafeEval(input1: Any, input2: Any): Any = {
+    val left = input1 match {
+      case g: ArrayData => g.getDouble(index)
+      case others => others.toString.toDouble
+    }
+    val right = input2 match {
+      case g: ArrayData => g.getDouble(index)
+      case others => others.toString.toDouble
+    }
+    if (index == 1) {
+      (left + right).toLong
+    } else {
+      dataType match {
+        case d: DecimalType =>
+        case _ => left + right
+      }
+    }
+  }
+}

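CarbonAvgHelperExpression's nullSafeEval accepts either a partial average state (an array holding sum and count) or a raw value on each side, picks the component selected by index, and adds the two, widening the count component (index 1) to a long. A minimal sketch of that merge rule in plain Scala with hypothetical types; the DecimalType branch is not modelled here:

object AvgHelperSketch {
  def component(input: Any, index: Int): Double = input match {
    case state: Array[Double] => state(index)      // partial avg state: (sum, count)
    case other                => other.toString.toDouble
  }

  def add(left: Any, right: Any, index: Int): Any = {
    val merged = component(left, index) + component(right, index)
    if (index == 1) merged.toLong else merged      // counts are kept as longs
  }

  def main(args: Array[String]): Unit = {
    println(add(Array(10.0, 4.0), 2.5, 0)) // merge sums:   12.5
    println(add(Array(10.0, 4.0), 1, 1))   // merge counts: 5
  }
}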
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
new file mode 100644
index 0000000..9400b0c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.agg
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
+import org.apache.spark.sql.types._
+
+@ExpressionDescription(
+  usage = "_FUNC_(x) - Returns the mean calculated from values of a group.")
+case class CarbonAverage(child: Expression) extends DeclarativeAggregate {
+
+  override def prettyName: String = "avg"
+
+  override def children: Seq[Expression] = child :: Nil
+
+  override def nullable: Boolean = true
+
+  // Return data type.
+  override def dataType: DataType = resultType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
+
+  override def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
+//    TypeUtils.checkForNumericExpr(child.dataType, "function average")
+
+  private lazy val resultType = child.dataType match {
+    case DecimalType.Fixed(p, s) =>
+      DecimalType.bounded(p + 4, s + 4)
+    case _ => DoubleType
+  }
+
+  private lazy val sumDataType = child.dataType match {
+    case _ @ DecimalType.Fixed(p, s) => DecimalType.bounded(p + 10, s)
+    case _ => DoubleType
+  }
+
+  private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+
+  override lazy val aggBufferAttributes = sum :: count :: Nil
+
+  override lazy val initialValues = Seq(
+    /* sum = */ Cast(Literal(0), sumDataType),
+    /* count = */ Literal(0L)
+  )
+
+  override lazy val updateExpressions = Seq(
+    /* sum = */
+    CarbonAvgHelperExpression(
+      sum,
+      Coalesce(child :: Cast(Literal(0), sumDataType) :: Nil), 0, sumDataType),
+    CarbonAvgHelperExpression(
+      count,
+      Coalesce(child :: Cast(Literal(0), LongType) :: Nil), 1, LongType)
+  )
+
+  override lazy val mergeExpressions = Seq(
+    /* sum = */ sum.left + sum.right,
+    /* count = */ count.left + count.right
+  )
+
+  // If all input are nulls, count will be 0 and we will get null after the division.
+  override lazy val evaluateExpression = child.dataType match {
+    case DecimalType.Fixed(p, s) =>
+      // increase the precision and scale to prevent precision loss
+      val dt = DecimalType.bounded(p + 14, s + 4)
+      Cast(Cast(sum, dt) / Cast(count, dt), resultType)
+    case _ =>
+      Cast(sum, resultType) / Cast(count, resultType)
+  }
+}

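CarbonAverage is a DeclarativeAggregate whose buffer holds a running sum and count; buffers from different partitions are merged by adding the two components, and the final value is sum divided by count (null when the count is zero). A minimal sketch of that buffer life cycle in plain Scala, independent of the Catalyst expression machinery:

object AverageBufferSketch {
  case class Buffer(sum: Double, count: Long) {
    def update(value: Double): Buffer = Buffer(sum + value, count + 1)
    def merge(other: Buffer): Buffer = Buffer(sum + other.sum, count + other.count)
    def evaluate: Option[Double] = if (count == 0) None else Some(sum / count)
  }

  def main(args: Array[String]): Unit = {
    val partition1 = Seq(1.0, 2.0).foldLeft(Buffer(0, 0))(_ update _)
    val partition2 = Seq(3.0).foldLeft(Buffer(0, 0))(_ update _)
    println(partition1.merge(partition2).evaluate) // Some(2.0)
  }
}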
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
new file mode 100644
index 0000000..f0e85b1
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.agg
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(*) - Returns the total number of retrieved rows, including rows containing NULL values.
+    _FUNC_(expr) - Returns the number of rows for which the supplied expression is non-NULL.
+    _FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are unique and non-NULL.""")
+// scalastyle:on line.size.limit
+case class CarbonCount(
+    child: Expression,
+    extraChild: Option[Expression] = None) extends DeclarativeAggregate {
+
+  override def children: Seq[Expression] = child :: Nil
+
+  override def nullable: Boolean = false
+
+  // Return data type.
+  override def dataType: DataType = LongType
+
+  // Expected input data type.
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+  private lazy val count = AttributeReference("count", LongType, nullable = false)()
+
+  override lazy val aggBufferAttributes = count :: Nil
+
+  override lazy val initialValues = Seq(
+    /* count = */ Literal(0L)
+  )
+  private lazy val zero = Cast(Literal(0), dataType)
+
+  private val actualChild = extraChild.getOrElse(child)
+
+  override lazy val updateExpressions = {
+    if (actualChild.nullable) {
+      Seq(
+        /* sum = */
+        Coalesce(Seq(Add(Coalesce(Seq(count, zero)), Cast(actualChild, dataType)), count))
+      )
+    } else {
+      Seq(
+        /* sum = */
+        Add(Coalesce(Seq(count, zero)), Cast(actualChild, dataType))
+      )
+    }
+  }
+
+  override lazy val mergeExpressions = Seq(
+    /* count = */ count.left + count.right
+  )
+
+  override lazy val evaluateExpression = count
+
+  override def defaultResult: Option[Literal] = Option(Literal(0L))
+}

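CarbonCount keeps a single long buffer and, unlike a per-row count, adds the incoming value itself (in this integration, presumably the partial count already computed by Carbon) rather than incrementing by one; when the child is nullable, a null input leaves the buffer unchanged because the Coalesce falls back to the previous count. A minimal sketch of that update rule in plain Scala, using Option to stand in for nullable Catalyst values:

object CountBufferSketch {
  def update(count: Long, incoming: Option[Long]): Long =
    incoming match {
      case Some(v) => count + v   // non-null input: add the partial count to the buffer
      case None    => count       // null input: keep the previous count
    }

  def main(args: Array[String]): Unit = {
    val total = Seq(Some(1L), None, Some(3L)).foldLeft(0L)(update)
    println(total) // 4
  }
}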
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6cbae27..fcc2e8a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
 import org.apache.spark.sql.hive.HiveContext
@@ -1101,15 +1102,9 @@ private[sql] case class ShowCreateTable(cm: tableModel, override val output: Seq
 
         if (sqlContext.tableNames(dbName).map(x => x.toLowerCase())
           .contains(tableName.toLowerCase())) {
-          if (dbName.nonEmpty) {
-            dataFrame = DataFrame(sqlContext,
-              sqlContext.catalog.lookupRelation(Seq(dbName, tableName)))
-          }
-          else {
-            dataFrame = DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
-          }
-        }
-        else {
+          dataFrame = DataFrame(sqlContext,
+            sqlContext.catalog.lookupRelation(TableIdentifier(tableName, Some(dbName))))
+        } else {
           LOGGER.error(s"Input source table $tableName does not exists")
           sys.error(s"Input source table $tableName does not exists")
         }
@@ -1164,7 +1159,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
 
     val relation =
       CarbonEnv.getInstance(sqlContext).carbonCatalog
-        .lookupRelation1(Option(schemaName), tableName, None)(sqlContext)
+        .lookupRelation1(Option(schemaName), tableName)(sqlContext)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
       sys.error(s"Table $schemaName.$tableName does not exist")
@@ -1243,20 +1238,20 @@ private[sql] case class CreateCube(cm: tableModel) extends RunnableCommand {
       try {
         sqlContext.sql(
           s"""CREATE TABLE $dbName.$tbName USING org.apache.spark.sql.CarbonSource""" +
-          s""" OPTIONS (cubename "$dbName.$tbName", tablePath "$cubePath") """).collect
+          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$cubePath") """).collect
       } catch {
         case e: Exception =>
 
+          val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
           val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .lookupRelation2(Seq(dbName, tbName))(sqlContext).asInstanceOf[CarbonRelation]
+            .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
           if (relation != null) {
             LOGGER.audit(s"Deleting Table [$tbName] under Database [$dbName]" +
                          "as create TABLE failed")
             CarbonEnv.getInstance(sqlContext).carbonCatalog
-              .dropCube(relation.cubeMeta.partitioner.partitionCount,
+              .dropTable(relation.cubeMeta.partitioner.partitionCount,
                 relation.cubeMeta.storePath,
-                dbName,
-                tbName)(sqlContext)
+                identifier)(sqlContext)
           }
 
           LOGGER.audit(s"Table creation with Database name [$dbName] " +
@@ -1289,24 +1284,21 @@ private[sql] case class DeleteLoadsById(
 
     // validate load ids first
     validateLoadIds
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 
+    val identifier = TableIdentifier(tableName, Option(dbName))
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-      Option(schemaName),
-      tableName,
-      None)(sqlContext).asInstanceOf[CarbonRelation]
+      identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     if (relation == null) {
-      LOGGER.audit(s"The delete load by Id is failed. Table $schemaName.$tableName does not exist")
-      sys.error(s"Table $schemaName.$tableName does not exist")
+      LOGGER.audit(s"The delete load by Id is failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
     }
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(schemaName + '_' + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
 
     if (null == carbonTable) {
-      CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-        Option(schemaName),
-        tableName,
-        None)(sqlContext).asInstanceOf[CarbonRelation]
+      CarbonEnv.getInstance(sqlContext).carbonCatalog
+        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     }
     val path = carbonTable.getMetaDataFilepath
 
@@ -1359,30 +1351,24 @@ private[sql] case class DeleteLoadsByLoadDate(
   def run(sqlContext: SQLContext): Seq[Row] = {
 
     LOGGER.audit("The delete load by load date request has been received.")
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
-
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-      Option(schemaName),
-      tableName,
-     None
-    )(sqlContext).asInstanceOf[CarbonRelation]
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     if (relation == null) {
       LOGGER
-        .audit(s"The delete load by load date is failed. Table $schemaName.$tableName does not " +
+        .audit(s"The delete load by load date is failed. Table $dbName.$tableName does not " +
          s"exist")
-      sys.error(s"Table $schemaName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
     }
 
     var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-      .getCarbonTable(schemaName + '_' + tableName)
+      .getCarbonTable(dbName + '_' + tableName)
     var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
 
     if (null == carbonTable) {
-      var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-        Option(schemaName),
-        tableName,
-        None
-      )(sqlContext).asInstanceOf[CarbonRelation]
+      var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     }
     var path = carbonTable.getMetaDataFilepath()
 
@@ -1409,19 +1395,20 @@ private[sql] case class LoadCube(
 
   def run(sqlContext: SQLContext): Seq[Row] = {
 
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
-      sys.error("Overwrite is not supported for carbon table with " + schemaName + "." + tableName)
+      sys.error("Overwrite is not supported for carbon table with " + dbName + "." + tableName)
     }
     if (null == org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(schemaName + "_" + tableName)) {
-      logError("Data loading failed. table not found: " + schemaName + "_" + tableName)
-      LOGGER.audit("Data loading failed. table not found: " + schemaName + "_" + tableName)
-      sys.error("Data loading failed. table not found: " + schemaName + "_" + tableName)
+      .getCarbonTable(dbName + "_" + tableName)) {
+      logError("Data loading failed. table not found: " + dbName + "_" + tableName)
+      LOGGER.audit("Data loading failed. table not found: " + dbName + "_" + tableName)
+      sys.error("Data loading failed. table not found: " + dbName + "_" + tableName)
     }
     CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory.getCarbonLockObj(org.carbondata.core.
-      carbon.metadata.CarbonMetadata.getInstance().getCarbonTable(schemaName + "_" + tableName).
+      carbon.metadata.CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName).
       getMetaDataFilepath, LockUsage.METADATA_LOCK)
     try {
       if (carbonLock.lockWithRetries()) {
@@ -1432,12 +1419,10 @@ private[sql] case class LoadCube(
       }
 
       val factPath = FileUtils.getPaths(CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
-      val relation =
-        CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .lookupRelation1(Option(schemaName), tableName, None)(sqlContext)
-          .asInstanceOf[CarbonRelation]
+      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
       if (relation == null) {
-        sys.error(s"Table $schemaName.$tableName does not exist")
+        sys.error(s"Table $dbName.$tableName does not exist")
       }
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
@@ -1511,7 +1496,7 @@ private[sql] case class LoadCube(
         if (null == relation.cubeMeta.partitioner.partitionColumn ||
             relation.cubeMeta.partitioner.partitionColumn(0).isEmpty) {
           LOGGER.info("Initiating Direct Load for the Table : (" +
-                      schemaName + "." + tableName + ")")
+                      dbName + "." + tableName + ")")
           carbonLoadModel.setFactFilePath(factPath)
           carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
           carbonLoadModel.setCsvHeader(fileHeader)
@@ -1526,9 +1511,9 @@ private[sql] case class LoadCube(
           partitionLocation += System.currentTimeMillis()
           FileFactory.mkdirs(partitionLocation, fileType)
           LOGGER.info("Initiating Data Partitioning for the Table : (" +
-                      schemaName + "." + tableName + ")")
+                      dbName + "." + tableName + ")")
           partitionStatus = CarbonContext.partitionData(
-            schemaName,
+            dbName,
             tableName,
             factPath,
             partitionLocation,
@@ -1605,8 +1590,9 @@ private[sql] case class PartitionData(databaseName: String, tableName: String, f
   var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
 
   def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-      Option(databaseName), tableName, None)(sqlContext).asInstanceOf[CarbonRelation]
+    val identifier = TableIdentifier(tableName, Option(databaseName))
+    val relation = CarbonEnv.getInstance(sqlContext)
+      .carbonCatalog.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     val dimNames = relation.cubeMeta.carbonTable
       .getDimensionByTableName(tableName).asScala.map(_.getColName)
     val msrNames = relation.cubeMeta.carbonTable
@@ -1624,60 +1610,18 @@ private[sql] case class PartitionData(databaseName: String, tableName: String, f
   }
 }
 
-private[sql] case class LoadAggregationTable(
-    newSchema: CarbonTable,
-    schemaName: String,
-    cubeName: String,
-    aggTableName: String) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-      Option(schemaName),
-      cubeName,
-      None)(sqlContext).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $schemaName.$cubeName does not exist")
-    }
-    val carbonLoadModel = new CarbonLoadModel()
-    carbonLoadModel.setTableName(cubeName)
-    val table = relation.cubeMeta.carbonTable
-    carbonLoadModel.setAggTableName(aggTableName)
-    carbonLoadModel.setTableName(table.getFactTableName)
-    carbonLoadModel.setAggLoadRequest(true)
-    var storeLocation = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-        System.getProperty("java.io.tmpdir"))
-    storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
-    val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-    if (null == kettleHomePath) {
-      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-    }
-    if (kettleHomePath == null) {
-      sys.error(s"carbon.kettle.home is not set")
-    }
-    CarbonDataRDDFactory.loadCarbonData(
-      sqlContext,
-      carbonLoadModel,
-      storeLocation,
-      relation.cubeMeta.storePath,
-      kettleHomePath,
-      relation.cubeMeta.partitioner, columinar, isAgg = true)
-    Seq.empty
-  }
-}
-
-
 private[sql] case class ShowAllTablesInSchema(
     schemaNameOp: Option[String],
     override val output: Seq[Attribute]
 ) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
-    CarbonEnv.getInstance(sqlContext).carbonCatalog.getCubes(Some(schemaName))(sqlContext).map(
-      x => Row(x._1,
-        sqlContext.asInstanceOf[HiveContext].catalog.tableExists(Seq(schemaName, x._1))))
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    CarbonEnv.getInstance(sqlContext).carbonCatalog.getTables(Some(dbName))(sqlContext)
+      .map{x =>
+        Row(x._1, sqlContext.asInstanceOf[HiveContext]
+          .catalog.tableExists(TableIdentifier(x._1, Some(dbName))))
+      }
   }
 }
 
@@ -1685,9 +1629,9 @@ private[sql] case class ShowAllTables(override val output: Seq[Attribute])
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    CarbonEnv.getInstance(sqlContext).carbonCatalog.getAllCubes()(sqlContext)
+    CarbonEnv.getInstance(sqlContext).carbonCatalog.getAllTables()(sqlContext)
       .map { x =>
-        Row(x._1, x._2, sqlContext.asInstanceOf[HiveContext].catalog.tableExists(Seq(x._1, x._2)))
+        Row(x.database.get, x.table, sqlContext.asInstanceOf[HiveContext].catalog.tableExists(x))
       }
   }
 
@@ -1705,18 +1649,19 @@ private[sql] case class ShowAllTablesDetail(
   }
 }
 
-private[sql] case class MergeTable(schemaName: String, cubeName: String, tableName: String)
+private[sql] case class MergeTable(dbName: String, cubeName: String, tableName: String)
   extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
+    val identifier = TableIdentifier(tableName, Option(cubeName))
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-      .lookupRelation2(Seq(schemaName, cubeName), None)(sqlContext).asInstanceOf[CarbonRelation]
+      .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     if (relation == null) {
-      sys.error(s"Table $schemaName.$cubeName does not exist")
+      sys.error(s"Table $dbName.$cubeName does not exist")
     }
     val carbonLoadModel = new CarbonLoadModel()
     carbonLoadModel.setTableName(cubeName)
-    carbonLoadModel.setDatabaseName(schemaName)
+    carbonLoadModel.setDatabaseName(dbName)
     val table = relation.cubeMeta.carbonTable
     var isTablePresent = false
     if (table.getFactTableName.equals(tableName)) {
@@ -1749,34 +1694,35 @@ private[sql] case class MergeTable(schemaName: String, cubeName: String, tableNa
 }
 
 private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Option[String],
-    cubeName: String)
+    tableName: String)
   extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(tableName, Option(dbName))
     val tmpTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(schemaName + "_" + cubeName)
+      .getCarbonTable(dbName + "_" + tableName)
     if (null == tmpTable) {
       if (!ifExistsSet) {
         LOGGER
-          .audit(s"Dropping carbon table with Database name [$schemaName] and Table name" +
+          .audit(s"Dropping carbon table with Database name [$dbName] and Table name" +
                  "[$cubeName] failed")
-        LOGGER.error(s"Carbon Table $schemaName.$cubeName metadata does not exist")
+        LOGGER.error(s"Carbon Table $dbName.$tableName metadata does not exist")
       }
-      if (sqlContext.tableNames(schemaName).map(x => x.toLowerCase())
-        .contains(cubeName.toLowerCase())) {
+      if (sqlContext.tableNames(dbName).map(x => x.toLowerCase())
+        .contains(tableName.toLowerCase())) {
         try {
           sqlContext.asInstanceOf[HiveContext].catalog.client.
-            runSqlHive(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
+            runSqlHive(s"DROP TABLE IF EXISTS $dbName.$tableName")
         } catch {
           case e: RuntimeException =>
             LOGGER.audit(
-              s"Error While deleting the table $schemaName.$cubeName during drop carbon table" +
+              s"Error While deleting the table $dbName.$tableName during drop carbon table" +
               e.getMessage)
         }
       } else if (!ifExistsSet) {
-        sys.error(s"Carbon Table $schemaName.$cubeName does not exist")
+        sys.error(s"Carbon Table $dbName.$tableName does not exist")
       }
     } else {
       CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
@@ -1787,32 +1733,33 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti
           logInfo("Successfully able to get the table metadata file lock")
         } else {
           LOGGER.audit(
-            s"Dropping table with Database name [$schemaName] and Table name [$cubeName] " +
+            s"Dropping table with Database name [$dbName] and Table name [$tableName] " +
             s"failed as the Table is locked")
           sys.error("Table is locked for updation. Please try after some time")
         }
 
         val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .lookupRelation2(Seq(schemaName, cubeName))(sqlContext).asInstanceOf[CarbonRelation]
+          .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
 
         if (relation == null) {
           if (!ifExistsSet) {
-            sys.error(s"Table $schemaName.$cubeName does not exist")
+            sys.error(s"Table $dbName.$tableName does not exist")
           }
         } else {
-          LOGGER.audit(s"Deleting table [$cubeName] under database [$schemaName]")
+          LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
 
           CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .dropCube(relation.cubeMeta.partitioner.partitionCount,
+            .dropTable(relation.cubeMeta.partitioner.partitionCount,
               relation.cubeMeta.storePath,
-              relation.cubeMeta.carbonTableIdentifier.getDatabaseName,
-              relation.cubeMeta.carbonTableIdentifier.getTableName)(sqlContext)
+              TableIdentifier(relation.cubeMeta.carbonTableIdentifier.getTableName,
+                Some(relation.cubeMeta.carbonTableIdentifier.getDatabaseName))
+              )(sqlContext)
           CarbonDataRDDFactory
-            .dropCube(sqlContext.sparkContext, schemaName, cubeName,
+            .dropCube(sqlContext.sparkContext, dbName, tableName,
               relation.cubeMeta.partitioner)
-          QueryPartitionHelper.getInstance().removePartition(schemaName, cubeName)
+          QueryPartitionHelper.getInstance().removePartition(dbName, tableName)
 
-          LOGGER.audit(s"Deleted table [$cubeName] under database [$schemaName]")
+          LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
         }
       }
       finally {
@@ -1825,7 +1772,7 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti
               CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
             }
             // delete bad record log after drop cube
-            val badLogPath = CarbonUtil.getBadLogPath(schemaName +  File.separator + cubeName)
+            val badLogPath = CarbonUtil.getBadLogPath(dbName +  File.separator + tableName)
             val badLogFileType = FileFactory.getFileType(badLogPath)
             if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
               val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
@@ -1847,19 +1794,20 @@ private[sql] case class DropAggregateTableCommand(ifExistsSet: Boolean,
     tableName: String) extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.
-      lookupRelation1(Some(schemaName), tableName, None)(sqlContext).asInstanceOf[CarbonRelation]
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
 
     if (relation == null) {
       if (!ifExistsSet) {
-        sys.error(s"Aggregate Table $schemaName.$tableName does not exist")
+        sys.error(s"Aggregate Table $dbName.$tableName does not exist")
       }
     }
     else {
       CarbonDataRDDFactory.dropAggregateTable(
         sqlContext.sparkContext,
-        schemaName,
+        dbName,
         tableName,
         relation.cubeMeta.partitioner)
     }
@@ -1941,12 +1889,12 @@ private[sql] case class ShowAggregateTables(
 private[sql] case class DescribeCommandFormatted(
     child: SparkPlan,
     override val output: Seq[Attribute],
-    tblIdentifier: Seq[String])
+    tblIdentifier: TableIdentifier)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-      .lookupRelation2(tblIdentifier, None)(sqlContext).asInstanceOf[CarbonRelation]
+      .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
     var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
       val comment = if (relation.metaData.dims.contains(field.name)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
@@ -2036,15 +1984,16 @@ private[sql] case class DeleteLoadByDate(
   def run(sqlContext: SQLContext): Seq[Row] = {
 
     LOGGER.audit("The delete load by date request has been received.")
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(cubeName, Option(dbName))
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-      .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).asInstanceOf[CarbonRelation]
+      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
     var level: String = ""
     var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
-         .getInstance().getCarbonTable(schemaName + '_' + cubeName)
+         .getInstance().getCarbonTable(dbName + '_' + cubeName)
     if (relation == null) {
-      LOGGER.audit(s"The delete load by date is failed. Table $schemaName.$cubeName does not exist")
-      sys.error(s"Table $schemaName.$cubeName does not exist")
+      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$cubeName does not exist")
+      sys.error(s"Table $dbName.$cubeName does not exist")
     }
 
     val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
@@ -2054,8 +2003,8 @@ private[sql] case class DeleteLoadByDate(
     if (matches.isEmpty) {
       LOGGER.audit(
         "The delete load by date is failed. " +
-        "Table $schemaName.$cubeName does not contain date field " + dateField)
-      sys.error(s"Table $schemaName.$cubeName does not contain date field " + dateField)
+        "Table $dbName.$cubeName does not contain date field " + dateField)
+      sys.error(s"Table $dbName.$cubeName does not contain date field " + dateField)
     }
     else {
       level = matches.asJava.get(0).name
@@ -2067,7 +2016,7 @@ private[sql] case class DeleteLoadByDate(
     CarbonDataRDDFactory.deleteLoadByDate(
       sqlContext,
       new CarbonDataLoadSchema(carbonTable),
-      schemaName,
+      dbName,
       cubeName,
       tableName,
       CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath,
@@ -2089,13 +2038,14 @@ private[sql] case class CleanFiles(
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     LOGGER.audit("The clean files request has been received.")
-    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+    val identifier = TableIdentifier(cubeName, Option(dbName))
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-      .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).
+      .lookupRelation1(identifier)(sqlContext).
       asInstanceOf[CarbonRelation]
     if (relation == null) {
-      LOGGER.audit(s"The clean files request is failed. Table $schemaName.$cubeName does not exist")
-      sys.error(s"Table $schemaName.$cubeName does not exist")
+      LOGGER.audit(s"The clean files request is failed. Table $dbName.$cubeName does not exist")
+      sys.error(s"Table $dbName.$cubeName does not exist")
     }
 
     val carbonLoadModel = new CarbonLoadModel()
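
The change that repeats across all of these commands is the lookup key for the carbon catalog: instead of passing (Option(dbName), tableName, None), each command now builds a Spark 1.6 TableIdentifier(tableName, Option(dbName)) and hands it to lookupRelation1. The sketch below only models that call shape with stand-in types so it compiles on its own; the real TableIdentifier lives in org.apache.spark.sql.catalyst and the real lookup in CarbonMetastoreCatalog.

// Stand-in types; the names mirror the real API but the lookup itself is a toy map.
object LookupMigrationSketch {
  final case class TableIdentifier(table: String, database: Option[String] = None)

  final class Catalog(known: Map[(String, String), String]) {
    // New call shape used throughout the refactor: lookupRelation1(identifier[, alias]).
    def lookupRelation1(id: TableIdentifier, alias: Option[String] = None): Option[String] =
      known.get((id.database.getOrElse("default"), id.table))
  }

  def main(args: Array[String]): Unit = {
    val catalog = new Catalog(Map(("default", "sales") -> "CarbonRelation(default.sales)"))  // hypothetical metadata
    val identifier = TableIdentifier("sales", Some("default"))   // mirrors e.g. CleanFiles above
    println(catalog.lookupRelation1(identifier).getOrElse("Table does not exist"))
  }
}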

