carbondata-commits mailing list archives

From jack...@apache.org
Subject [3/4] carbondata git commit: Revert "[CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, compatability issues"
Date Fri, 13 Jul 2018 09:37:47 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index 74c0f97..0000000
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.util.CarbonReflectionUtils
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- *
- * @param table the Catalog Table
- * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- *
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
-    table: CatalogTable,
-    mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
-
-    val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
-
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      assert(mode != SaveMode.Overwrite,
-        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
-
-      if (mode == SaveMode.ErrorIfExists) {
-        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
-      }
-      if (mode == SaveMode.Ignore) {
-        // Since the table already exists and the save mode is Ignore, we will just return.
-        return Seq.empty
-      }
-
-      saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
-    } else {
-      assert(table.schema.isEmpty)
-
-      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-        Some(sessionState.catalog.defaultTablePath(table.identifier))
-      } else {
-        table.storage.locationUri
-      }
-      val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
-
-      result match {
-        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-          // Need to recover partitions into the metastore so our saved data is visible.
-          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
-        case _ =>
-      }
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def saveDataIntoTable(
-      session: SparkSession,
-      table: CatalogTable,
-      tableLocation: Option[URI],
-      data: LogicalPlan,
-      mode: SaveMode,
-      tableExists: Boolean): BaseRelation = {
-    // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
-    val dataSource = DataSource(
-      session,
-      className = table.provider.get,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = if (tableExists) {
-        Some(table)
-      } else {
-        None
-      })
-
-    try {
-      val physicalPlan = session.sessionState.executePlan(data).executedPlan
-      CarbonReflectionUtils.invokewriteAndReadMethod(dataSource,
-        Dataset.ofRows(session, query),
-        data,
-        session,
-        mode,
-        query,
-        physicalPlan)
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
-        throw ex
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
deleted file mode 100644
index 8f1477c..0000000
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext,ChangeColumnContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel,AlterTableDataTypeChangeModel}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
-
-
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
-  def visitAddTableColumns(parser: CarbonSpark2SqlParser,
-      ctx: AddTableColumnsContext): LogicalPlan = {
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-
-  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
-    CarbonExplainCommand(super.visitExplain(ctx))
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 41fc013..6f038eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,6 +20,7 @@ import java.lang.Long
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.TaskMetricsMap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 450ead1..d433470 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -21,11 +21,16 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types.{StringType, TimestampType}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 case class CarbonDictionaryCatalystDecoder(
@@ -120,40 +125,22 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
 object CountStarPlan {
   type ReturnType = (mutable.MutableList[Attribute], LogicalPlan)
 
- /**
-  * It fill count star query attribute.
-  * 2.2.1 plan
-  * Aggregate [count(1) AS count(1)#30L]
-  * +- Project
-  *
-  *2.3.0 plan
-  * Aggregate [cast(count(1) as string) AS count(1)#29]
-  * +- Project
-  */
- private def fillCountStarAttribute(
-     expr: Expression,
-     outputColumns: mutable.MutableList[Attribute]) {
-   expr match {
-     case par@Alias(cast: Cast, _) =>
-       if (cast.child.isInstanceOf[AggregateExpression]) {
-         val head = cast.child.children.head
-         head match {
-           case count: Count if count.children.head.isInstanceOf[Literal] =>
-             outputColumns += par.toAttribute
-           case _ =>
-         }
-       }
-     case par@Alias(child, _) =>
-       if (child.isInstanceOf[AggregateExpression]) {
-         val head = child.children.head
-         head match {
-           case count: Count if count.children.head.isInstanceOf[Literal] =>
-             outputColumns += par.toAttribute
-           case _ =>
-         }
-       }
-   }
- }
+  /**
+   * It fill count star query attribute.
+   */
+  private def fillCountStarAttribute(
+      expr: Expression,
+      outputColumns: mutable.MutableList[Attribute]) {
+    expr match {
+      case par@Alias(_, _) =>
+        val head = par.children.head.children.head
+        head match {
+          case count: Count if count.children.head.isInstanceOf[Literal] =>
+            outputColumns += par.toAttribute
+          case _ =>
+        }
+    }
+  }
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = {
     plan match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index e769cca..6622246 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -30,8 +30,6 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -62,13 +60,7 @@ case class CarbonCountStar(
             carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
       carbonTable)
-    val valueRaw =
-      attributesRaw.head.dataType match {
-        case StringType => Seq(UTF8String.fromString(Long.box(rowCount).toString)).toArray
-          .asInstanceOf[Array[Any]]
-        case _ => Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]
-      }
-    val value = new GenericInternalRow(valueRaw)
+    val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
     val row = if (outUnsafeRows) unsafeProjection(value) else value
     sparkContext.parallelize(Seq(row))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala
index 6deade1..47b5c8c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.profiler.Profiler
 import org.apache.spark.util.Utils
@@ -135,7 +136,12 @@ class CarbonSessionBuilder(builder: Builder) {
       // Register a successfully instantiated context to the singleton. This should be at the
       // end of the class definition so that the singleton is updated only if there is no
       // exception in the construction of the instance.
-      CarbonToSparkAdapater.addSparkListener(sparkContext)
+      sparkContext.addSparkListener(new SparkListener {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+          SparkSession.setDefaultSession(null)
+          SparkSession.sqlListener.set(null)
+        }
+      })
       session.streams.addListener(new CarbonStreamingQueryListener(session))
     }
     session

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..6312746
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property .
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+
+  override def deterministic: Boolean = true
+
+  def childexp : Expression = nonDt
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}
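Note: the call sites for CustomDeterministicExpression are not part of this hunk. As a rough, hypothetical illustration (spark-shell style, not taken from the patch) of what wrapping a non-deterministic catalyst expression with it does:

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID

    // Hypothetical usage: wrap an expression whose deterministic flag is false
    // so that downstream checks see a deterministic expression instead.
    val nonDet  = MonotonicallyIncreasingID()           // deterministic == false
    val wrapped = CustomDeterministicExpression(nonDet)
    assert(wrapped.deterministic)                       // the wrapper always reports true
    assert(wrapped.childexp eq nonDet)                  // original expression still reachable via childexp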

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
new file mode 100644
index 0000000..3c3efaa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/** Physical plan node for scanning data from a batched relation. */
+case class BatchedDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier],
+    @transient logicalRelation: LogicalRelation)
+  extends DataSourceScanExec with CodegenSupport {
+
+  override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // in the case of fallback, this batched scan should never fail because of:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+      dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
+    ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = $batch.column($i);"
+    }
+
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 526e330..38bdbcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -829,21 +829,17 @@ case class CarbonLoadDataCommand(
       // datatype is always int
       val column = table.getColumnByName(table.getTableName, attr.name)
       if (column.hasEncoding(Encoding.DICTIONARY)) {
-        CarbonToSparkAdapater.createAttributeReference(attr.name,
+        AttributeReference(
+          attr.name,
           IntegerType,
           attr.nullable,
-          attr.metadata,
-          attr.exprId,
-          attr.qualifier,
-          attr)
+          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
       } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
-        CarbonToSparkAdapater.createAttributeReference(attr.name,
+        AttributeReference(
+          attr.name,
           LongType,
           attr.nullable,
-          attr.metadata,
-          attr.exprId,
-          attr.qualifier,
-          attr)
+          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
       } else {
         attr
       }
@@ -1041,8 +1037,7 @@ case class CarbonLoadDataCommand(
 
     CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
       hdfsRelation.schema.toAttributes,
-      Some(catalogTable),
-      false)
+      Some(catalogTable))
   }
 
 

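Note: the first hunk in this file swaps CarbonToSparkAdapater.createAttributeReference back to the plain AttributeReference constructor. The form it returns to targets the Spark 2.1/2.2 curried parameter list, which still carries an isGenerated flag that newer Spark versions no longer accept (presumably the reason the reverted change had routed these calls through the adapter). A minimal sketch of the post-revert shape, mirroring the dictionary-column branch above (attr is assumed to be an AttributeReference taken from the logical plan):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    // Rewrite only the data type to IntegerType while preserving the incoming
    // attribute's exprId, qualifier and isGenerated, as the hunk above does.
    def toDictionaryKeyAttr(attr: AttributeReference): AttributeReference =
      AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(
        attr.exprId, attr.qualifier, attr.isGenerated)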
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4f34d3b..30db50a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.optimizer.{CarbonDecoderRelation, CarbonFilters}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
-import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -48,7 +47,6 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-
 /**
  * Carbon specific optimization for late decode (convert dictionary key to value as late as
  * possible), which can improve the aggregation performance and reduce memory usage
@@ -164,14 +162,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val updatedPartitionFilters = partitionKeyFilters.map { exp =>
         exp.transform {
           case attr: AttributeReference =>
-            CarbonToSparkAdapater.createAttributeReference(
+            AttributeReference(
               attr.name.toLowerCase,
               attr.dataType,
               attr.nullable,
-              attr.metadata,
-              attr.exprId,
-              attr.qualifier,
-              attr)
+              attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
         }
       }
       partitions =
@@ -228,7 +223,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       }
     }
 
-    val (unhandledPredicates, pushedFilters, handledFilters ) =
+    val (unhandledPredicates, pushedFilters) =
       selectFilters(relation.relation, candidatePredicates)
 
     // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
@@ -313,7 +308,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
-        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -350,7 +344,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
-        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -365,10 +358,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       output: Seq[Attribute],
       partitions: Seq[PartitionSpec],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[PartitionSpec])
-        => RDD[InternalRow],
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
       candidatePredicates: Seq[Expression],
-      pushedFilters: Seq[Filter], handledFilters: Seq[Filter],
+      pushedFilters: Seq[Filter],
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
@@ -387,16 +379,19 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         relation.catalogTable.map(_.identifier), relation)
     } else {
-      val partition = getPartitioning(table.carbonTable, updateRequestedColumns)
-      val rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
-        pushedFilters, needDecoder, partitions)
-      CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output,
-        pushedFilters, handledFilters,
-        rdd, partition, metadata)
+      RowDataSourceScanExec(output,
+        scanBuilder(updateRequestedColumns,
+          candidatePredicates,
+          pushedFilters,
+          needDecoder,
+          partitions),
+        relation.relation,
+        getPartitioning(table.carbonTable, updateRequestedColumns),
+        metadata,
+        relation.catalogTable.map(_.identifier))
     }
   }
 
-
   def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
       relation: CarbonDatasourceHadoopRelation,
       needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
@@ -474,7 +469,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   protected[sql] def selectFilters(
       relation: BaseRelation,
-      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Seq[Filter]) = {
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
 
     // In case of ComplexType dataTypes no filters should be pushed down. IsNotNull is being
     // explicitly added by spark and pushed. That also has to be handled and pushed back to
@@ -539,7 +534,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // a filter to every row or not.
     val (_, translatedFilters) = translated.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters, handledFilters)
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 36da424..f5c5188 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.strategy
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
@@ -39,17 +38,9 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.util.CarbonProperties
 
-  /**
-   * Carbon strategies for ddl commands
-   * CreateDataSourceTableAsSelectCommand class has extra argument in
-   * 2.3, so need to add wrapper to match the case
-   */
-object MatchCreateDataSourceTable {
-  def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match {
-    case t: CreateDataSourceTableAsSelectCommand => Some(t.table, t.mode, t.query)
-    case _ => None
-  }
-}
+/**
+ * Carbon strategies for ddl commands
+ */
 
 class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
   val LOGGER: LogService =
@@ -238,7 +229,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd =
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
-      case MatchCreateDataSourceTable(tableDesc, mode, query)
+      case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
         if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
            && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
                || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>

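Note: the comment deleted above spells out why the wrapper existed: CreateDataSourceTableAsSelectCommand gained an extra constructor argument in Spark 2.3, so matching it through a hand-written unapply kept one source tree compiling against both arities, and this revert goes back to matching the case class directly (the MatchLogicalRelation removal in the next file follows the same pattern). A generic illustration of that extractor trick, with stand-in types and hypothetical names rather than CarbonData code:

    // Stand-in case class; imagine a newer library version adding a fourth field.
    case class Relation(provider: String, mode: String, query: Option[String])

    object MatchRelation {
      // Matching through unapply instead of the constructor pins the arity seen
      // by call sites, so they keep compiling when the case class grows.
      def unapply(r: Relation): Option[(String, String, Option[String])] =
        Some((r.provider, r.mode, r.query))
    }

    def describe(r: Relation): String = r match {
      case MatchRelation(provider, mode, _) => s"$provider table, save mode $mode"
    }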
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 24a2863..b2f4505 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCom
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
@@ -56,9 +56,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
 
       val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
 
-      val tableRelation = if (SparkUtil.isSparkVersionEqualToX("2.1")) {
+      val tableRelation = if (SPARK_VERSION.startsWith("2.1")) {
         relation
-      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      } else if (SPARK_VERSION.startsWith("2.2")) {
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -171,9 +171,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
 
         val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
         // include tuple id in subquery
-        if (SparkUtil.isSparkVersionEqualToX("2.1")) {
+        if (SPARK_VERSION.startsWith("2.1")) {
           Project(projList, relation)
-        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        } else if (SPARK_VERSION.startsWith("2.2")) {
           alias match {
             case Some(_) =>
               val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
@@ -244,13 +244,14 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           case attr => attr
         }
       }
+      val version = SPARK_VERSION
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (SparkUtil.isSparkVersionEqualToX("2.1")) {
+        if (version.startsWith("2.1")) {
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (SparkUtil.isSparkVersionEqualToX("2.2")) {
+        } else if (version.startsWith("2.2")) {
           CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
         } else {
-          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
+          throw new UnsupportedOperationException(s"Spark version $version is not supported")
         }
       } else {
         Project(newChildOutput, child)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 2f32066..81a6bed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,6 +23,7 @@ import java.net.URI
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -30,8 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -42,8 +42,9 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
@@ -70,13 +71,6 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-object MatchLogicalRelation {
-  def unapply(logicalPlan: LogicalPlan): Option[(BaseRelation, Any, Any)] = logicalPlan match {
-    case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable)
-    case _ => None
-  }
-}
-
 class CarbonFileMetastore extends CarbonMetaStore {
 
   @transient
@@ -148,13 +142,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.catalog.currentDatabase)
     val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
       case SubqueryAlias(_,
-      MatchLogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDatasourceHadoopRelation.carbonRelation
-      case MatchLogicalRelation(
+      case LogicalRelation(
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+        if SPARK_VERSION.startsWith("2.2") &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
@@ -599,13 +593,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
     relation match {
       case SubqueryAlias(_,
-      MatchLogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDataSourceHadoopRelation
-      case MatchLogicalRelation(
+      case LogicalRelation(
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+        if SPARK_VERSION.startsWith("2.2") &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 76ff41a..c59246d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -273,14 +273,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              CarbonToSparkAdapater.createAttributeReference(
+              AttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata,
-                childAttr.exprId,
-                attr.qualifier,
-                attr)
+                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
             case None =>
               attr
           }
@@ -299,14 +296,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              CarbonToSparkAdapater.createAttributeReference(
+              AttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata,
-                childAttr.exprId,
-                attr.qualifier,
-                attr)
+                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
             case None =>
               attr
           }
@@ -783,36 +777,24 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val factAlias = factPlanExpForStreaming(name)
         // create attribute reference object for each expression
         val attrs = factAlias.map { factAlias =>
-          CarbonToSparkAdapater.createAttributeReference(
+          AttributeReference(
             name,
             alias.dataType,
-            alias.nullable,
-            Metadata.empty,
-            factAlias.exprId,
-            alias.qualifier,
-            alias)
+            alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated)
         }
         // add aggregate function in Aggregate node added for handling streaming
         // to aggregate results from fact and aggregate table
         val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
         // same reference id will be used as it can be used by above nodes in the plan like
         // sort, project, join
-        CarbonToSparkAdapater.createAliasRef(
+        Alias(
           updatedAggExp.head,
-          name,
-          alias.exprId,
-          alias.qualifier,
-          Option(alias.metadata),
-          Some(alias))
+          name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated)
       case alias@Alias(expression, name) =>
-        CarbonToSparkAdapater.createAttributeReference(
+        AttributeReference(
           name,
           alias.dataType,
-          alias.nullable,
-          Metadata.empty,
-          alias.exprId,
-          alias.qualifier,
-          alias)
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
     }
     updatedExp
   }
@@ -915,10 +897,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           case attr: AttributeReference =>
             newAggExp += attr
           case exp: Expression =>
-            newAggExp += CarbonToSparkAdapater.createAliasRef(
+            newAggExp += Alias(
               exp,
-              "dummy_" + counter,
-              NamedExpression.newExprId)
+              "dummy_" + counter)(NamedExpression.newExprId, None, None, false)
             counter = counter + 1
         }
       }
@@ -942,12 +923,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // get the new aggregate expression
         val newAggExp = getAggFunctionForFactStreaming(aggExp)
         val updatedExp = newAggExp.map { exp =>
-          CarbonToSparkAdapater.createAliasRef(exp,
-            name,
-            NamedExpression.newExprId,
-            alias.qualifier,
+          Alias(exp,
+              name)(
+              NamedExpression.newExprId,
+              alias.qualifier,
             Some(alias.metadata),
-            Some(alias))
+              alias.isGenerated)
         }
         // adding to map which will be used while Adding an Aggregate node for handling streaming
         // table plan change
@@ -955,13 +936,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         updatedExp
       case alias@Alias(exp: Expression, name) =>
         val newAlias = Seq(alias)
-        val attr = CarbonToSparkAdapater.createAttributeReference(name,
-          alias.dataType,
-          alias.nullable,
-          alias.metadata,
-          alias.exprId,
-          alias.qualifier,
-          alias)
+        val attr = AttributeReference(name,
+            alias.dataType,
+            alias.nullable,
+            alias.metadata) (alias.exprId, alias.qualifier, alias.isGenerated)
         factPlanGrpExpForStreaming.put(
           AggExpToColumnMappingModel(
             removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))),
@@ -1115,14 +1093,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   private def removeQualifiers(expression: Expression) : Expression = {
     expression.transform {
       case attr: AttributeReference =>
-        CarbonToSparkAdapater.createAttributeReference(
+        AttributeReference(
           attr.name,
           attr.dataType,
           attr.nullable,
-          attr.metadata,
-          attr.exprId,
-          None,
-          attr)
+          attr.metadata)(attr.exprId, None, attr.isGenerated)
     }
   }
 
@@ -1388,13 +1363,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(attr.name,
+        val childTableAttr = AttributeReference(attr.name,
           childAttr.dataType,
           childAttr.nullable,
-          childAttr.metadata,
-          newExpressionId,
-          childAttr.qualifier,
-          attr)
+          childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
         updatedExpression.put(attr, childTableAttr)
         // returning the alias to show proper column name in output
         Seq(Alias(childAttr,
@@ -1406,20 +1378,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+        val parentTableAttr = AttributeReference(name,
           alias.dataType,
-          alias.nullable,
-          Metadata.empty,
-          alias.exprId,
-          alias.qualifier,
-          alias)
-        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val childTableAttr = AttributeReference(name,
           alias.dataType,
-          alias.nullable,
-          Metadata.empty,
-          newExpressionId,
-          alias.qualifier,
-          alias)
+          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
         updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
         Seq(Alias(childAttr,
@@ -1445,21 +1409,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           val newExpressionId = NamedExpression.newExprId
           // create a parent attribute reference which will be replced on node which may be referred
           // by node like sort join
-          val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+          val parentTableAttr = AttributeReference(name,
             alias.dataType,
-            alias.nullable,
-            Metadata.empty,
-            alias.exprId,
-            alias.qualifier,
-            alias)
+            alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated)
           // creating a child attribute reference which will be replced
-          val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+          val childTableAttr = AttributeReference(name,
             alias.dataType,
-            alias.nullable,
-            Metadata.empty,
-            newExpressionId,
-            alias.qualifier,
-            alias)
+            alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated)
           // adding to map, will be used during other node updation like sort, join, project
           updatedExpression.put(parentTableAttr, childTableAttr)
           // returning alias with child attribute reference
@@ -1470,12 +1426,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // for streaming table
           // create alias for aggregate table
           val aggExpForStreaming = aggExp.map{ exp =>
-            CarbonToSparkAdapater.createAliasRef(exp,
-              name,
+            Alias(exp,
+              name)(
               NamedExpression.newExprId,
               alias.qualifier,
               Some(alias.metadata),
-              Some(alias)).asInstanceOf[NamedExpression]
+              alias.isGenerated).asInstanceOf[NamedExpression]
           }
           aggExpForStreaming
         }
@@ -1504,20 +1460,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             }
           }
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+        val parentTableAttr = AttributeReference(name,
           alias.dataType,
-          alias.nullable,
-          Metadata.empty,
-          alias.exprId,
-          alias.qualifier,
-          alias)
-        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val childTableAttr = AttributeReference(name,
           alias.dataType,
-          alias.nullable,
-          Metadata.empty,
-          newExpressionId,
-          alias.qualifier,
-          alias)
+          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
         updatedExpression.put(parentTableAttr, childTableAttr)
         Seq(Alias(updatedExp, name)(newExpressionId,
           alias.qualifier).asInstanceOf[NamedExpression])
@@ -1839,23 +1787,20 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
               // named expression list otherwise update the list and add it to set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
                 namedExpressionList +=
-                CarbonToSparkAdapater.createAliasRef(expressions.head,
-                  name + "_ sum",
-                  NamedExpression.newExprId,
+                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
                   alias.qualifier,
                   Some(alias.metadata),
-                  Some(alias))
+                  alias.isGenerated)
                 validExpressionsMap += AggExpToColumnMappingModel(sumExp)
               }
               // check with same expression already count is present then do not add to
               // named expression list otherwise update the list and add it to set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
                 namedExpressionList +=
-                CarbonToSparkAdapater.createAliasRef(expressions.last, name + "_ count",
-                    NamedExpression.newExprId,
-                    alias.qualifier,
-                    Some(alias.metadata),
-                    Some(alias))
+                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
+                  alias.qualifier,
+                  Some(alias.metadata),
+                  alias.isGenerated)
                 validExpressionsMap += AggExpToColumnMappingModel(countExp)
               }
             } else {

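The hunks above undo the CarbonToSparkAdapater indirection and call the Spark 2.1/2.2 constructors directly. As a minimal sketch of the shape being restored (assuming a Spark 2.1/2.2 catalyst dependency; the attribute below is purely hypothetical), the direct constructor carries the expression id, the qualifier and the isGenerated flag in its second parameter list:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
    import org.apache.spark.sql.types.StringType

    // Hypothetical stand-in for the `childAttr` resolved by the rule above.
    val childAttr = AttributeReference("price", StringType)()

    // Direct Spark 2.1/2.2 form restored by the revert: the second parameter list
    // takes (exprId, qualifier, isGenerated); Spark 2.3 later dropped isGenerated,
    // which is why the adapter existed in the first place.
    val newExpressionId = NamedExpression.newExprId
    val childTableAttr = AttributeReference(childAttr.name, childAttr.dataType,
      childAttr.nullable, childAttr.metadata)(newExpressionId, childAttr.qualifier,
      childAttr.isGenerated)

The adapter form removed here took all seven values in a single argument list and copied isGenerated from the supplied NamedExpression.
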
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index a517bd4..c052cd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.CarbonEndsWith
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -50,7 +49,6 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
-
 /**
  * All filter conversions are done here.
  */
@@ -296,27 +294,6 @@ object CarbonFilters {
     filters.flatMap(translate(_, false)).toArray
   }
 
-  /**
-   * This API checks whether StringTrim object is compatible with
-   * carbon,carbon only deals with the space any other symbol should
-   * be ignored.So condition is SPARK version < 2.3.
-   * If it is 2.3 then trimStr field should be empty
-   *
-   * @param stringTrim
-   * @return
-   */
-  def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
-    var isCompatible = true
-    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
-        .asInstanceOf[Option[Expression]]
-      if (trimStr.isDefined) {
-        isCompatible = false
-      }
-    }
-    isCompatible
-  }
-
   def transformExpression(expr: Expression): CarbonExpression = {
     expr match {
       case Or(left, right)
@@ -404,8 +381,7 @@ object CarbonFilters {
           new CarbonLiteralExpression(maxValueLimit,
             CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
         new AndExpression(l, r)
-      case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) =>
-        transformExpression(strTrim)
+      case StringTrim(child) => transformExpression(child)
       case s: ScalaUDF =>
         new MatchExpression(s.children.head.toString())
       case _ =>

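The removed isStringTrimCompatibleWithCarbon guard existed because Spark 2.3 added an optional trim-string argument to StringTrim (read here via reflection), while Carbon only handles trimming of spaces. After the revert the translation falls back to the plain single-child pattern, which is all that Spark 2.1/2.2 exposes. A minimal sketch, assuming a Spark 2.1/2.2 catalyst dependency and a hypothetical column expression:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, StringTrim}
    import org.apache.spark.sql.types.StringType

    // Hypothetical filter child; in CarbonFilters this arrives inside a larger filter tree.
    val column: Expression = AttributeReference("name", StringType)()
    val trimmed: Expression = StringTrim(column)

    // Mirrors the restored `case StringTrim(child) => transformExpression(child)`:
    // the TRIM node itself is dropped and only its child is translated.
    trimmed match {
      case StringTrim(child) => println(s"translate child: $child")
      case other => println(s"unsupported: $other")
    }
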
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 505ca1b..7ed1705 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -764,12 +764,8 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           p.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
-                a.name,
-                a.exprId,
-                a.qualifier,
-                a.explicitMetadata,
-                Some(a))
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+                a.explicitMetadata, a.isGenerated)
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)
@@ -782,12 +778,8 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           f.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
-                a.name,
-                a.exprId,
-                a.qualifier,
-                a.explicitMetadata,
-                Some(a))
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+                a.explicitMetadata, a.isGenerated)
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)

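Both hunks perform the same rewrite: a non-deterministic aliased expression is wrapped in CustomDeterministicExpression and the alias is rebuilt with Spark's own constructor instead of the deleted adapter. A minimal sketch of that rewrite, assuming Spark 2.1/2.2 and that CustomDeterministicExpression remains available from a shared source path:

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.{Alias, MonotonicallyIncreasingID}

    // Hypothetical non-deterministic projection standing in for `exp` in the rule.
    val a = Alias(MonotonicallyIncreasingID(), "row_id")()

    // Restored direct form: Alias's second parameter list on Spark 2.1/2.2 takes
    // exprId, qualifier, explicitMetadata and isGenerated, copied from the original.
    val rewrapped = Alias(CustomDeterministicExpression(a.child), a.name)(
      a.exprId, a.qualifier, a.explicitMetadata, a.isGenerated)
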
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 11112ba..c9c61cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{CarbonToSparkAdapater, DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -490,7 +490,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         logicalPlan match {
           case _: CarbonCreateTableCommand =>
             ExplainCommand(logicalPlan, extended = isExtended.isDefined)
-          case _ => CarbonToSparkAdapater.getExplainCommandObj
+          case _ => ExplainCommand(OneRowRelation)
         }
     }
 

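The fallback branch again builds ExplainCommand directly. The deleted getExplainCommandObj helper presumably existed because OneRowRelation is a case object on Spark 2.1/2.2 but becomes a case class (instantiated as OneRowRelation()) on 2.3; after the revert only the 2.1/2.2 spelling is needed. A minimal sketch under that assumption:

    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
    import org.apache.spark.sql.execution.command.ExplainCommand

    // Spark 2.1/2.2: OneRowRelation is an object, so it is passed as a value;
    // the remaining ExplainCommand parameters keep their defaults.
    val fallbackExplain = ExplainCommand(OneRowRelation)
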
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index f9922b3..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapater {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-}

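The deleted adapter was a thin compatibility shim: createAttributeReference and createAliasRef forwarded to the 2.1/2.2 constructors used directly in the hunks above, and addSparkListener reset the default session when the application ended. A minimal stand-alone sketch of the listener part, assuming Spark 2.1/2.2 (SessionCleanup is a hypothetical name; the code sits in org.apache.spark.sql because SparkSession.sqlListener is not public API on these versions):

    package org.apache.spark.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

    object SessionCleanup {
      // Same behaviour as the removed addSparkListener helper: clear the default
      // session and the shared SQL listener once the application shuts down.
      def clearSessionOnAppEnd(sparkContext: SparkContext): Unit = {
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            SparkSession.setDefaultSession(null)
            SparkSession.sqlListener.set(null)
          }
        })
      }
    }
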
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index 6312746..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression to override the deterministic property .
- */
-case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
-  override def nullable: Boolean = true
-
-  override def eval(input: InternalRow): Any = null
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = Seq()
-
-  override def deterministic: Boolean = true
-
-  def childexp : Expression = nonDt
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
-}

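CustomDeterministicExpression exists only to mask non-determinism: it reports deterministic = true, evaluates to null, claims StringType and generates an empty code block, while keeping the real expression reachable through childexp. A minimal behavioural sketch, assuming an equivalent definition stays visible to the remaining Spark profiles (CarbonLateDecodeRule above still references the class):

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
    import org.apache.spark.sql.types.StringType

    val nonDt = MonotonicallyIncreasingID()
    val wrapped = CustomDeterministicExpression(nonDt)

    assert(!nonDt.deterministic)            // the child really is non-deterministic
    assert(wrapped.deterministic)           // but the wrapper hides that from the optimizer
    assert(wrapped.dataType == StringType)  // fixed type, independent of the child
    assert(wrapped.childexp eq nonDt)       // the original expression stays reachable
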
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
deleted file mode 100644
index 3c3efaa..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
-
-/** Physical plan node for scanning data from a batched relation. */
-case class BatchedDataSourceScanExec(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier],
-    @transient logicalRelation: LogicalRelation)
-  extends DataSourceScanExec with CodegenSupport {
-
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // in the case of fallback, this batched scan should never fail because of:
-    // 1) only primitive types are supported
-    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-    WholeStageCodegenExec(this).execute()
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
-
-  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
-      dataType: DataType, nullable: Boolean): ExprCode = {
-    val javaType = ctx.javaType(dataType)
-    val value = ctx.getValue(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
-      s"""
-        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
-        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      s"$javaType ${valueVar} = $value;"
-    }).trim
-    ExprCode(code, isNullVar, valueVar)
-  }
-
-  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
-  // never requires UnsafeRow as input.
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val input = ctx.freshName("input")
-    // PhysicalRDD always just has one input
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
-
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs = ctx.freshName("scanTime")
-    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val batch = ctx.freshName("batch")
-    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
-
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
-    val idx = ctx.freshName("batchIdx")
-    ctx.addMutableState("int", idx, s"$idx = 0;")
-    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
-    }
-
-    val nextBatch = ctx.freshName("nextBatch")
-    ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatch();
-       |}
-       |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
-       |  }
-       |  $batch = null;
-       |  $nextBatch();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-}

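The deleted BatchedDataSourceScanExec is the Spark 2.1-specific physical node for whole-stage-codegen columnar scans; most of the file assembles Java source that pulls one ColumnarBatch at a time and emits its rows. Purely as an illustration of that control flow, in plain Scala rather than the generated Java, with all names hypothetical:

    // Paraphrase of the loop emitted by the deleted doProduce(): fetch a batch,
    // walk its rows, then fetch the next batch until the iterator is exhausted.
    def consumeBatches[B](batches: Iterator[B])(numRows: B => Int)(consumeRow: (B, Int) => Unit): Unit = {
      while (batches.hasNext) {
        val batch = batches.next()          // corresponds to nextBatch()
        var idx = 0                         // corresponds to batchIdx
        val rows = numRows(batch)
        while (idx < rows) {
          consumeRow(batch, idx)            // corresponds to consume(ctx, columnsBatchInput)
          idx += 1
        }
      }
    }

    // Usage example: consumeBatches(Iterator(Array(1, 2, 3)))(_.length)((b, i) => println(b(i)))
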
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index dd690e4..989b1d5 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -34,9 +34,11 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * Create table 'using carbondata' and insert the query result into it.
+ *
  * @param table the Catalog Table
  * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
  * @param query the query whose result will be insert into the new relation
+ *
  */
 
 case class CreateCarbonSourceTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index f9922b3..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapater {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0aab4e7c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index 6312746..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression to override the deterministic property .
- */
-case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
-  override def nullable: Boolean = true
-
-  override def eval(input: InternalRow): Any = null
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = Seq()
-
-  override def deterministic: Boolean = true
-
-  def childexp : Expression = nonDt
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
-}

