carbondata-commits mailing list archives

From gvram...@apache.org
Subject [5/6] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, compatibility issues
Date Tue, 10 Jul 2018 15:01:14 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..74c0f97
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,130 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+
+      saveDataIntoTable(
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+    } else {
+      assert(table.schema.isEmpty)
+
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+      result match {
+        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          // Need to recover partitions into the metastore so our saved data is visible.
+          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        case _ =>
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[URI],
+      data: LogicalPlan,
+      mode: SaveMode,
+      tableExists: Boolean): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = if (tableExists) {
+        Some(table)
+      } else {
+        None
+      })
+
+    try {
+      val physicalPlan = session.sessionState.executePlan(data).executedPlan
+      CarbonReflectionUtils.invokewriteAndReadMethod(dataSource,
+        Dataset.ofRows(session, query),
+        data,
+        session,
+        mode,
+        query,
+        physicalPlan)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+        throw ex
+    }
+  }
+}

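For context, the command above is the execution-side backing of a CTAS statement against a carbon table. A hypothetical example of the kind of statement that exercises it, assuming a session `spark` created through CarbonSession (table and column names are illustrative), e.g. in spark-shell:

// Creates the target table with the carbondata provider and writes the query result into it
// via CreateCarbonSourceTableAsSelectCommand.
spark.sql(
  """
    |CREATE TABLE sales_summary
    |USING carbondata
    |AS SELECT country, sum(amount) AS total FROM src GROUP BY country
  """.stripMargin)
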
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
new file mode 100644
index 0000000..8f1477c
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.DecimalType
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
+
+
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+
+  def visitAddTableColumns(parser: CarbonSpark2SqlParser,
+      ctx: AddTableColumnsContext): LogicalPlan = {
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+
+  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
+    CarbonExplainCommand(super.visitExplain(ctx))
+  }
+}

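The visitChangeColumn override above only rewrites a column's data type and insists that the old and new column names match. A hypothetical usage sketch, assuming a session `spark` with the Carbon SQL parser installed (table and column names are illustrative), e.g. in spark-shell:

// Accepted: the column keeps its name and only the data type changes.
spark.sql("ALTER TABLE sales CHANGE price price DECIMAL(18,2)")

// Rejected by the check above with MalformedCarbonCommandException: the column names differ.
// spark.sql("ALTER TABLE sales CHANGE price cost DECIMAL(18,2)")
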
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 6f038eb..41fc013 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,7 +20,6 @@ import java.lang.Long
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.TaskMetricsMap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d433470..450ead1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types.{StringType, TimestampType}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 case class CarbonDictionaryCatalystDecoder(
@@ -125,22 +120,40 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
 object CountStarPlan {
   type ReturnType = (mutable.MutableList[Attribute], LogicalPlan)
 
-  /**
-   * It fill count star query attribute.
-   */
-  private def fillCountStarAttribute(
-      expr: Expression,
-      outputColumns: mutable.MutableList[Attribute]) {
-    expr match {
-      case par@Alias(_, _) =>
-        val head = par.children.head.children.head
-        head match {
-          case count: Count if count.children.head.isInstanceOf[Literal] =>
-            outputColumns += par.toAttribute
-          case _ =>
-        }
-    }
-  }
+ /**
+  * It fills the count-star query attribute.
+  * 2.2.1 plan:
+  * Aggregate [count(1) AS count(1)#30L]
+  * +- Project
+  *
+  * 2.3.0 plan:
+  * Aggregate [cast(count(1) as string) AS count(1)#29]
+  * +- Project
+  */
+ private def fillCountStarAttribute(
+     expr: Expression,
+     outputColumns: mutable.MutableList[Attribute]) {
+   expr match {
+     case par@Alias(cast: Cast, _) =>
+       if (cast.child.isInstanceOf[AggregateExpression]) {
+         val head = cast.child.children.head
+         head match {
+           case count: Count if count.children.head.isInstanceOf[Literal] =>
+             outputColumns += par.toAttribute
+           case _ =>
+         }
+       }
+     case par@Alias(child, _) =>
+       if (child.isInstanceOf[AggregateExpression]) {
+         val head = child.children.head
+         head match {
+           case count: Count if count.children.head.isInstanceOf[Literal] =>
+             outputColumns += par.toAttribute
+           case _ =>
+         }
+       }
+   }
+ }
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = {
     plan match {

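The rewritten matcher reflects the plan-shape difference described in the comment: Spark 2.2 aliases count(1) directly, while Spark 2.3 can wrap it in a Cast, so both Alias(Cast(agg), _) and Alias(agg, _) have to be recognised. To inspect the shape on a given build, assuming a session `spark` (the plans quoted in the comment come from the patch author; the exact output varies):

// Prints the aggregate node, e.g. "Aggregate [count(1) AS count(1)#30L]" on 2.2.x
// or "Aggregate [cast(count(1) as string) AS count(1)#29]" on 2.3.x per the comment above.
println(spark.sql("SELECT count(*) FROM src").queryExecution.optimizedPlan)
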
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 6622246..e769cca 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -60,7 +62,13 @@ case class CarbonCountStar(
             carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
       carbonTable)
-    val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
+    val valueRaw =
+      attributesRaw.head.dataType match {
+        case StringType => Seq(UTF8String.fromString(Long.box(rowCount).toString)).toArray
+          .asInstanceOf[Array[Any]]
+        case _ => Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]
+      }
+    val value = new GenericInternalRow(valueRaw)
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
     val row = if (outUnsafeRows) unsafeProjection(value) else value
     sparkContext.parallelize(Seq(row))

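The new StringType branch follows from the same 2.3 plan shape: when the output attribute of the count-star node is a string (because of the cast in the plan), the InternalRow has to carry a UTF8String rather than a boxed Long. A minimal standalone illustration of that representation rule (e.g. in spark-shell):

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

val rowCount: Long = 42L
// For a StringType column, Spark's internal row format expects UTF8String;
// a boxed java.lang.Long in that slot would break the later UnsafeProjection step.
val row = new GenericInternalRow(Array[Any](UTF8String.fromString(rowCount.toString)))
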
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7fdba89..ee19503 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -359,12 +359,7 @@ object CarbonSession {
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.
-        sparkContext.addSparkListener(new SparkListener {
-          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-            SparkSession.setDefaultSession(null)
-            SparkSession.sqlListener.set(null)
-          }
-        })
+        CarbonToSparkAdapater.addSparkListener(sparkContext)
         session.streams.addListener(new CarbonStreamingQueryListener(session))
       }
 

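The inline listener moves behind the adapter because SparkSession.sqlListener no longer exists in Spark 2.3. A sketch of what the 2.2-side CarbonToSparkAdapater.addSparkListener could look like, reconstructed from the removed lines above (the real adapter source is not part of this excerpt):

package org.apache.spark.sql // same package as CarbonSession, so package-private members are reachable

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

object CarbonToSparkAdapater {
  def addSparkListener(sparkContext: SparkContext): Unit = {
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        SparkSession.setDefaultSession(null)
        SparkSession.sqlListener.set(null) // exists in 2.2.x only; a 2.3 adapter would drop this line
      }
    })
  }
}
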
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index 6312746..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression to override the deterministic property .
- */
-case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
-  override def nullable: Boolean = true
-
-  override def eval(input: InternalRow): Any = null
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = Seq()
-
-  override def deterministic: Boolean = true
-
-  def childexp : Expression = nonDt
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
deleted file mode 100644
index 3c3efaa..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
-
-/** Physical plan node for scanning data from a batched relation. */
-case class BatchedDataSourceScanExec(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier],
-    @transient logicalRelation: LogicalRelation)
-  extends DataSourceScanExec with CodegenSupport {
-
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // in the case of fallback, this batched scan should never fail because of:
-    // 1) only primitive types are supported
-    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-    WholeStageCodegenExec(this).execute()
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
-
-  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
-      dataType: DataType, nullable: Boolean): ExprCode = {
-    val javaType = ctx.javaType(dataType)
-    val value = ctx.getValue(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
-      s"""
-        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
-        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      s"$javaType ${valueVar} = $value;"
-    }).trim
-    ExprCode(code, isNullVar, valueVar)
-  }
-
-  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
-  // never requires UnsafeRow as input.
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val input = ctx.freshName("input")
-    // PhysicalRDD always just has one input
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
-
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs = ctx.freshName("scanTime")
-    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val batch = ctx.freshName("batch")
-    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
-
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
-    val idx = ctx.freshName("batchIdx")
-    ctx.addMutableState("int", idx, s"$idx = 0;")
-    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
-    }
-
-    val nextBatch = ctx.freshName("nextBatch")
-    ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatch();
-       |}
-       |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
-       |  }
-       |  $batch = null;
-       |  $nextBatch();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 38bdbcf..526e330 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -829,17 +829,21 @@ case class CarbonLoadDataCommand(
       // datatype is always int
       val column = table.getColumnByName(table.getTableName, attr.name)
       if (column.hasEncoding(Encoding.DICTIONARY)) {
-        AttributeReference(
-          attr.name,
+        CarbonToSparkAdapater.createAttributeReference(attr.name,
           IntegerType,
           attr.nullable,
-          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          attr.qualifier,
+          attr)
       } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
-        AttributeReference(
-          attr.name,
+        CarbonToSparkAdapater.createAttributeReference(attr.name,
           LongType,
           attr.nullable,
-          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          attr.qualifier,
+          attr)
       } else {
         attr
       }
@@ -1037,7 +1041,8 @@ case class CarbonLoadDataCommand(
 
     CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
       hdfsRelation.schema.toAttributes,
-      Some(catalogTable))
+      Some(catalogTable),
+      false)
   }
 
 

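The AttributeReference(...)(...) call sites move behind CarbonToSparkAdapater.createAttributeReference because the attribute's secondary parameter list changed between Spark 2.2 and 2.3: the isGenerated flag the old call sites passed is gone in 2.3. A sketch of a 2.3-flavoured implementation; the signature is inferred from the call sites in this patch, not copied from the real adapter:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, NamedExpression}
import org.apache.spark.sql.types.{DataType, Metadata}

object AttributeAdapterSketch {
  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
      attrRef: NamedExpression): AttributeReference = {
    // Spark 2.3: the second parameter list is (exprId, qualifier); isGenerated no longer exists.
    // attrRef is only needed by a 2.2 variant that still has to supply isGenerated.
    AttributeReference(name, dataType, nullable, metadata)(exprId, qualifier)
  }
}
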
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 30db50a..4f34d3b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.optimizer.{CarbonDecoderRelation, CarbonFilters}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -47,6 +48,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
+
 /**
  * Carbon specific optimization for late decode (convert dictionary key to value as late as
  * possible), which can improve the aggregation performance and reduce memory usage
@@ -162,11 +164,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val updatedPartitionFilters = partitionKeyFilters.map { exp =>
         exp.transform {
           case attr: AttributeReference =>
-            AttributeReference(
+            CarbonToSparkAdapater.createAttributeReference(
               attr.name.toLowerCase,
               attr.dataType,
               attr.nullable,
-              attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+              attr.metadata,
+              attr.exprId,
+              attr.qualifier,
+              attr)
         }
       }
       partitions =
@@ -223,7 +228,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       }
     }
 
-    val (unhandledPredicates, pushedFilters) =
+    val (unhandledPredicates, pushedFilters, handledFilters ) =
       selectFilters(relation.relation, candidatePredicates)
 
     // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
@@ -308,6 +313,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
+        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -344,6 +350,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
+        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -358,9 +365,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       output: Seq[Attribute],
       partitions: Seq[PartitionSpec],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec])
+        => RDD[InternalRow],
       candidatePredicates: Seq[Expression],
-      pushedFilters: Seq[Filter],
+      pushedFilters: Seq[Filter], handledFilters: Seq[Filter],
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
@@ -379,19 +387,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         relation.catalogTable.map(_.identifier), relation)
     } else {
-      RowDataSourceScanExec(output,
-        scanBuilder(updateRequestedColumns,
-          candidatePredicates,
-          pushedFilters,
-          needDecoder,
-          partitions),
-        relation.relation,
-        getPartitioning(table.carbonTable, updateRequestedColumns),
-        metadata,
-        relation.catalogTable.map(_.identifier))
+      val partition = getPartitioning(table.carbonTable, updateRequestedColumns)
+      val rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
+        pushedFilters, needDecoder, partitions)
+      CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output,
+        pushedFilters, handledFilters,
+        rdd, partition, metadata)
     }
   }
 
+
   def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
       relation: CarbonDatasourceHadoopRelation,
       needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
@@ -469,7 +474,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   protected[sql] def selectFilters(
       relation: BaseRelation,
-      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Seq[Filter]) = {
 
     // In case of ComplexType dataTypes no filters should be pushed down. IsNotNull is being
     // explicitly added by spark and pushed. That also has to be handled and pushed back to
@@ -534,7 +539,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // a filter to every row or not.
     val (_, translatedFilters) = translated.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters, handledFilters)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f5c5188..36da424 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.strategy
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
@@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.util.CarbonProperties
 
-/**
- * Carbon strategies for ddl commands
- */
+/**
+ * Carbon strategies for DDL commands.
+ * CreateDataSourceTableAsSelectCommand has an extra constructor argument in
+ * Spark 2.3, so a wrapper is needed to match the case across versions.
+ */
+object MatchCreateDataSourceTable {
+  def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match {
+    case t: CreateDataSourceTableAsSelectCommand => Some(t.table, t.mode, t.query)
+    case _ => None
+  }
+}
 
 class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
   val LOGGER: LogService =
@@ -229,7 +238,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd =
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
-      case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+      case MatchCreateDataSourceTable(tableDesc, mode, query)
         if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
            && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
                || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>

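The comment above states the motivation: CreateDataSourceTableAsSelectCommand gained an extra constructor argument in Spark 2.3, so matching on the case class directly ties DDLStrategy to one arity, while the wrapper matches on the type and reads stable accessors. A tiny self-contained illustration of the idiom in plain Scala (all names are illustrative, no Spark types involved):

// Pretend Cmd is the 2.2 shape of the command; in "2.3" it gains an extra field,
// which would break any three-argument constructor pattern written against it.
case class Cmd(table: String, mode: String, query: String)

object MatchCmd {
  // Relies on accessors rather than constructor arity, so it compiles against either shape.
  def unapply(c: Any): Option[(String, String, String)] = c match {
    case t: Cmd => Some((t.table, t.mode, t.query))
    case _ => None
  }
}

object MatchCmdDemo extends App {
  val plan: Any = Cmd("t1", "Overwrite", "select 1")
  plan match {
    case MatchCmd(table, mode, _) => println(s"CTAS into $table with mode $mode")
    case _ => println("not a CTAS")
  }
}
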
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index b2f4505..24a2863 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCom
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
@@ -56,9 +56,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
 
       val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
 
-      val tableRelation = if (SPARK_VERSION.startsWith("2.1")) {
+      val tableRelation = if (SparkUtil.isSparkVersionEqualToX("2.1")) {
         relation
-      } else if (SPARK_VERSION.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -171,9 +171,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
 
         val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
         // include tuple id in subquery
-        if (SPARK_VERSION.startsWith("2.1")) {
+        if (SparkUtil.isSparkVersionEqualToX("2.1")) {
           Project(projList, relation)
-        } else if (SPARK_VERSION.startsWith("2.2")) {
+        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
           alias match {
             case Some(_) =>
               val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
@@ -244,14 +244,13 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           case attr => attr
         }
       }
-      val version = SPARK_VERSION
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (version.startsWith("2.1")) {
+        if (SparkUtil.isSparkVersionEqualToX("2.1")) {
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (version.startsWith("2.2")) {
+        } else if (SparkUtil.isSparkVersionEqualToX("2.2")) {
           CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
         } else {
-          throw new UnsupportedOperationException(s"Spark version $version is not supported")
+          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
         }
       } else {
         Project(newChildOutput, child)

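SparkUtil itself is not part of this excerpt. A minimal sketch of the helpers the new calls assume, built on the same SPARK_VERSION string the replaced checks used (the method names come from the call sites above; the bodies are an assumption):

import org.apache.spark.SPARK_VERSION

object SparkUtilSketch {
  private def majorMinor(v: String): (Int, Int) = {
    val parts = v.split("\\.")
    (parts(0).toInt, parts(1).takeWhile(_.isDigit).toInt)
  }

  // "2.2" matches 2.2.x only, mirroring the old SPARK_VERSION.startsWith("2.2") check.
  def isSparkVersionEqualToX(version: String): Boolean = SPARK_VERSION.startsWith(version)

  // "2.2" matches 2.2.x and every later release, which is what lets the existing
  // 2.2 code paths also serve Spark 2.3.
  def isSparkVersionXandAbove(version: String): Boolean = {
    val (major, minor) = majorMinor(SPARK_VERSION)
    val (targetMajor, targetMinor) = majorMinor(version)
    major > targetMajor || (major == targetMajor && minor >= targetMinor)
  }
}
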
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 81a6bed..2f32066 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,7 +23,6 @@ import java.net.URI
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -42,9 +42,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
@@ -71,6 +70,13 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
+object MatchLogicalRelation {
+  def unapply(logicalPlan: LogicalPlan): Option[(BaseRelation, Any, Any)] = logicalPlan match {
+    case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable)
+    case _ => None
+  }
+}
+
 class CarbonFileMetastore extends CarbonMetaStore {
 
   @transient
@@ -142,13 +148,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.catalog.currentDatabase)
     val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
       case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      MatchLogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
+      case MatchLogicalRelation(
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if SPARK_VERSION.startsWith("2.2") &&
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
@@ -593,13 +599,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
     relation match {
       case SubqueryAlias(_,
-      LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      MatchLogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDataSourceHadoopRelation
-      case LogicalRelation(
+      case MatchLogicalRelation(
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if SPARK_VERSION.startsWith("2.2") &&
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index c59246d..76ff41a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -273,11 +273,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              AttributeReference(
+              CarbonToSparkAdapater.createAttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+                childAttr.metadata,
+                childAttr.exprId,
+                attr.qualifier,
+                attr)
             case None =>
               attr
           }
@@ -296,11 +299,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              AttributeReference(
+              CarbonToSparkAdapater.createAttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+                childAttr.metadata,
+                childAttr.exprId,
+                attr.qualifier,
+                attr)
             case None =>
               attr
           }
@@ -777,24 +783,36 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val factAlias = factPlanExpForStreaming(name)
         // create attribute reference object for each expression
         val attrs = factAlias.map { factAlias =>
-          AttributeReference(
+          CarbonToSparkAdapater.createAttributeReference(
             name,
             alias.dataType,
-            alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            factAlias.exprId,
+            alias.qualifier,
+            alias)
         }
         // add aggregate function in Aggregate node added for handling streaming
         // to aggregate results from fact and aggregate table
         val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
         // same reference id will be used as it can be used by above nodes in the plan like
         // sort, project, join
-        Alias(
+        CarbonToSparkAdapater.createAliasRef(
           updatedAggExp.head,
-          name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated)
+          name,
+          alias.exprId,
+          alias.qualifier,
+          Option(alias.metadata),
+          Some(alias))
       case alias@Alias(expression, name) =>
-        AttributeReference(
+        CarbonToSparkAdapater.createAttributeReference(
           name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
     }
     updatedExp
   }
@@ -897,9 +915,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           case attr: AttributeReference =>
             newAggExp += attr
           case exp: Expression =>
-            newAggExp += Alias(
+            newAggExp += CarbonToSparkAdapater.createAliasRef(
               exp,
-              "dummy_" + counter)(NamedExpression.newExprId, None, None, false)
+              "dummy_" + counter,
+              NamedExpression.newExprId)
             counter = counter + 1
         }
       }
@@ -923,12 +942,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // get the new aggregate expression
         val newAggExp = getAggFunctionForFactStreaming(aggExp)
         val updatedExp = newAggExp.map { exp =>
-          Alias(exp,
-              name)(
-              NamedExpression.newExprId,
-              alias.qualifier,
+          CarbonToSparkAdapater.createAliasRef(exp,
+            name,
+            NamedExpression.newExprId,
+            alias.qualifier,
             Some(alias.metadata),
-              alias.isGenerated)
+            Some(alias))
         }
         // adding to map which will be used while Adding an Aggregate node for handling streaming
         // table plan change
@@ -936,10 +955,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         updatedExp
       case alias@Alias(exp: Expression, name) =>
         val newAlias = Seq(alias)
-        val attr = AttributeReference(name,
-            alias.dataType,
-            alias.nullable,
-            alias.metadata) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val attr = CarbonToSparkAdapater.createAttributeReference(name,
+          alias.dataType,
+          alias.nullable,
+          alias.metadata,
+          alias.exprId,
+          alias.qualifier,
+          alias)
         factPlanGrpExpForStreaming.put(
           AggExpToColumnMappingModel(
             removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))),
@@ -1093,11 +1115,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   private def removeQualifiers(expression: Expression) : Expression = {
     expression.transform {
       case attr: AttributeReference =>
-        AttributeReference(
+        CarbonToSparkAdapater.createAttributeReference(
           attr.name,
           attr.dataType,
           attr.nullable,
-          attr.metadata)(attr.exprId, None, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          None,
+          attr)
     }
   }
 
@@ -1363,10 +1388,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val childTableAttr = AttributeReference(attr.name,
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(attr.name,
           childAttr.dataType,
           childAttr.nullable,
-          childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
+          childAttr.metadata,
+          newExpressionId,
+          childAttr.qualifier,
+          attr)
         updatedExpression.put(attr, childTableAttr)
         // returning the alias to show proper column name in output
         Seq(Alias(childAttr,
@@ -1378,12 +1406,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = AttributeReference(name,
+        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
-        val childTableAttr = AttributeReference(name,
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          newExpressionId,
+          alias.qualifier,
+          alias)
         updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
         Seq(Alias(childAttr,
@@ -1409,13 +1445,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           val newExpressionId = NamedExpression.newExprId
           // create a parent attribute reference which will be replaced on node which may be referred
           // by node like sort join
-          val parentTableAttr = AttributeReference(name,
+          val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
             alias.dataType,
-            alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            alias.exprId,
+            alias.qualifier,
+            alias)
           // creating a child attribute reference which will be replaced
-          val childTableAttr = AttributeReference(name,
+          val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
             alias.dataType,
-            alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            newExpressionId,
+            alias.qualifier,
+            alias)
           // adding to map, will be used during other node updation like sort, join, project
           updatedExpression.put(parentTableAttr, childTableAttr)
           // returning alias with child attribute reference
@@ -1426,12 +1470,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // for streaming table
           // create alias for aggregate table
           val aggExpForStreaming = aggExp.map{ exp =>
-            Alias(exp,
-              name)(
+            CarbonToSparkAdapater.createAliasRef(exp,
+              name,
               NamedExpression.newExprId,
               alias.qualifier,
               Some(alias.metadata),
-              alias.isGenerated).asInstanceOf[NamedExpression]
+              Some(alias)).asInstanceOf[NamedExpression]
           }
           aggExpForStreaming
         }
@@ -1460,12 +1504,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             }
           }
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = AttributeReference(name,
+        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
-        val childTableAttr = AttributeReference(name,
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          newExpressionId,
+          alias.qualifier,
+          alias)
         updatedExpression.put(parentTableAttr, childTableAttr)
         Seq(Alias(updatedExp, name)(newExpressionId,
           alias.qualifier).asInstanceOf[NamedExpression])
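
The hunks above keep creating a parent AttributeReference and a child AttributeReference with a fresh expression id and recording the pair in updatedExpression, so that later rewrites of sort, join and project nodes can swap parent-table attributes for their aggregate-table counterparts. A minimal, Spark-free sketch of that substitution pattern (Attr and the column names are hypothetical stand-ins, not CarbonData or Spark types):

import scala.collection.mutable

object AttributeSubstitutionSketch {
  // Hypothetical stand-in for an AttributeReference: a name plus an expression id.
  final case class Attr(name: String, exprId: Long)

  def main(args: Array[String]): Unit = {
    val updatedExpression = mutable.HashMap.empty[Attr, Attr]

    // Parent-table attribute and its aggregate-table counterpart with a fresh id.
    val parentTableAttr = Attr("price", exprId = 1L)
    val childTableAttr  = Attr("price", exprId = 2L)
    updatedExpression.put(parentTableAttr, childTableAttr)

    // Later, when a sort (or join, project) node is rewritten, every attribute
    // that was recorded in the map is replaced by its child-table counterpart.
    val sortColumns = Seq(parentTableAttr, Attr("name", exprId = 3L))
    val rewritten   = sortColumns.map(a => updatedExpression.getOrElse(a, a))
    println(rewritten)  // List(Attr(price,2), Attr(name,3))
  }
}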
@@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
              // the named expression list; otherwise update the list and add it to the set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
                 namedExpressionList +=
-                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+                CarbonToSparkAdapater.createAliasRef(expressions.head,
+                  name + "_ sum",
+                  NamedExpression.newExprId,
                   alias.qualifier,
                   Some(alias.metadata),
-                  alias.isGenerated)
+                  Some(alias))
                 validExpressionsMap += AggExpToColumnMappingModel(sumExp)
               }
              // check whether a count over the same expression is already present; if so do not add to
              // the named expression list; otherwise update the list and add it to the set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
                 namedExpressionList +=
-                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
-                  alias.qualifier,
-                  Some(alias.metadata),
-                  alias.isGenerated)
+                CarbonToSparkAdapater.createAliasRef(expressions.last, name + "_ count",
+                    NamedExpression.newExprId,
+                    alias.qualifier,
+                    Some(alias.metadata),
+                    Some(alias))
                 validExpressionsMap += AggExpToColumnMappingModel(countExp)
               }
             } else {
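
The "_ sum" / "_ count" aliases added above reflect how an average is answered from a pre-aggregate table: the sum and the count are materialised as separate columns and recombined at query time. A plain-Scala illustration of that idea, with hypothetical column and type names:

object AvgRollupSketch {
  // Hypothetical pre-aggregate row: one sum column and one count column per chunk.
  final case class PreAggRow(price_sum: Double, price_count: Long)

  def main(args: Array[String]): Unit = {
    // Two pre-aggregated chunks standing in for 4 + 2 = 6 original rows.
    val chunks = Seq(PreAggRow(10.0, 4), PreAggRow(6.0, 2))

    // avg(price) over the original data == sum of sums / sum of counts.
    val avg = chunks.map(_.price_sum).sum / chunks.map(_.price_count).sum
    println(avg)  // 2.666..., same as averaging the 6 original values
  }
}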

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c052cd7..a517bd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.CarbonEndsWith
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -49,6 +50,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
+
 /**
  * All filter conversions are done here.
  */
@@ -294,6 +296,27 @@ object CarbonFilters {
     filters.flatMap(translate(_, false)).toArray
   }
 
+  /**
+   * This API checks whether a StringTrim expression is compatible with
+   * Carbon. Carbon only handles trimming of spaces, so any other trim
+   * character must be rejected. For Spark versions below 2.3 this always
+   * holds; from 2.3 onwards the trimStr field must be empty.
+   *
+   * @param stringTrim the StringTrim expression to validate
+   * @return true if Carbon can handle the trim, false otherwise
+   */
+  def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
+    var isCompatible = true
+    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+        .asInstanceOf[Option[Expression]]
+      if (trimStr.isDefined) {
+        isCompatible = false
+      }
+    }
+    isCompatible
+  }
+
   def transformExpression(expr: Expression): CarbonExpression = {
     expr match {
       case Or(left, right)
@@ -381,7 +404,8 @@ object CarbonFilters {
           new CarbonLiteralExpression(maxValueLimit,
             CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
         new AndExpression(l, r)
-      case StringTrim(child) => transformExpression(child)
+      case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) =>
+        transformExpression(strTrim)
       case s: ScalaUDF =>
         new MatchExpression(s.children.head.toString())
       case _ =>
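
isStringTrimCompatibleWithCarbon above relies on CarbonReflectionUtils.getField to read the trimStr field that only exists on Spark 2.3's StringTrim. A minimal sketch of that style of reflective field access, using plain Java reflection and a hypothetical TrimLike case class instead of the real Spark expression:

object FieldReflectionSketch {
  // Hypothetical stand-in for Spark 2.3's StringTrim, where the optional
  // trim character is carried in a separate trimStr field.
  final case class TrimLike(srcStr: String, trimStr: Option[String])

  // Read a constructor field by name; case-class parameters are backed by
  // private fields of the same name.
  def readOptionField(obj: AnyRef, fieldName: String): Option[Any] = {
    val field = obj.getClass.getDeclaredField(fieldName)
    field.setAccessible(true)
    field.get(obj) match {
      case opt: Option[_] => opt            // field already declared as Option[...]
      case other          => Option(other)  // wrap plain values; null becomes None
    }
  }

  def main(args: Array[String]): Unit = {
    println(readOptionField(TrimLike("  a  ", None), "trimStr"))       // None    -> compatible
    println(readOptionField(TrimLike("xxaxx", Some("x")), "trimStr"))  // Some(x) -> not compatible
  }
}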

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 7ed1705..505ca1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -764,8 +764,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           p.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
-                a.explicitMetadata, a.isGenerated)
+              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+                a.name,
+                a.exprId,
+                a.qualifier,
+                a.explicitMetadata,
+                Some(a))
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)
@@ -778,8 +782,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           f.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
-                a.explicitMetadata, a.isGenerated)
+              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+                a.name,
+                a.exprId,
+                a.qualifier,
+                a.explicitMetadata,
+                Some(a))
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index f00fcf8..423bc32 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonToSparkAdapater, DeleteRecords, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -479,7 +479,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         logicalPlan match {
           case _: CarbonCreateTableCommand =>
             ExplainCommand(logicalPlan, extended = isExtended.isDefined)
-          case _ => ExplainCommand(OneRowRelation)
+          case _ => CarbonToSparkAdapater.getExplainCommandObj
         }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..f9922b3
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+}
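
A short usage sketch of the adapter above, assuming Spark 2.1 or 2.2 on the classpath: rule code can create attribute references and aliases through the adapter without touching the version-specific second parameter lists of AttributeReference and Alias directly.

import org.apache.spark.sql.CarbonToSparkAdapater
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.types.{IntegerType, Metadata}

object AdapterUsageSketch {
  def main(args: Array[String]): Unit = {
    // An attribute standing in for a column resolved from the parent table.
    val parentAttr = AttributeReference("a", IntegerType, nullable = true)()

    // Re-create the same attribute shape with a fresh expression id,
    // as the pre-aggregate rules earlier in this commit do.
    val childAttr = CarbonToSparkAdapater.createAttributeReference(
      parentAttr.name,
      parentAttr.dataType,
      parentAttr.nullable,
      Metadata.empty,
      NamedExpression.newExprId,
      parentAttr.qualifier,
      parentAttr)

    // Wrap an arbitrary expression in an alias; the defaults cover exprId,
    // qualifier, explicit metadata and the originating named expression.
    val aliased = CarbonToSparkAdapater.createAliasRef(Literal(1), "one")

    println(childAttr)
    println(aliased)
  }
}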

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..6312746
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property.
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+
+  override def deterministic: Boolean = true
+
+  def childexp : Expression = nonDt
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}
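
A small sketch of how this wrapper is intended to be used, mirroring the CarbonLateDecodeRule change earlier in this commit: a non-deterministic expression is wrapped so that downstream rules see it as deterministic, while the original expression stays reachable through childexp. Assumes Spark 2.1 or 2.2 on the classpath; MonotonicallyIncreasingID is just a convenient non-deterministic example.

import org.apache.spark.sql.CustomDeterministicExpression
import org.apache.spark.sql.catalyst.expressions.{Expression, MonotonicallyIncreasingID}

object DeterministicWrapSketch {
  def main(args: Array[String]): Unit = {
    val nonDeterministic: Expression = MonotonicallyIncreasingID()
    println(nonDeterministic.deterministic)        // false

    val wrapped = CustomDeterministicExpression(nonDeterministic)
    println(wrapped.deterministic)                 // true: rules will not reject it
    println(wrapped.childexp eq nonDeterministic)  // true: original expression is preserved
  }
}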

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
new file mode 100644
index 0000000..3c3efaa
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/** Physical plan node for scanning data from a batched relation. */
+case class BatchedDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier],
+    @transient logicalRelation: LogicalRelation)
+  extends DataSourceScanExec with CodegenSupport {
+
+  override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // in the case of fallback, this batched scan should never fail because of:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+      dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
+    ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = $batch.column($i);"
+    }
+
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+}
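
The Java emitted by doProduce() above boils down to a two-level loop: fetch a ColumnarBatch, consume its rows one by one, then fetch the next batch, accumulating only the batch-fetch time into the scanTime metric. A dependency-free Scala sketch of that control flow (Batch is a hypothetical stand-in for ColumnarBatch, consumeRow for the generated consume(...) call):

object BatchScanLoopSketch {
  // Hypothetical stand-in for a columnar batch: only its row count matters here.
  final case class Batch(numRows: Int)

  def scan(batches: Iterator[Batch])(consumeRow: (Batch, Int) => Unit): Long = {
    var scanTimeNs = 0L
    var current: Batch = null

    // Mirrors the generated nextBatch(): only the batch fetch itself is timed.
    def nextBatch(): Unit = {
      val start = System.nanoTime()
      current = if (batches.hasNext) batches.next() else null
      scanTimeNs += System.nanoTime() - start
    }

    nextBatch()
    while (current != null) {
      var rowIdx = 0
      while (rowIdx < current.numRows) {
        consumeRow(current, rowIdx)   // corresponds to consume(ctx, columnsBatchInput)
        rowIdx += 1
      }
      nextBatch()
    }
    scanTimeNs
  }

  def main(args: Array[String]): Unit = {
    val totalNs = scan(Iterator(Batch(2), Batch(3))) { (_, i) => println(s"row $i") }
    println(s"batch fetch time in ns: $totalNs")
  }
}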

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 989b1d5..dd690e4 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -34,11 +34,9 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * Create table 'using carbondata' and insert the query result into it.
- *
  * @param table the Catalog Table
  * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
  * @param query the query whose result will be insert into the new relation
- *
  */
 
 case class CreateCarbonSourceTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..f9922b3
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..6312746
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property.
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+
+  override def deterministic: Boolean = true
+
+  def childexp : Expression = nonDt
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}

