carbondata-commits mailing list archives

From jbono...@apache.org
Subject [41/56] [abbrv] incubator-carbondata git commit: Supported both Spark 1.5.2 and 1.6.1 versions in Carbondata (#693)
Date Thu, 23 Jun 2016 14:16:29 GMT
Supported both Spark 1.5.2 and 1.6.1 versions in Carbondata (#693)

* Supported both Spark 1.5.2 and 1.6.1 versions in Carbon

* Fixed bug

* Changed default version to 1.5.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/19590dba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/19590dba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/19590dba

Branch: refs/heads/master
Commit: 19590dba38f69c262c8865e012e9620f71a3f0ab
Parents: 1c725f5
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Sun Jun 19 14:34:16 2016 +0530
Committer: Jacky Li <jacky.likun@huawei.com>
Committed: Sun Jun 19 17:04:16 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/common/util/QueryTest.scala       | 16 ----
 .../spark/sql/CarbonDatasourceRelation.scala    |  2 +-
 .../org/apache/spark/sql/CarbonOperators.scala  |  8 +-
 .../sql/agg/CarbonAggregationExpression.scala   | 50 -----------
 .../apache/spark/sql/agg/CarbonAverage.scala    | 89 --------------------
 .../org/apache/spark/sql/agg/CarbonCount.scala  | 77 -----------------
 .../spark/sql/catalyst/TableIdentifier.scala    | 57 +++++++++++++
 .../spark/sql/hive/CarbonMetastoreCatalog.scala | 24 ++----
 .../spark/sql/hive/CarbonSQLDialect.scala       |  6 +-
 .../spark/sql/hive/CarbonStrategies.scala       |  6 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  2 +-
 .../spark/sql/common/util/QueryTest.scala       | 16 ----
 pom.xml                                         | 15 +++-
 13 files changed, 93 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index c4f09cc..f9960d3 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 
@@ -71,21 +70,6 @@ class QueryTest extends PlanTest {
     }
   }
 
-  /**
-   * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
-   */
-  def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
-    val planWithCaching = query.queryExecution.withCachedData
-    val cachedData = planWithCaching collect {
-      case cached: InMemoryRelation => cached
-    }
-
-    assert(
-      cachedData.size == numCachedTables,
-      s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n"
+
-        planWithCaching)
-  }
-
   protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
     checkAnswer(df, Seq(expectedAnswer))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 94b38a4..e14f86a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -58,7 +58,7 @@ class CarbonSource
         val options = new CarbonOption(parameters)
         val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
         val ident = tableIdentifier match {
-          case Seq(name) => TableIdentifier(name)
+          case Seq(name) => TableIdentifier(name, None)
           case Seq(db, name) => TableIdentifier(name, Some(db))
         }
         CarbonDatasourceRelation(ident, None)(sqlContext)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index 6b2bdc2..782b87c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -39,7 +39,8 @@ import org.carbondata.spark.rdd.CarbonScanRDD
 case class CarbonScan(
     var attributesRaw: Seq[Attribute],
     relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression])(@transient val ocRaw: SQLContext) extends LeafNode {
+    dimensionPredicatesRaw: Seq[Expression],
+    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
   val carbonTable = relationRaw.metaData.carbonTable
   val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
   val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
@@ -184,10 +185,11 @@ case class CarbonScan(
   }
 
 
-  override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+  override def outputsUnsafeRows: Boolean =
+    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
 
   override def doExecute(): RDD[InternalRow] = {
-    val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
     inputRdd.mapPartitions { iter =>
       val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
       new Iterator[InternalRow] {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
deleted file mode 100644
index 64930c7..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAggregationExpression.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.agg
-
-import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.types._
-
-case class CarbonAvgHelperExpression(
-    left: Expression,
-    right: Expression,
-    index: Int,
-    dataType: DataType)
-  extends BinaryExpression with CodegenFallback {
-
-  protected override def nullSafeEval(input1: Any, input2: Any): Any = {
-    val left = input1 match {
-      case g: ArrayData => g.getDouble(index)
-      case others => others.toString.toDouble
-    }
-    val right = input2 match {
-      case g: ArrayData => g.getDouble(index)
-      case others => others.toString.toDouble
-    }
-    if (index == 1) {
-      (left + right).toLong
-    } else {
-      dataType match {
-        case d: DecimalType =>
-        case _ => left + right
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
deleted file mode 100644
index 9400b0c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonAverage.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.agg
-
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
-import org.apache.spark.sql.types._
-
-@ExpressionDescription(
-  usage = "_FUNC_(x) - Returns the mean calculated from values of a group.")
-case class CarbonAverage(child: Expression) extends DeclarativeAggregate {
-
-  override def prettyName: String = "avg"
-
-  override def children: Seq[Expression] = child :: Nil
-
-  override def nullable: Boolean = true
-
-  // Return data type.
-  override def dataType: DataType = resultType
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
-
-  override def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
-//    TypeUtils.checkForNumericExpr(child.dataType, "function average")
-
-  private lazy val resultType = child.dataType match {
-    case DecimalType.Fixed(p, s) =>
-      DecimalType.bounded(p + 4, s + 4)
-    case _ => DoubleType
-  }
-
-  private lazy val sumDataType = child.dataType match {
-    case _ @ DecimalType.Fixed(p, s) => DecimalType.bounded(p + 10, s)
-    case _ => DoubleType
-  }
-
-  private lazy val sum = AttributeReference("sum", sumDataType)()
-  private lazy val count = AttributeReference("count", LongType)()
-
-  override lazy val aggBufferAttributes = sum :: count :: Nil
-
-  override lazy val initialValues = Seq(
-    /* sum = */ Cast(Literal(0), sumDataType),
-    /* count = */ Literal(0L)
-  )
-
-  override lazy val updateExpressions = Seq(
-    /* sum = */
-    CarbonAvgHelperExpression(
-      sum,
-      Coalesce(child :: Cast(Literal(0), sumDataType) :: Nil), 0, sumDataType),
-    CarbonAvgHelperExpression(
-      count,
-      Coalesce(child :: Cast(Literal(0), LongType) :: Nil), 1, LongType)
-  )
-
-  override lazy val mergeExpressions = Seq(
-    /* sum = */ sum.left + sum.right,
-    /* count = */ count.left + count.right
-  )
-
-  // If all input are nulls, count will be 0 and we will get null after the division.
-  override lazy val evaluateExpression = child.dataType match {
-    case DecimalType.Fixed(p, s) =>
-      // increase the precision and scale to prevent precision loss
-      val dt = DecimalType.bounded(p + 14, s + 4)
-      Cast(Cast(sum, dt) / Cast(count, dt), resultType)
-    case _ =>
-      Cast(sum, resultType) / Cast(count, resultType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala b/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
deleted file mode 100644
index f0e85b1..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/agg/CarbonCount.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.agg
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
-import org.apache.spark.sql.types._
-
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = """_FUNC_(*) - Returns the total number of retrieved rows, including rows containing
NULL values.
-    _FUNC_(expr) - Returns the number of rows for which the supplied expression is non-NULL.
-    _FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for which the supplied
expression(s) are unique and non-NULL.""")
-// scalastyle:on line.size.limit
-case class CarbonCount(
-    child: Expression,
-    extraChild: Option[Expression] = None) extends DeclarativeAggregate {
-
-  override def children: Seq[Expression] = child :: Nil
-
-  override def nullable: Boolean = false
-
-  // Return data type.
-  override def dataType: DataType = LongType
-
-  // Expected input data type.
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
-
-  private lazy val count = AttributeReference("count", LongType, nullable = false)()
-
-  override lazy val aggBufferAttributes = count :: Nil
-
-  override lazy val initialValues = Seq(
-    /* count = */ Literal(0L)
-  )
-  private lazy val zero = Cast(Literal(0), dataType)
-
-  private val actualChild = extraChild.getOrElse(child)
-
-  override lazy val updateExpressions = {
-    if (actualChild.nullable) {
-      Seq(
-        /* sum = */
-        Coalesce(Seq(Add(Coalesce(Seq(count, zero)), Cast(actualChild, dataType)), count))
-      )
-    } else {
-      Seq(
-        /* sum = */
-        Add(Coalesce(Seq(count, zero)), Cast(actualChild, dataType))
-      )
-    }
-  }
-
-  override lazy val mergeExpressions = Seq(
-    /* count = */ count.left + count.right
-  )
-
-  override lazy val evaluateExpression = count
-
-  override def defaultResult: Option[Literal] = Option(Literal(0L))
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
new file mode 100644
index 0000000..b5b6eff
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+
+/**
+ * Identifies a `table` in `database`.  If `database` is not defined, the current database is used.
+ */
+private[sql] case class TableIdentifier(table: String, database: Option[String]) {
+  def this(table: String) = this(table, None)
+
+  override def toString: String = quotedString
+
+  def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
+
+  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
+
+  def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))
+
+  def toSeq: Seq[String] = database.toSeq :+ table
+
+}
+
+private[sql] object TableIdentifier {
+  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+
+  implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    tableIdentifier match {
+      case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
+      case Seq(tableName) => TableIdentifier(tableName, None)
+      case _ => throw new NoSuchTableException
+    }
+  }
+
+  implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
+    tableIdentifier.database match {
+      case Some(dbName) => Seq(dbName, tableIdentifier.table)
+      case _ => Seq(tableIdentifier.table)
+    }
+  }
+}
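
For illustration only (not part of this commit; the database and table names below are invented, and the class is private[sql], so this only compiles inside the org.apache.spark.sql package), the implicit conversions defined in the companion object above are expected to behave roughly as follows:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.TableIdentifier._

    // Seq[String] -> TableIdentifier via the implicit toTableIdentifier
    val withDb: TableIdentifier = Seq("mydb", "mytable")  // TableIdentifier("mytable", Some("mydb"))
    val noDb: TableIdentifier = Seq("mytable")            // TableIdentifier("mytable", None)

    // TableIdentifier -> Seq[String] via the implicit toSequence
    val parts: Seq[String] = TableIdentifier("mytable", Some("mydb"))  // Seq("mydb", "mytable")

    // String rendering helpers
    TableIdentifier("mytable", Some("mydb")).quotedString    // `mydb`.`mytable`
    TableIdentifier("mytable", Some("mydb")).unquotedString  // mydb.mytable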

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 30a1070..9f1a2d6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -68,7 +68,7 @@ object CarbonMetastoreCatalog {
   def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
     val createTBase = new ThriftReader.TBaseCreator() {
       override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-        return new TableInfo();
+        new TableInfo()
       }
     }
     val thriftReader = new ThriftReader(schemaFilePath, createTBase)
@@ -85,10 +85,10 @@ object CarbonMetastoreCatalog {
   def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
     try {
-      thriftWriter.open();
+      thriftWriter.open()
       thriftWriter.write(tableInfo);
     } finally {
-      thriftWriter.close();
+      thriftWriter.close()
     }
   }
 
@@ -121,16 +121,6 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     false
   }
 
-  override def lookupRelation(tableIdentifier: TableIdentifier,
-      alias: Option[String] = None): LogicalPlan = {
-    try {
-      super.lookupRelation(tableIdentifier, alias)
-    } catch {
-      case s: java.lang.Exception =>
-        lookupRelation1(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
-    }
-  }
-
   def getCubeCreationTime(schemaName: String, cubeName: String): Long = {
     val cubeMeta = metadata.cubesMeta.filter(
       c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
@@ -253,7 +243,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
                     .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
                   CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
                   val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-                      .getCarbonTable(cubeUniqueName);
+                      .getCarbonTable(cubeUniqueName)
                   metaDataBuffer += TableMeta(
                     carbonTable.getCarbonTableIdentifier,
                     storePath,
@@ -306,7 +296,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
       .add(schemaEvolutionEntry)
 
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-        tableInfo.getFactTable().getTableId())
+        tableInfo.getFactTable.getTableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
@@ -344,7 +334,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
       wrapperTableInfo.getTableUniqueName)
-    for (i <- 0 until metadata.cubesMeta.size) {
+    for (i <- metadata.cubesMeta.indices) {
       if (wrapperTableInfo.getTableUniqueName.equals(
         metadata.cubesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
         metadata.cubesMeta(i).carbonTable = carbonTable
@@ -508,7 +498,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
     if (!tableExists(tableIdentifier)(sqlContext)) {
-      LOGGER.audit(s"Drop Table failed. Table with ${dbName}.$tableName does not exist")
+      LOGGER.audit(s"Drop Table failed. Table with $dbName.$tableName does not exist")
       sys.error(s"Table with $dbName.$tableName does not exist")
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
index edfaa90..d80703e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.CarbonSqlParser
+import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 
-private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) {
+private[spark] class CarbonSQLDialect(context: HiveContext) extends ParserDialect {
 
   @transient
   protected val sqlParser = new CarbonSqlParser
@@ -36,7 +37,8 @@ private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialec
       // because hive can no parse carbon command
       case ce: MalformedCarbonCommandException =>
         throw ce
-      case _ => super.parse(sqlText)
+      case _ =>
+        HiveQl.parseSql(sqlText)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 76edc11..a989efe 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.TableIdentifier._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
@@ -134,7 +135,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
       val scan = CarbonScan(projectList.map(_.toAttribute),
         relation.carbonRelation,
-        predicates)(sqlContext)
+        predicates,
+        useUnsafeCoversion = false)(sqlContext)
       projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
       if (projectExprsNeedToDecode.size() > 0) {
         val decoder = getCarbonDecoder(logicalRelation,
@@ -194,7 +196,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil
       case DropTable(tableName, ifNotExists)
         if CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
+            .tableExists(TableIdentifier(tableName.toLowerCase, None))(sqlContext) =>
         ExecutedCommand(DropTableCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil
       case ShowAggregateTablesCommand(schemaName) =>
         ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 73cb3d5..fc74291 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules.Rule

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 5811d3a..f9af61d 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 class QueryTest extends PlanTest {
@@ -70,21 +69,6 @@ class QueryTest extends PlanTest {
     }
   }
 
-  /**
-   * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
-   */
-  def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
-    val planWithCaching = query.queryExecution.withCachedData
-    val cachedData = planWithCaching collect {
-      case cached: InMemoryRelation => cached
-    }
-
-    assert(
-      cachedData.size == numCachedTables,
-      s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n"
+
-        planWithCaching)
-  }
-
   protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
     checkAnswer(df, Seq(expectedAnswer))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/19590dba/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5df017a..d5f5e33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
   <name>carbondata</name>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spark.version>1.6.1</spark.version>
+    <spark.version>1.5.2</spark.version>
     <scala.binary.version>2.10</scala.binary.version>
     <snappy.version>1.1.1.7</snappy.version>
     <hadoop.version>2.2.0</hadoop.version>
@@ -114,6 +114,19 @@
       </properties>
     </profile>
     <profile>
+      <id>spark-1.5.2</id>
+      <!-- default -->
+      <properties>
+        <spark.version>1.5.2</spark.version>
+      </properties>
+    </profile>
+    <profile>
+      <id>spark-1.6.1</id>
+      <properties>
+        <spark.version>1.6.1</spark.version>
+      </properties>
+    </profile>
+    <profile>
       <id>integration-test</id>
       <modules>
         <module>integration-testcases</module>
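
Build note (illustrative, not part of this commit): with the two profiles added above, the Spark version can be selected at build time via Maven's -P flag, for example "mvn clean install -Pspark-1.6.1"; omitting the profile builds against the new default, Spark 1.5.2.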

