carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [1/2] incubator-carbondata git commit: Supporting Spark 2.1
Date Mon, 02 Jan 2017 03:38:48 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f9971909b -> b286d465d


Supporting Spark 2.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a3b8a6d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a3b8a6d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a3b8a6d4

Branch: refs/heads/master
Commit: a3b8a6d47116ce152a600605a285eb57ef584989
Parents: f997190
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Fri Dec 30 22:49:58 2016 +0530
Committer: chenliang613 <chenliang613@apache.org>
Committed: Mon Jan 2 11:37:14 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 assembly/pom.xml                                |   2 +-
 .../readsupport/impl/RawDataReadSupport.java    |   4 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   7 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  30 +---
 .../execution/BatchedDataSourceScanExec.scala   | 145 +++++++++++++++++++
 .../execution/CarbonLateDecodeStrategy.scala    |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  27 +---
 .../AllDataTypesTestCaseAggregate.scala         |   2 +-
 .../bucketing/TableBucketingTestCase.scala      |   5 -
 .../spark/sql/common/util/QueryTest.scala       |  13 +-
 pom.xml                                         |   4 +-
 14 files changed, 177 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 71cc4d6..ce385ad 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 language: java
-script: mvn -U clean scalastyle:check checkstyle:check -Pspark-2.0 test
\ No newline at end of file
+script: mvn -U clean scalastyle:check checkstyle:check -Pspark-2.1 test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 61e2a58..c170e08 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -147,7 +147,7 @@
       </dependencies>
     </profile>
     <profile>
-      <id>spark-2.0</id>
+      <id>spark-2.1</id>
       <dependencies>
         <dependency>
           <groupId>org.apache.carbondata</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index 3a68efb..c381bb7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
 
@@ -40,7 +40,7 @@ public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
    */
   @Override
   public InternalRow readRow(Object[] data) {
-    return new GenericMutableRow(data);
+    return new GenericInternalRow(data);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 3aea985..3357d6c 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu
 import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
 
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<InternalRow> {
 
@@ -47,6 +47,6 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
         }
       }
     }
-    return new GenericMutableRow(data);
+    return new GenericInternalRow(data);
   }
 }
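
Both read-support classes swap GenericMutableRow for GenericInternalRow because the
mutable row classes are no longer available in Spark 2.1's Catalyst; GenericInternalRow
now plays the same role of wrapping an already-decoded Object[] as an InternalRow.
A minimal Scala sketch of the pattern (illustrative only, not part of this commit):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    // Wrap decoded column values in an InternalRow, as readRow() does above.
    def toRow(values: Array[Any]): InternalRow = new GenericInternalRow(values)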

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 8deacc0..fbcfbc8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.hive.{CarbonMetastoreTypes, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
@@ -74,6 +75,10 @@ case class CarbonDictionaryDecoder(
   }
 
 
+  override def outputPartitioning: Partitioning = {
+    child.outputPartitioning
+  }
+
   def canBeDecoded(attr: Attribute): Boolean = {
     profile match {
       case ip: IncludeProfile if ip.attributes.nonEmpty =>
@@ -336,7 +341,7 @@ class CarbonDecoderRDD(
               getDictionaryColumnIds(index)._3)
           }
         }
-        new GenericMutableRow(data)
+        new GenericInternalRow(data)
       }
     }
   }
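
Besides the same GenericInternalRow substitution, CarbonDictionaryDecoder now forwards its
child's outputPartitioning. A SparkPlan otherwise reports UnknownPartitioning, so a parent
that needs a particular distribution (for example a join over bucketed tables, which
TableBucketingTestCase further down exercises with broadcast joins disabled) would get an
extra shuffle planned above the decoder. A minimal sketch of the pass-through pattern,
with a hypothetical operator name:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.physical.Partitioning
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

    // Leaves rows untouched but preserves the child's partitioning so that
    // the planner does not insert an Exchange on top of it.
    case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {
      override def output: Seq[Attribute] = child.output
      override def outputPartitioning: Partitioning = child.outputPartitioning
      override protected def doExecute(): RDD[InternalRow] = child.execute()
    }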

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index d5034dd..8958227 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -44,36 +44,18 @@ class CarbonSession(@transient val sc: SparkContext,
   override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
 
   /**
-   * State shared across sessions, including the [[SparkContext]], cached data, listener,
+   * State shared across sessions, including the `SparkContext`, cached data, listener,
    * and a catalog that interacts with external systems.
    */
   @transient
-  override private[sql] lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(reflect[SharedState, SparkContext](
-      "org.apache.spark.sql.hive.HiveSharedState",
-      sparkContext))
+ override private[sql] lazy val sharedState: SharedState = {
+    existingSharedState.getOrElse(new SharedState(sparkContext))
   }
 
   override def newSession(): SparkSession = {
     new CarbonSession(sparkContext, Some(sharedState))
   }
 
-  /**
-   * Helper method to create an instance of [[T]] using a single-arg constructor that
-   * accepts an [[Arg]].
-   */
-  private def reflect[T, Arg <: AnyRef](
-      className: String,
-      ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
-    try {
-      val clazz = Utils.classForName(className)
-      val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
-      ctor.newInstance(ctorArg).asInstanceOf[T]
-    } catch {
-      case NonFatal(e) =>
-        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
-    }
-  }
 }
 
 object CarbonSession {
@@ -92,7 +74,7 @@ object CarbonSession {
       var session: SparkSession = SparkSession.getActiveSession match {
         case Some(sparkSession: CarbonSession) =>
           if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
-            options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+            options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) }
             sparkSession
           } else {
             null
@@ -109,7 +91,7 @@ object CarbonSession {
         session = SparkSession.getDefaultSession match {
           case Some(sparkSession: CarbonSession) =>
             if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
-              options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+              options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) }
               sparkSession
             } else {
               null
@@ -139,7 +121,7 @@ object CarbonSession {
           sc
         }
         session = new CarbonSession(sparkContext)
-        options.foreach { case (k, v) => session.conf.set(k, v) }
+        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
 
         // Register a successfully instantiated context to the singleton. This should be at the
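
Two Spark 2.1 adjustments in CarbonSession: HiveSharedState is gone, and SharedState itself
now chooses the external catalog from spark.sql.catalogImplementation, so the reflective
constructor lookup (and its helper) can be dropped; builder options are written straight
into the session's SQLConf via setConfString. A hedged file-level sketch (hypothetical
object name; sessionState and SharedState are private[sql], hence the package):

    package org.apache.spark.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.internal.SharedState

    object CarbonSessionSketch {
      // Spark 2.1: one SharedState type, Hive vs in-memory resolved internally.
      def newSharedState(sc: SparkContext): SharedState = new SharedState(sc)

      // Apply builder options directly to the session's SQLConf.
      def applyOptions(session: SparkSession, options: Map[String, String]): Unit =
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    }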

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
new file mode 100644
index 0000000..f08c111
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/** Physical plan node for scanning data from a batched relation. */
+case class BatchedDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
+  extends DataSourceScanExec with CodegenSupport {
+
+  override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // in the case of fallback, this batched scan should never fail because of:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+      dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
+    ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = $batch.column($i);"
+    }
+
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index de768c0..ace92fc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -261,14 +261,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         relation.relation,
         getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
-        relation.metastoreTableIdentifier)
+        relation.catalogTable.map(_.identifier))
     } else {
       RowDataSourceScanExec(output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
         relation.relation,
         getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
-        relation.metastoreTableIdentifier)
+        relation.catalogTable.map(_.identifier))
     }
   }
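
In Spark 2.1 LogicalRelation no longer exposes metastoreTableIdentifier; it carries the
full catalogTable, and the scan nodes still only need the identifier. A one-method sketch
of the accessor change (hypothetical helper name):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    def tableIdOf(rel: LogicalRelation): Option[TableIdentifier] =
      rel.catalogTable.map(_.identifier)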
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 5211f1a..befa772 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -35,7 +35,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
         ExecutedCommandExec(LoadTable(identifier.database, identifier.table, path, Seq(),
           Map(), isOverwrite)) :: Nil
-      case DropTableCommand(identifier, ifNotExists, isView)
+      case DropTableCommand(identifier, ifNotExists, isView, _)
         if CarbonEnv.get.carbonMetastore
           .isTablePathExists(identifier)(sparkSession) =>
         ExecutedCommandExec(
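
Spark 2.1 adds a PURGE option to DROP TABLE, so DropTableCommand gained a fourth
constructor argument and the extra wildcard keeps this match compiling. A small sketch of
the 2.1 shape (hypothetical table name; the last flag is the new purge option):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.DropTableCommand

    // (tableName, ifExists, isView, purge)
    val drop = DropTableCommand(TableIdentifier("t1"), true, false, false)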

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 342cabc..3dabefa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,15 +18,13 @@ package org.apache.spark.sql.parser
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalyst.catalog.CatalogColumn
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
 import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -79,8 +77,8 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (ctx.bucketSpec != null) {
         operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
       }
-      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
-      val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+      val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
       val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
         .getOrElse(Map.empty)
 
@@ -106,7 +104,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       val schema = cols ++ partitionCols
 
       val fields = schema.map { col =>
-        val x = col.name + ' ' + col.dataType
+        val x = col.name + ' ' + col.dataType.catalogString
         val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x))
         match {
           case parser.Success(field, _) => field.asInstanceOf[Field]
@@ -117,7 +115,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         // so checking the start of the string and taking the precision and scale.
         // resetting the data type with decimal
         if (f.dataType.getOrElse("").startsWith("decimal")) {
-          val (precision, scale) = parser.getScaleAndPrecision(col.dataType)
+          val (precision, scale) = parser.getScaleAndPrecision(col.dataType.catalogString)
           f.precision = precision
           f.scale = scale
           f.dataType = Some("decimal")
@@ -169,19 +167,4 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
     props
   }
 
-  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = {
-    withOrigin(ctx) {
-      ctx.colType.asScala.map { col =>
-        CatalogColumn(
-          col.identifier.getText.toLowerCase,
-          // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
-          // just convert the whole type string to lower case, otherwise the struct field names
-          // will no longer be case sensitive. Instead, we rely on our parser to get the proper
-          // case before passing it to Hive.
-          typedVisit[DataType](col.dataType).catalogString,
-          nullable = true,
-          Option(col.STRING).map(string))
-      }
-    }
-  }
 }
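
CatalogColumn and the local visitCatalogColumns helper go away because Spark 2.1 removed
CatalogColumn; the inherited visitColTypeList yields StructFields, so the column's Spark
DataType is rendered back to its SQL name with catalogString before being handed to
Carbon's field parser. A standalone illustration (not part of the commit):

    import org.apache.spark.sql.types.{DecimalType, StringType, StructField}

    // Render a StructField the way the builder above assembles `x`.
    def fieldDdl(col: StructField): String = col.name + ' ' + col.dataType.catalogString

    fieldDdl(StructField("name", StringType))          // "name string"
    fieldDdl(StructField("price", DecimalType(10, 2))) // "price decimal(10,2)"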

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index e9330c8..36f9006 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -496,7 +496,7 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
   test("select percentile_approx(deviceInformationId,0.2) as a  from Carbon_automation_test")({
     checkAnswer(
       sql("select percentile_approx(deviceInformationId,0.2) as a  from Carbon_automation_test"),
-      Seq(Row(100005.8)))
+      Seq(Row(100006.0)))
   })
   //TC_127
   test("select percentile(deviceInformationId,0.2) as  a  from Carbon_automation_test1")({
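
The expected value changes because Spark 2.1 plans percentile_approx with its native
approximate-percentile aggregate instead of the Hive UDAF; the native implementation
returns a value observed in the column rather than an interpolated one, which appears to
account for the shift from 100005.8 to 100006.0. Same query, runnable in a Spark 2.1
spark-shell once the test table is loaded (table and column names come from the test):

    spark.sql("select percentile_approx(deviceInformationId, 0.2) as a from Carbon_automation_test")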

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index c480d30..5b69c9c 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -36,9 +36,6 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
 
-    // clean data folder
-    clean
-
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
@@ -187,7 +184,5 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     spark.sql("DROP TABLE IF EXISTS t6")
     spark.sql("DROP TABLE IF EXISTS t7")
     spark.sql("DROP TABLE IF EXISTS t8")
-    // clean data folder
-    clean
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 57a8475..ac96070 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -41,18 +41,11 @@ class QueryTest extends PlanTest {
 
 
   val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath
-  val storeLocation = s"$rootPath/examples/spark2/target/store"
-  val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-  val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+  val storeLocation = s"$rootPath/integration/spark2/target/store"
+  val warehouse = s"$rootPath/integration/spark2/target/warehouse"
+  val metastoredb = s"$rootPath/integration/spark2/target/metastore_db"
 
   val spark = {
-    // clean data folder
-    if (true) {
-      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
-      clean(storeLocation)
-      clean(warehouse)
-      clean(metastoredb)
-    }
 
     CarbonProperties.getInstance()
       .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a3b8a6d4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 457fd55..c91f186 100644
--- a/pom.xml
+++ b/pom.xml
@@ -336,9 +336,9 @@
       </modules>
     </profile>
     <profile>
-      <id>spark-2.0</id>
+      <id>spark-2.1</id>
       <properties>
-        <spark.version>2.0.2</spark.version>
+        <spark.version>2.1.0</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
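
With the profile renamed and pinned to Spark 2.1.0 / Scala 2.11.8, builds have to select
it explicitly; the CI invocation from the .travis.yml change above doubles as the local
reference command:

    mvn -U clean scalastyle:check checkstyle:check -Pspark-2.1 test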

