carbondata-commits mailing list archives

From: jack...@apache.org
Subject: [2/4] carbondata git commit: Revert "[CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, compatability issues"
Date: Fri, 13 Jul 2018 09:37:46 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
deleted file mode 100644
index 3c3efaa..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
-
-/** Physical plan node for scanning data from a batched relation. */
-case class BatchedDataSourceScanExec(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier],
-    @transient logicalRelation: LogicalRelation)
-  extends DataSourceScanExec with CodegenSupport {
-
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // in the case of fallback, this batched scan should never fail because of:
-    // 1) only primitive types are supported
-    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-    WholeStageCodegenExec(this).execute()
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
-
-  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
-      dataType: DataType, nullable: Boolean): ExprCode = {
-    val javaType = ctx.javaType(dataType)
-    val value = ctx.getValue(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
-      s"""
-        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
-        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      s"$javaType ${valueVar} = $value;"
-    }).trim
-    ExprCode(code, isNullVar, valueVar)
-  }
-
-  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
-  // never requires UnsafeRow as input.
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val input = ctx.freshName("input")
-    // PhysicalRDD always just has one input
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
-
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs = ctx.freshName("scanTime")
-    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val batch = ctx.freshName("batch")
-    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
-
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
-    val idx = ctx.freshName("batchIdx")
-    ctx.addMutableState("int", idx, s"$idx = 0;")
-    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
-    }
-
-    val nextBatch = ctx.freshName("nextBatch")
-    ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatch();
-       |}
-       |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
-       |  }
-       |  $batch = null;
-       |  $nextBatch();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-}
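
For orientation, the generated code in this (now deleted) class is a two-level loop: an
outer loop that pulls the next ColumnarBatch from the input iterator and an inner loop
over that batch's row indices, with the scan time accumulated around each batch fetch.
A rough, self-contained sketch of that shape (MiniBatch is an illustrative stand-in, not
Spark's ColumnarBatch):

    // Illustration only: MiniBatch stands in for Spark's vectorized ColumnarBatch.
    final case class MiniBatch(rows: Array[Array[Any]]) {
      def numRows: Int = rows.length
    }

    object BatchScanShape {
      // Mirrors the generated loop: fetch a batch, consume its rows, fetch the next one.
      def scan(batches: Iterator[MiniBatch])(consumeRow: Array[Any] => Unit): Unit = {
        while (batches.hasNext) {
          val batch = batches.next()      // the generated "nextBatch()"
          var idx = 0                     // the generated "batchIdx"
          while (idx < batch.numRows) {
            consumeRow(batch.rows(idx))   // the generated consume(...) of the column values
            idx += 1
          }
        }
      }
    }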

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
new file mode 100644
index 0000000..dfb89fd
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  val mvPlan = try {
+    CarbonReflectionUtils.createObject(
+      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+  } catch {
+    case e: Exception =>
+      null
+  }
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+    if (mvPlan != null) {
+      mvPlan.apply(logicalPlan)
+    } else {
+      logicalPlan
+    }
+  }
+}
\ No newline at end of file
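
The analyzer above chains the stock resolution with Carbon's pre-aggregate rules and,
when the MV datamap module is on the classpath, a reflectively loaded MVAnalyzerRule.
A minimal sketch of the same load-or-fall-back pattern in plain Scala (returning Option
instead of null is a choice of the sketch, not of the patch):

    import scala.util.Try

    // Load an optional rule implementation by class name; None when the module is absent.
    def loadOptionalRule[T](className: String, args: AnyRef*): Option[T] =
      Try {
        val ctor = Class.forName(className).getConstructors.head
        ctor.newInstance(args: _*).asInstanceOf[T]
      }.toOption

    // Mirrors the mvPlan field above:
    // val mvRule = loadOptionalRule[Rule[LogicalPlan]](
    //   "org.apache.carbondata.mv.datamap.MVAnalyzerRule", sparkSession)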

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
new file mode 100644
index 0000000..e8ab84a
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the CarbonTable
+ * in the carbon catalog is not the same as the cached carbon relation's CarbonTable.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    // Not required in case of the in-memory catalog
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val structType = catalogTable.schema
+    var newStructType = structType
+    newColumns.get.foreach {cols =>
+      newStructType = newStructType
+        .add(cols.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val fields = catalogTable.schema.fields.filterNot { field =>
+      dropCols.get.exists { col =>
+        col.getColumnName.equalsIgnoreCase(field.name)
+      }
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val a = catalogTable.schema.fields.flatMap { field =>
+      columns.get.map { col =>
+        if (col.getColumnName.equalsIgnoreCase(field.name)) {
+          StructField(col.getColumnName,
+            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+        } else {
+          field
+        }
+      }
+    }
+    alterSchema(new StructType(a), catalogTable, tableIdentifier)
+  }
+
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val copy = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(copy)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the Hive client; the in-memory catalog has none, so this returns null.
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filter, instead of querying Hive along with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Updates the storage format with the new location information.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends SessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: InMemorySessionCatalog = {
+    val catalog = new InMemorySessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: ExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+  override protected lazy val resourceLoader: SessionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+    }
+  )
+  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+
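
The in-memory catalog implements schema evolution purely on the Spark side: it rebuilds
the table's StructType and hands it back through alterTable and refreshTable. A hedged
sketch of that StructType manipulation using only Spark's public types (the column names
are invented for illustration):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Start from the catalog table's current schema.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType)))

    // alterAddColumns: extend the schema, then push it back via alterTable + refreshTable.
    val withCity = schema.add(StructField("city", StringType))

    // alterDropColumns: keep every field whose name does not match a dropped column.
    val withoutName = StructType(withCity.fields.filterNot(_.name.equalsIgnoreCase("name")))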

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 2f9ad3e..a046763 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 
@@ -35,3 +37,41 @@ class CarbonOptimizer(
     super.execute(transFormedPlan)
   }
 }
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    // In case of a scalar subquery, set a flag on the relation so the optimizer rule skips the
+    // decoder plan, and optimize the whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
+            val tPlan = sub.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
+        }
+    }
+    transFormedPlan
+  }
+}
\ No newline at end of file
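
transformForScalarSubQuery walks Filter expressions and, for every scalar, EXISTS or
IN-list subquery, marks the CarbonDatasourceHadoopRelation inside it so a later rule can
skip the decoder plan. The real code mutates a flag on the relation; the self-contained
sketch below shows the same walk-and-flag idea on an invented miniature plan type:

    // Illustration only: a miniature "plan" whose relations carry a subquery flag.
    sealed trait Node
    final case class Relation(name: String, isSubquery: Boolean = false) extends Node
    final case class Subquery(child: Node) extends Node
    final case class Project(child: Node) extends Node

    // Walk the tree; once inside a Subquery, every Relation below it gets flagged.
    def flag(node: Node, inSubquery: Boolean = false): Node = node match {
      case r: Relation     => if (inSubquery) r.copy(isSubquery = true) else r
      case Subquery(child) => Subquery(flag(child, inSubquery = true))
      case Project(child)  => Project(flag(child, inSubquery))
    }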

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..a7a255e
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
+import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the CarbonTable
+ * in the carbon catalog is not the same as the cached carbon relation's CarbonTable.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog (
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+  /**
+   * Returns the carbonEnv instance.
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * returns hive client from HiveExternalCatalog
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filter, instead of querying Hive along with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Updates the storage format with the new location information.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+        new CarbonLateDecodeStrategy,
+        new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck :: HiveOnlyCheck :: Nil
+
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        new DetermineTableStats(session) +:
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+    }
+  )
+
+  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+}
\ No newline at end of file
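
Unlike the in-memory variant, the Hive-backed catalog drives renames through the Hive
client. With purely hypothetical names (db1.old_tbl renamed to db1.new_tbl, stored at
/store/db1/new_tbl), alterTableRename above issues Hive SQL equivalent to:

    val renameSql = "ALTER TABLE db1.old_tbl RENAME TO db1.new_tbl"
    val serdeSql  = "ALTER TABLE db1.new_tbl SET SERDEPROPERTIES" +
      "('tableName'='new_tbl', 'dbName'='db1', 'tablePath'='/store/db1/new_tbl')"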

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
new file mode 100644
index 0000000..ac702ea
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * This object refreshes the relation from the cache if the CarbonTable in the
+ * carbon catalog is not the same as the cached carbon relation's CarbonTable.
+ */
+object CarbonSessionUtil {
+
+  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
+
+  /**
+   * The method refreshes the cache entry
+   *
+   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
+   * @param name        tableName
+   * @param sparkSession
+   * @return
+   */
+  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    var isRelationRefreshed = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
+      ) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName
+        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
+      ) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation
+          ).asInstanceOf[CatalogTable]
+        isRelationRefreshed =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+    isRelationRefreshed
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filter, instead of querying Hive along with the filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone
+    )
+  }
+
+}
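
prunePartitionsByFilter deliberately avoids pushing the predicates to the Hive metastore:
it lists every partition once and prunes them client-side with ExternalCatalogUtils. A
tiny sketch of that trade-off with invented types:

    // Rather than asking the metastore to evaluate the filter, fetch all and prune locally.
    final case class Partition(spec: Map[String, String])

    def pruneLocally(all: Seq[Partition], predicate: Partition => Boolean): Seq[Partition] =
      all.filter(predicate)

    // e.g. pruneLocally(listAllPartitions(table), _.spec.get("region").contains("EU"))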

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index d9cf3f7..a533db0 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -18,15 +18,24 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
+import org.apache.spark.sql.types.DecimalType
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
+  extends SparkSqlAstBuilder(conf) {
 
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
@@ -46,7 +55,75 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     }
   }
 
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+
+  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
+    CarbonExplainCommand(super.visitExplain(ctx))
   }
 }
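
visitChangeColumn translates the Spark DataType of the new column into the pair of type
name and optional precision/scale that the Carbon parser expects. A small sketch of just
that mapping:

    import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType}

    // Decimals carry (precision, scale); everything else is its lower-cased type name.
    def toCarbonTypeSpec(dt: DataType): (String, Option[List[(Int, Int)]]) = dt match {
      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
      case other          => (other.typeName.toLowerCase, None)
    }

    // toCarbonTypeSpec(DecimalType(10, 2)) == ("decimal", Some(List((10, 2))))
    // toCarbonTypeSpec(IntegerType)        == ("integer", None)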

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
new file mode 100644
index 0000000..2128ffd
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes default values for dynamic parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * Initializes the dynamic parameter defaults along with their usage docs.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To set carbon task distribution.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    val BAD_RECORDS_LOGGER_ENABLE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = buildConf(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * Sets the default values for the dynamic session properties.
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}
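
Both methods follow one pattern: read a value from CarbonProperties, fall back to a
default, and surface it as a Spark session conf. A sketch of that pattern as a generic
helper (the helper itself is not part of the patch):

    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.core.util.CarbonProperties

    // Read a carbon property (with a fallback) and mirror it into the session conf.
    def mirrorProperty(spark: SparkSession, key: String, default: String): Unit = {
      val value = CarbonProperties.getInstance().getProperty(key, default)
      spark.conf.set(key, value)
    }

    // e.g. mirrorProperty(spark, CarbonCommonConstants.LOAD_SORT_SCOPE,
    //                     CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)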

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..4e22d13
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Creates a table 'using carbondata' and inserts the query result into it.
+ *
+ * @param table the catalog table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+
+      saveDataIntoTable(
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+    } else {
+      assert(table.schema.isEmpty)
+
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+      result match {
+        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          // Need to recover partitions into the metastore so our saved data is visible.
+          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        case _ =>
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[URI],
+      data: LogicalPlan,
+      mode: SaveMode,
+      tableExists: Boolean): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = if (tableExists) {
+        Some(table)
+      } else {
+        None
+      })
+
+    try {
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+        throw ex
+    }
+  }
+}
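For illustration only (not part of this patch), a sketch of how the command above would typically be driven, assuming a SparkSession, a resolved CatalogTable whose provider is carbondata, and a logical plan for the SELECT side are already in scope:

    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.hive.CreateCarbonSourceTableAsSelectCommand

    // ErrorIfExists mirrors plain CREATE TABLE ... AS SELECT: the command
    // raises AnalysisException when the target table already exists.
    def runCarbonCtas(
        spark: SparkSession,
        catalogTable: CatalogTable,
        selectPlan: LogicalPlan): Seq[Row] = {
      val cmd = CreateCarbonSourceTableAsSelectCommand(
        catalogTable, SaveMode.ErrorIfExists, selectPlan)
      cmd.run(spark)
    }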

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index ca79720..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapater {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr : Option[NamedExpression] = None ) : Alias = {
-
-      Alias(child, name)(exprId, qualifier, explicitMetadata)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation())
-  }
-
-}
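As a hedged aside (not from the patch): call sites in the carbon planning code go through this adapter so that the same code compiles against each supported Spark profile; against the Spark 2.3 profile whose copy is removed here, a typical use would look like:

    import org.apache.spark.sql.CarbonToSparkAdapater
    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}

    // Re-create an attribute while keeping its exprId and qualifier; the
    // adapter hides the AttributeReference constructor differences.
    def copyAttribute(attr: AttributeReference): AttributeReference =
      CarbonToSparkAdapater.createAttributeReference(
        attr.name, attr.dataType, attr.nullable, attr.metadata,
        attr.exprId, attr.qualifier, attr)

    // Alias an expression, letting the adapter fill in exprId and metadata.
    def aliasOf(expr: Expression, name: String): Alias =
      CarbonToSparkAdapater.createAliasRef(expr, name)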

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index 2457dd5..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression to override the deterministic property.
- */
-case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable {
-  override def nullable: Boolean = true
-
-  override def eval(input: InternalRow): Any = null
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = Seq()
-
-  override lazy val deterministic: Boolean = true
-
-  def childexp : Expression = nonDt
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
-}
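A hedged usage note (not part of the patch): the expression wraps a non-deterministic child so that rules checking the deterministic flag accept the plan, while the original child stays reachable, e.g.:

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID

    // Reported as deterministic even though the wrapped child is not.
    val wrapped = CustomDeterministicExpression(MonotonicallyIncreasingID())
    assert(wrapped.deterministic)
    // The wrapped child is still available for later unwrapping.
    val original = wrapped.childexp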

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
deleted file mode 100644
index 79fde66..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
-
-/** Physical plan node for scanning data from a batched relation. */
-case class BatchedDataSourceScanExec(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val tableIdentifier: Option[TableIdentifier],
-    @transient logicalRelation: LogicalRelation)
-  extends DataSourceScanExec with CodegenSupport {
-
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // in the case of fallback, this batched scan should never fail because of:
-    // 1) only primitive types are supported
-    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-    WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
-
-  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
-      dataType: DataType, nullable: Boolean): ExprCode = {
-    val javaType = ctx.javaType(dataType)
-    val value = ctx.getValue(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
-      s"""
-        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
-        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      s"$javaType ${valueVar} = $value;"
-    }).trim
-    ExprCode(code, isNullVar, valueVar)
-  }
-
-  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
-  // never requires UnsafeRow as input.
-  override protected def doProduce(ctx: CodegenContext): String = {
-    // PhysicalRDD always just has one input
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];")
-
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs = ctx.addMutableState("long", "scanTime", v =>  s"$v = 0;")
-
-    val columnarBatchClz = "org.apache.spark.sql.vectorized.ColumnarBatch"
-    val batch = ctx.addMutableState(columnarBatchClz, "batch", v => s"$v = null;")
-
-    val columnVectorClz = "org.apache.spark.sql.vectorized.ColumnVector"
-    val idx = ctx.addMutableState("int", "batchIdx", v => s"$v = 0;")
-    val (colVars,columnAssigns) = output.indices.map {  i =>
-      val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
-        (name, s"$name = ($columnVectorClz) $batch.column($i);")
-    }.unzip
-
-    val nextBatch = ctx.freshName("nextBatch")
-    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colvar) =>
-      genCodeColumnVector(ctx, colvar, rowidx, attr.dataType, attr.nullable)
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatchFuncName();
-       |}
-       |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
-       |  }
-       |  $batch = null;
-       |  $nextBatchFuncName();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-}
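For context (illustrative only, assuming the Spark 2.3 profile this file targeted): the curried WholeStageCodegenExec constructor used in doExecute() above can be wrapped as:

    import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}

    // Spark 2.3 signature: WholeStageCodegenExec(child)(codegenStageId);
    // codegenStageId = 0 matches what the deleted scan node used.
    def wrapWithCodegen(child: SparkPlan): SparkPlan =
      WholeStageCodegenExec(child)(codegenStageId = 0)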

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
deleted file mode 100644
index f1b632b..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.internal.SQLConf
-
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
-    super.execute(transFormedPlan)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
deleted file mode 100644
index 2bba2ae..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat(0))
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
-        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
-        Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
-      helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
-  }
-}
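Illustrative only (not part of the patch): DDL that takes the carbon branch of visitCreateHiveTable above versus DDL that falls through to the stock Spark builder, assuming a SparkSession named spark whose parser uses CarbonSqlAstBuilder:

    // Routed to helper.createCarbonTable because the file storage resolves
    // to 'carbondata'.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS carbon_demo (id INT, name STRING)
        |STORED BY 'carbondata'""".stripMargin)

    // Falls through to super.visitCreateHiveTable.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS hive_demo (id INT, name STRING)
        |STORED AS parquet""".stripMargin)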

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
deleted file mode 100644
index f357e41..0000000
--- a/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.stream;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.statusmanager.FileFormat;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamRecordReaderTest extends TestCase {
-
-  private TaskAttemptID taskAttemptId;
-  private TaskAttemptContext taskAttemptContext;
-  private Configuration hadoopConf;
-  private AbsoluteTableIdentifier identifier;
-  private String tablePath;
-
-
-  @Override protected void setUp() throws Exception {
-    tablePath = new File("target/stream_input").getCanonicalPath();
-    String dbName = "default";
-    String tableName = "stream_table_input";
-    identifier = AbsoluteTableIdentifier.from(
-        tablePath,
-        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
-    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-    taskAttemptId = new TaskAttemptID(taskId, 0);
-
-    hadoopConf = new Configuration();
-    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
-  }
-
-  private InputSplit buildInputSplit() throws IOException {
-    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
-    List<CarbonInputSplit> splitList = new ArrayList<>();
-    splitList.add(carbonInputSplit);
-    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
-        FileFormat.ROW_V1);
-  }
-
-  @Test public void testCreateRecordReader() {
-    try {
-      InputSplit inputSplit = buildInputSplit();
-      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
-      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
-      Assert.assertNotNull("Failed to create record reader", recordReader);
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue(e.getMessage(), false);
-    }
-  }
-
-  @Override protected void tearDown() throws Exception {
-    super.tearDown();
-    if (tablePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(tablePath));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
index 466475b..cdcf018 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
@@ -80,7 +80,7 @@ class BooleanDataTypesFilterTest extends QueryTest with BeforeAndAfterEach with
     checkAnswer(sql("select count(*) from carbon_table where booleanField = true"),
       Row(4))
 
-    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
+    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
       checkAnswer(sql("select count(*) from carbon_table where booleanField = 'true'"),
         Row(0))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
index 4050fd8..c220f1c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -283,7 +283,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
+    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 
@@ -373,7 +373,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
+    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 
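The flipped guards in these two test files share one intent; a small helper (hedged: the package of Spark2TestQueryExecutor is assumed from the test module's existing imports) would read:

    import org.apache.spark.sql.test.Spark2TestQueryExecutor

    // The literal-string comparison against a boolean column is only asserted
    // when the running Spark is not a 2.2.x release.
    def notOnSpark22: Boolean =
      !Spark2TestQueryExecutor.spark.version.startsWith("2.2")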

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index d94d56c..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.carbondata.bucketing
 
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -127,11 +127,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: Exchange if (s.getClass.getName.equals
-      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
-        s.getClass.getName.equals
-        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
-      => shuffleExists = true
+      case s: ShuffleExchange => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
   }
@@ -149,11 +145,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: Exchange if (s.getClass.getName.equals
-      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
-        s.getClass.getName.equals
-        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
-      => shuffleExists = true
+      case s: ShuffleExchange => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
   }
@@ -178,11 +170,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: Exchange if (s.getClass.getName.equals
-      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
-        s.getClass.getName.equals
-        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
-      => shuffleExists = true
+      case s: ShuffleExchange => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
     sql("DROP TABLE bucketed_parquet_table")
@@ -207,11 +195,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: Exchange if (s.getClass.getName.equals
-      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
-        s.getClass.getName.equals
-        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
-      => shuffleExists = true
+      case s: ShuffleExchange => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
     sql("DROP TABLE parquet_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index 089dfd2..66cf675 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.carbondata.query
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.scalatest.BeforeAndAfterAll
 

