carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [4/6] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, compatibility issues
Date Tue, 10 Jul 2018 15:01:13 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
new file mode 100644
index 0000000..3c3efaa
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/**
+ * Physical plan node for scanning data from a batched relation.
+ *
+ * The elements of `rdd` are expected to be columnar batches: the generated code casts each
+ * element to `org.apache.spark.sql.execution.vectorized.ColumnarBatch`, even though the
+ * static element type is `InternalRow`. Execution always goes through whole-stage codegen.
+ *
+ * @param output                   attributes produced by the scan, one per batch column
+ * @param rdd                      input RDD whose elements are columnar batches
+ * @param relation                 the relation being scanned
+ * @param outputPartitioning       partitioning reported for this node's output
+ * @param metadata                 key/value pairs shown, abbreviated, by `simpleString`
+ * @param metastoreTableIdentifier identifier of the scanned table, if any
+ * @param logicalRelation          the logical relation associated with this scan
+ */
+case class BatchedDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier],
+    @transient logicalRelation: LogicalRelation)
+  extends DataSourceScanExec with CodegenSupport {
+
+  // numOutputRows grows by each fetched batch's row count; scanTime is accumulated in
+  // nanoseconds by the generated code and reported in milliseconds.
+  override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  /**
+   * Executes the scan through a [[WholeStageCodegenExec]] wrapper; there is no separate
+   * interpreted path.
+   */
+  protected override def doExecute(): RDD[InternalRow] = {
+    // In the case of fallback, this batched scan should never fail because:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
+  }
+
+  /**
+   * One-line plan description: `Batched<nodeName>[<output>]` followed by the sorted metadata
+   * entries, each value abbreviated to at most 100 characters.
+   */
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  /** The single input RDD; the generated code reads its iterator as `inputs[0]`. */
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  /**
+   * Generates Java code that reads the value at `ordinal` from a column vector.
+   *
+   * @param ctx       codegen context, used for fresh names and Java type mapping
+   * @param columnVar name of the Java variable holding the column vector
+   * @param ordinal   Java expression giving the row index within the current batch
+   * @param dataType  Spark type of the column
+   * @param nullable  if false, no null check is emitted and the null flag is "false"
+   * @return the generated code together with its null-flag and value variable names
+   */
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+      dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  //
+  // The current batch and row index live in mutable state, so when `shouldStop()` makes the
+  // generated loop return early, the next invocation continues from the same row.
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
+    ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = $batch.column($i);"
+    }
+
+    // Fetches the next batch if the input has one: counts its rows, resets the row index,
+    // rebinds the column vectors and adds the elapsed time to the scan-time accumulator.
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    // A fresh name keeps this Java local from clashing with variables declared by the
+    // parent operators' code that `consume` inlines into the same loop body.
+    val numRows = ctx.freshName("numRows")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int $numRows = $batch.numRows();
+       |  while ($idx < $numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
deleted file mode 100644
index dfb89fd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.CarbonReflectionUtils
-
-class CarbonAnalyzer(catalog: SessionCatalog, // runs `analyzer`, then Carbon pre-aggregate rules, then the optional MV rule
-    conf: SQLConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
-  val mvPlan = try { // MV analyzer rule created via reflection; null if creation throws an Exception
-    CarbonReflectionUtils.createObject(
-      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
-      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
-  } catch {
-    case e: Exception => // NOTE(review): the failure is swallowed without logging the cause
-      null
-  }
-
-  override def execute(plan: LogicalPlan): LogicalPlan = { // rules are applied in the order below
-    var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
-    logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
-    if (mvPlan != null) {
-      mvPlan.apply(logicalPlan)
-    } else {
-      logicalPlan
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
deleted file mode 100644
index e8ab84a..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * Session catalog (not Hive-backed) for Carbon; refreshes a relation from cache if the carbon
- * table in the carbon catalog is not the same as the cached carbon relation's carbon table.
- *
- * @param externalCatalog        passed to [[SessionCatalog]]
- * @param globalTempViewManager  passed to [[SessionCatalog]]
- * @param sparkSession           session used for catalog updates and CarbonEnv
- * @param functionResourceLoader passed to [[SessionCatalog]]
- * @param functionRegistry       passed to [[SessionCatalog]]
- * @param conf                   passed to [[SessionCatalog]]
- * @param hadoopConf             passed to [[SessionCatalog]] (as is `parser`)
- */
-class InMemorySessionCatalog(
-    externalCatalog: ExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends SessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  override def alterTableRename(oldTableIdentifier: TableIdentifier, // delegates to renameTable; newTablePath is unused
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    // Not required for the in-memory catalog, so this is intentionally a no-op
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val structType = catalogTable.schema
-    var newStructType = structType
-    newColumns.get.foreach {cols => // NOTE(review): each pass adds to structType, not newStructType, so only the last new column is kept
-      newStructType = structType
-        .add(cols.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
-    }
-    alterSchema(newStructType, catalogTable, tableIdentifier)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val fields = catalogTable.schema.fields.filterNot { field => // drop fields whose name matches a dropped column, ignoring case
-      dropCols.get.exists { col =>
-        col.getColumnName.equalsIgnoreCase(field.name)
-      }
-    }
-    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
-  }
-
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val a = catalogTable.schema.fields.flatMap { field => // NOTE(review): yields one entry per (field, column) pair, so fields repeat when more than one column is given
-      columns.get.map { col =>
-        if (col.getColumnName.equalsIgnoreCase(field.name)) {
-          StructField(col.getColumnName,
-            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
-        } else {
-          field
-        }
-      }
-    }
-    alterSchema(new StructType(a), catalogTable, tableIdentifier)
-  }
-
-  private def alterSchema(structType: StructType, // applies the new schema via the session catalog, then refreshes the table
-      catalogTable: CatalogTable,
-      tableIdentifier: TableIdentifier): Unit = {
-    val copy = catalogTable.copy(schema = structType)
-    sparkSession.sessionState.catalog.alterTable(copy)
-    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-  }
-
-  lazy val carbonEnv = { // created and initialised on first access
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-  def getThriftTableInfo(tablePath: String): TableInfo = { // reads the schema file located under tablePath
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    CarbonUtil.readSchemaFile(tableMetadataFile)
-  }
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = { // looks up again if refreshRelation reports a refresh
-    val rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * No Hive client is available from this in-memory catalog; always returns null.
-   *
-   * @return null
-   */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    null
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception => // NOTE(review): also catches failures of the call above, which is then retried with the unmodified parts
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is an alternate way of getting partition information. It first fetches all partitions
-   * from hive and then applies the filters, instead of querying hive with the filters.
-   * @param partitionFilters filters used to prune the partitions
-   * @param sparkSession     current session
-   * @param identifier       table whose partitions are pruned
-   * @return result of CarbonSessionUtil.prunePartitionsByFilter
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Returns a copy of `storage` whose location URI is `path`; newTableName and dbName are unused.
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
-class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession, // non-Hive session state with the Carbon parser, strategies and rules
-    parentState: Option[SessionState] = None)
-  extends SessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies = // Carbon planner strategies, registered as experimental extensions
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule,
-    new CarbonLateDecodeRule)
-
-  /**
-   * Internal catalog for managing table and database states; copies parentState's state if given.
-   */
-  override protected lazy val catalog: InMemorySessionCatalog = {
-    val catalog = new InMemorySessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: ExternalCatalog = // NOTE(review): the cast looks redundant; confirm the declared type of sharedState.externalCatalog
-    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
-
-  override protected lazy val resourceLoader: SessionResourceLoader = {
-    new SessionResourceLoader(session)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession, // CarbonAnalyzer wrapping a standard Analyzer with the rules below
-    new Analyzer(catalog, conf) {
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] = // NOTE(review): customCheckRules are not appended, unlike the other rule lists; confirm intended
-        PreWriteCheck :: HiveOnlyCheck :: Nil
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        customPostHocResolutionRules
-    }
-  )
-  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index a046763..2f9ad3e 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 
@@ -37,41 +35,3 @@ class CarbonOptimizer(
     super.execute(transFormedPlan)
   }
 }
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
-    // For subqueries in filters (scalar, EXISTS, IN), set a flag on the Carbon relations so the
-    // optimizer rule skips the decoder plan for them, and optimize the whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform { // NOTE(review): this relation-marking transform is repeated in all three cases
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true // mutates the relation object in place
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case e: Exists =>
-            val tPlan = e.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            Exists(tPlan, e.children.map(_.canonicalized), e.exprId) // NOTE(review): unlike the other cases, children are canonicalized here; confirm intended
-
-          case In(value, Seq(l@ListQuery(sub, _, exprId))) => // only an IN whose list is a single ListQuery is handled
-            val tPlan = sub.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
-        }
-    }
-    transFormedPlan
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index a7a255e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * Hive-backed session catalog for Carbon; refreshes a relation from cache if the carbon table in
- * the carbon catalog is not the same as the cached carbon relation's carbon table.
- *
- * @param externalCatalog        Hive external catalog passed to [[HiveSessionCatalog]]
- * @param globalTempViewManager  passed to [[HiveSessionCatalog]]
- * @param sparkSession           session used for CarbonEnv, the Hive client and table lookups
- * @param functionResourceLoader passed to [[HiveSessionCatalog]]
- * @param functionRegistry       passed to [[HiveSessionCatalog]]
- * @param conf                   passed to [[HiveSessionCatalog]]
- * @param hadoopConf             passed to [[HiveSessionCatalog]] (as is `parser`)
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
-    new HiveMetastoreCatalog(sparkSession),
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = { // created and initialised on first access
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-  /**
-   * Returns the lazily created CarbonEnv instance.
-   * @return the CarbonEnv for this catalog's session
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = { // looks up again if refreshRelation reports a refresh
-    val rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * Returns the Hive client of the shared HiveExternalCatalog.
-   *
-   * @return the shared external catalog's HiveClient
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog // NOTE(review): throws ClassCastException unless sparkSession is a CarbonSession
-      .asInstanceOf[HiveExternalCatalog].client
-  }
-
-  def alterTableRename(oldTableIdentifier: TableIdentifier, // renames via HiveQL within the old database, then updates the SERDE properties
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " + // NOTE(review): identifiers are interpolated unquoted; database.get throws if unset
-      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier, // sets schemaParts verbatim as TBLPROPERTIES; cols is unused
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    getClient()
-      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier, // delegates to alterTable
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier, // delegates to alterTable
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier, // delegates to alterTable
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception => // NOTE(review): also catches failures of the call above, which is then retried with the unmodified parts
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is an alternate way of getting partition information. It first fetches all partitions
-   * from hive and then applies the filters, instead of querying hive with the filters.
-   * @param partitionFilters filters used to prune the partitions
-   * @param sparkSession     current session
-   * @param identifier       table whose partitions are pruned
-   * @return result of CarbonSessionUtil.prunePartitionsByFilter
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Returns a copy of `storage` whose location URI is `path`; newTableName and dbName are unused.
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
-/**
- * Hive-backed session state builder that installs the Carbon SQL parser, strategies and rules.
- *
- * @param sparkSession the session whose state is being built
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends HiveSessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies = // Carbon planner strategies, registered as experimental extensions
-    Seq(new StreamingTableStrategy(sparkSession),
-        new CarbonLateDecodeStrategy,
-        new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule,
-    new CarbonLateDecodeRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  /**
-   * Created lazily; copies state from parentState when one is given.
-   */
-  override protected lazy val catalog: CarbonHiveSessionCatalog = {
-    val catalog = new CarbonHiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: HiveExternalCatalog = // NOTE(review): throws ClassCastException unless the shared external catalog is a HiveExternalCatalog
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession, // CarbonAnalyzer wrapping a standard Analyzer with the rules below
-    new Analyzer(catalog, conf) {
-
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(session) +:
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] = // NOTE(review): customCheckRules are not appended, unlike the other rule lists; confirm intended
-      PreWriteCheck :: HiveOnlyCheck :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(session) +:
-        RelationConversions(conf, catalog) +:
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        HiveAnalysis +:
-        customPostHocResolutionRules
-    }
-  )
-
-  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
deleted file mode 100644
index ac702ea..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.util.CarbonReflectionUtils
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * This class refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table.
- */
-object CarbonSessionUtil {
-
-  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
-
-  /**
-   * The method refreshes the cache entry
-   *
-   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
-   * @param name        tableName
-   * @param sparkSession
-   * @return
-   */
-  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    var isRelationRefreshed = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
-      ) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case SubqueryAlias(_, relation) if
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-      relation.getClass.getName
-        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-      relation.getClass.getName.equals(
-        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
-      ) =>
-        val catalogTable =
-          CarbonReflectionUtils.getFieldOfCatalogTable(
-            "tableMeta",
-            relation
-          ).asInstanceOf[CatalogTable]
-        isRelationRefreshed =
-          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-      case _ =>
-    }
-    isRelationRefreshed
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   *
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
-      allPartitions,
-      partitionFilters,
-      sparkSession.sessionState.conf.sessionLocalTimeZone
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index a533db0..d9cf3f7 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -18,24 +18,15 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
 
 class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
+  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
 
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
@@ -55,75 +46,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     }
   }
 
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-
-  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
-    CarbonExplainCommand(super.visitExplain(ctx))
+    visitAddTableColumns(parser,ctx)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = buildConf(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index 4e22d13..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
-import org.apache.spark.sql.sources.BaseRelation
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- *
- * @param table the Catalog Table
- * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- *
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
-    table: CatalogTable,
-    mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
-
-    val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
-
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      assert(mode != SaveMode.Overwrite,
-        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
-
-      if (mode == SaveMode.ErrorIfExists) {
-        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
-      }
-      if (mode == SaveMode.Ignore) {
-        // Since the table already exists and the save mode is Ignore, we will just return.
-        return Seq.empty
-      }
-
-      saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
-    } else {
-      assert(table.schema.isEmpty)
-
-      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-        Some(sessionState.catalog.defaultTablePath(table.identifier))
-      } else {
-        table.storage.locationUri
-      }
-      val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
-
-      result match {
-        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-          // Need to recover partitions into the metastore so our saved data is visible.
-          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
-        case _ =>
-      }
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def saveDataIntoTable(
-      session: SparkSession,
-      table: CatalogTable,
-      tableLocation: Option[URI],
-      data: LogicalPlan,
-      mode: SaveMode,
-      tableExists: Boolean): BaseRelation = {
-    // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
-    val dataSource = DataSource(
-      session,
-      className = table.provider.get,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = if (tableExists) {
-        Some(table)
-      } else {
-        None
-      })
-
-    try {
-      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
-        throw ex
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..ca79720
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr : Option[NamedExpression] = None ) : Alias = {
+
+      Alias(child, name)(exprId, qualifier, explicitMetadata)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation())
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..2457dd5
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property .
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable {
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+
+  override lazy val deterministic: Boolean = true
+
+  def childexp : Expression = nonDt
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
new file mode 100644
index 0000000..79fde66
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/**
+ * Physical plan node for scanning data from a batched relation.
+ *
+ * Output rows are produced by whole-stage code generation: the generated code takes
+ * `ColumnarBatch` objects from the single input iterator and reads each output column
+ * through `ColumnVector` getters, so no `UnsafeRow` conversion is performed.
+ *
+ * @param output             attributes produced by this scan; attribute i is read from
+ *                           column i of each batch
+ * @param rdd                input data; each element is cast to `ColumnarBatch` by the
+ *                           generated code, despite the declared `InternalRow` element type
+ * @param relation           the scanned relation (`@transient`, so not serialized)
+ * @param outputPartitioning partitioning reported for this node's output
+ * @param metadata           key/value pairs rendered by `simpleString`
+ * @param tableIdentifier    identifier of the scanned table, if any
+ * @param logicalRelation    logical relation associated with this scan (`@transient`)
+ */
+case class BatchedDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val tableIdentifier: Option[TableIdentifier],
+    @transient logicalRelation: LogicalRelation)
+  extends DataSourceScanExec with CodegenSupport {
+
+  override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  /** Non-codegen entry point: runs this scan inside its own whole-stage codegen stage. */
+  protected override def doExecute(): RDD[InternalRow] = {
+    // in the case of fallback, this batched scan should never fail because of:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this)(codegenStageId = 0).execute()
+  }
+
+  /** Plan-string form: `Batched<nodeName>[output]` followed by abbreviated metadata entries. */
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  /** The single input RDD; the generated code reads its iterator as `inputs[0]`. */
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  /**
+   * Generates Java code that reads one value of `dataType` from a column vector.
+   *
+   * @param columnVar name of the generated `ColumnVector` variable
+   * @param ordinal   name of the generated row-index variable
+   * @param dataType  Catalyst type of the column
+   * @param nullable  whether to emit an `isNullAt` check; when false the null flag is the
+   *                  literal `false`
+   * @return the code fragment plus the names of its null-flag and value variables
+   */
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+      dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  /**
+   * Emits the batch-iteration loop for whole-stage codegen.
+   *
+   * Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+   * never requires UnsafeRow as input.
+   */
+  override protected def doProduce(ctx: CodegenContext): String = {
+    // PhysicalRDD always just has one input
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.addMutableState("long", "scanTime", v => s"$v = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.vectorized.ColumnarBatch"
+    val batch = ctx.addMutableState(columnarBatchClz, "batch", v => s"$v = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.vectorized.ColumnVector"
+    val idx = ctx.addMutableState("int", "batchIdx", v => s"$v = 0;")
+    val (colVars, columnAssigns) = output.indices.map { i =>
+      val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
+      (name, s"$name = ($columnVectorClz) $batch.column($i);")
+    }.unzip
+
+    // Loads the next batch (if any), resets the row cursor and caches the column vectors;
+    // time spent here is accumulated into scanTimeTotalNs (nanoseconds).
+    val nextBatch = ctx.freshName("nextBatch")
+    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colvar) =>
+      genCodeColumnVector(ctx, colvar, rowidx, attr.dataType, attr.nullable)
+    }
+    // Use a fresh name so this generated local cannot clash with variables emitted by
+    // consume() or by other operators fused into the same generated method.
+    val numRows = ctx.freshName("numRows")
+    // $idx is advanced before consume() runs and $batch is kept in mutable state, so
+    // returning on shouldStop() resumes at the next row on re-entry. The scan-time
+    // metric is flushed in milliseconds.
+    s"""
+       |if ($batch == null) {
+       |  $nextBatchFuncName();
+       |}
+       |while ($batch != null) {
+       |  int $numRows = $batch.numRows();
+       |  while ($idx < $numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatchFuncName();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
new file mode 100644
index 0000000..f1b632b
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * Spark optimizer used by Carbon sessions.
+ *
+ * Each incoming plan is first rewritten by `CarbonOptimizerUtil.transformForScalarSubQuery`
+ * and then handed to the regular [[SparkOptimizer]].
+ *
+ * @param catalog             session catalog passed to [[SparkOptimizer]]
+ * @param conf                SQL configuration; accepted but not referenced in this class
+ * @param experimentalMethods experimental optimizer hooks passed to [[SparkOptimizer]]
+ */
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, experimentalMethods) {
+
+  /** Applies the Carbon scalar-subquery rewrite, then delegates to `SparkOptimizer.execute`. */
+  override def execute(plan: LogicalPlan): LogicalPlan =
+    super.execute(CarbonOptimizerUtil.transformForScalarSubQuery(plan))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
new file mode 100644
index 0000000..2bba2ae
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
+
+/**
+ * Spark SQL AST builder that sends Carbon `CREATE TABLE` statements and
+ * `ALTER TABLE ... ADD COLUMNS` to Carbon-specific handling.
+ *
+ * @param conf         SQL configuration, passed to [[SparkSqlAstBuilder]] and the helper
+ * @param parser       Carbon SQL parser, used by the helper and for ADD COLUMNS
+ * @param sparkSession session passed to the helper
+ */
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  /**
+   * Builds a Carbon table when the declared file format is one of the Carbon storage
+   * names (compared case-insensitively); otherwise defers to the parent implementation.
+   */
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat(0))
+    // Storage names recognised as Carbon. Only `carbondata` is accepted both quoted and
+    // unquoted; the other two are matched only in their quoted form.
+    val carbonStorageNames =
+      Seq("'carbondata'", "carbondata", "'carbonfile'", "'org.apache.carbondata.format'")
+    val isCarbonStorage = carbonStorageNames.exists(name => fileStorage.equalsIgnoreCase(name))
+
+    if (!isCarbonStorage) {
+      super.visitCreateHiveTable(ctx)
+    } else {
+      val createTableTuple = (
+        ctx.createTableHeader,
+        ctx.skewSpec(0),
+        ctx.bucketSpec(0),
+        ctx.partitionColumns,
+        ctx.columns,
+        ctx.tablePropertyList(0),
+        ctx.locationSpec(0),
+        Option(ctx.STRING(0)).map(string),
+        ctx.AS,
+        ctx.query,
+        fileStorage)
+      helper.createCarbonTable(createTableTuple)
+    }
+  }
+
+  /**
+   * Delegates to the two-argument `visitAddTableColumns` overload that also takes the
+   * Carbon parser (presumably provided by `SqlAstBuilderHelper` — confirm).
+   */
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+    visitAddTableColumns(parser, ctx)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
new file mode 100644
index 0000000..f357e41
--- /dev/null
+++ b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.streaming.CarbonStreamInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link CarbonStreamInputFormat} can create a record reader for a stream
+ * ({@code ROW_V1}) multi-block split.
+ */
+public class CarbonStreamRecordReaderTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  // NOTE(review): assigned in setUp() but not read by the tests in this class.
+  private AbsoluteTableIdentifier identifier;
+  private String tablePath;
+
+
+  /** Builds a table identifier under target/stream_input and a MAP task attempt context. */
+  @Override protected void setUp() throws Exception {
+    tablePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = AbsoluteTableIdentifier.from(
+        tablePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  /** Wraps one default-constructed CarbonInputSplit in a ROW_V1 split located on localhost. */
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
+        FileFormat.ROW_V1);
+  }
+
+  /**
+   * createRecordReader must return a non-null reader. Exceptions are deliberately left to
+   * propagate so the test report keeps the full stack trace, not just the message.
+   */
+  @Test public void testCreateRecordReader() throws Exception {
+    InputSplit inputSplit = buildInputSplit();
+    CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+    RecordReader<?, ?> recordReader =
+        inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+    Assert.assertNotNull("Failed to create record reader", recordReader);
+  }
+
+  /** Deletes the table directory used by the test, if setUp() got far enough to set it. */
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
index cdcf018..466475b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
@@ -80,7 +80,7 @@ class BooleanDataTypesFilterTest extends QueryTest with BeforeAndAfterEach with
     checkAnswer(sql("select count(*) from carbon_table where booleanField = true"),
       Row(4))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from carbon_table where booleanField = 'true'"),
         Row(0))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
index c220f1c..4050fd8 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -283,7 +283,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 
@@ -373,7 +373,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 65a006b..d94d56c 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.carbondata.bucketing
 
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -127,7 +127,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
   }
@@ -145,7 +149,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
   }
@@ -170,7 +178,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
     sql("DROP TABLE bucketed_parquet_table")
@@ -195,7 +207,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
     sql("DROP TABLE parquet_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index 66cf675..089dfd2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.carbondata.query
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.scalatest.BeforeAndAfterAll
 


Mime
View raw message