carbondata-commits mailing list archives

From: qiang...@apache.org
Subject: [carbondata] branch master updated: [CARBONDATA-3514] Support spark 2.4 integration
Date: Wed, 22 Jan 2020 14:12:33 GMT
This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e4a49e9  [CARBONDATA-3514] Support spark 2.4 integration
e4a49e9 is described below

commit e4a49e984bf06cf7727299ed2fb7bceda134ae08
Author: Jacky Li <jacky.likun@qq.com>
AuthorDate: Wed Jan 8 23:26:25 2020 +0800

    [CARBONDATA-3514] Support spark 2.4 integration
    
    Why is this PR needed?
    CarbonData integration with Spark 2.4 is a long-expected feature from the community
    
    What changes were proposed in this PR?
    1. Support integration with Spark 2.4
    2. Remove support for Spark 2.1 and 2.2
    
    Does this PR introduce any user interface change?
    Yes. New APIs from Spark 2.4 can be used to access CarbonData (a usage sketch follows below)
    
    Is any new test case added?
    No
    
    This closes #3576
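    
    A minimal usage sketch of the integration, assuming a Spark 2.4 session with the CarbonData
    assembly jar (built with -Pspark-2.4) on the classpath; the table and column names are
    illustrative, while the `USING carbondata` syntax matches the updated tests in this commit:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Assumes the CarbonData assembly jar built with -Pspark-2.4 is already on the classpath.
    val spark = SparkSession.builder()
      .appName("carbondata-spark-2.4-example")
      .getOrCreate()
    
    // Create, load and query a CarbonData table through the plain Spark SQL datasource API.
    spark.sql("CREATE TABLE IF NOT EXISTS sample_source(key INT, value STRING) USING carbondata")
    spark.sql("INSERT INTO sample_source VALUES (1, 'source')")
    spark.sql("SELECT * FROM sample_source").show()
    ```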
---
 README.md                                          |   4 +-
 build/README.md                                    |  10 +-
 .../table/DiskBasedDMSchemaStorageProvider.java    |  17 +-
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  29 +-
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  |  18 +-
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  |  10 +-
 .../carbondata/mv/plans/modular/Harmonizer.scala   |  27 +-
 .../mv/plans/modular/ModularPatterns.scala         |   6 +-
 .../mv/plans/util/BirdcageOptimizer.scala          |   5 +-
 .../mv/plans/util/Logical2ModularExtractions.scala |  23 +-
 .../apache/carbondata/mv/plans/util/Printers.scala |   4 +-
 .../carbondata/mv/plans/util/SQLBuilder.scala      |  36 +-
 docs/alluxio-guide.md                              |   4 +-
 docs/faq.md                                        |   2 +-
 docs/presto-guide.md                               |   4 +-
 docs/streaming-guide.md                            |   6 +-
 integration/flink/pom.xml                          |  20 -
 .../apache/spark/sql/common/util/QueryTest.scala   |   9 +-
 .../sql/commands/UsingCarbondataSuite.scala        |   9 +-
 integration/spark-common/pom.xml                   |  99 ++++
 .../carbondata/spark/load/CsvRDDHelper.scala       |   7 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |   8 -
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 159 +-----
 .../apache/spark/util/CarbonReflectionUtils.scala  | 198 ++-----
 .../spark/adapter/CarbonToSparkAdapter.scala}      |  18 +-
 .../spark/adapter/CarbonToSparkAdapter.scala}      |  33 +-
 integration/spark-datasource/pom.xml               | 104 ----
 .../apache/spark/sql/CarbonDictionaryWrapper.java  |   0
 .../org/apache/spark/sql/CarbonVectorProxy.java    |   0
 .../org/apache/spark/sql/ColumnVectorFactory.java  |   0
 .../org/apache/spark/sql/CarbonVectorProxy.java    | 586 ---------------------
 integration/spark2/pom.xml                         |  69 +--
 .../apache/spark/sql/CarbonBoundReference.scala    |  27 +-
 .../apache/spark/sql/CarbonDictionaryDecoder.scala |  44 +-
 .../org/apache/spark/sql/CarbonExtensions.scala    |  30 --
 .../scala/org/apache/spark/sql/CarbonSource.scala  | 285 +++++-----
 .../spark/sql/CustomDeterministicExpression.scala  |   2 +-
 .../command/datamap/CarbonDropDataMapCommand.scala |   1 +
 .../command/management/CarbonLoadDataCommand.scala |   6 +-
 .../management/RefreshCarbonTableCommand.scala     |   3 +-
 .../schema/CarbonAlterTableAddColumnCommand.scala  |   4 +-
 ...nAlterTableColRenameDataTypeChangeCommand.scala |   7 +-
 .../schema/CarbonAlterTableDropColumnCommand.scala |   5 +-
 .../table/CarbonCreateDataSourceTableCommand.scala |  48 +-
 .../command/table/CarbonCreateTableCommand.scala   |   3 +-
 .../command/table/CarbonDropTableCommand.scala     |   1 +
 .../strategy/CarbonLateDecodeStrategy.scala        |  12 +-
 .../spark/sql/execution/strategy/DDLStrategy.scala |  23 +-
 .../execution/strategy/MixedFormatHandler.scala    |   2 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  39 +-
 .../org/apache/spark/sql/hive/CarbonAnalyzer.scala |   2 +-
 .../spark/sql/hive/CarbonFileMetastore.scala       |  22 +-
 .../spark/sql/hive/CarbonOptimizerUtil.scala       |  19 +-
 .../org/apache/spark/sql/hive/CarbonRelation.scala |  10 +-
 .../apache/spark/sql/hive/CarbonSessionState.scala |  15 +-
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  |  51 +-
 .../spark/sql/hive/CarbonSqlAstBuilder.scala       |   6 +-
 .../org/apache/spark/sql/hive/CarbonSqlConf.scala  |   0
 .../CreateCarbonSourceTableAsSelectCommand.scala   |   7 +-
 .../spark/sql/hive/SqlAstBuilderHelper.scala       |   0
 .../apache/spark/sql/optimizer/CarbonFilters.scala |  14 +-
 .../spark/sql/optimizer/CarbonLateDecodeRule.scala |   2 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   8 +-
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |   2 +-
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |  90 ----
 .../sql/catalyst/catalog/HiveTableRelation.scala   |  56 --
 .../sql/catalyst/optimizer/MigrateOptimizer.scala  | 129 -----
 .../org/apache/spark/sql/hive/CarbonSQLConf.scala  | 132 -----
 .../apache/spark/sql/hive/CarbonSessionState.scala | 379 -------------
 .../CreateCarbonSourceTableAsSelectCommand.scala   | 165 ------
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  90 ----
 .../spark/sql/hive/CarbonSqlAstBuilder.scala       |  52 --
 .../apache/spark/sql/CarbonBoundReference.scala    |  19 +-
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |  70 ++-
 .../apache/spark/sql/MixedFormatHandlerUtil.scala} |   1 +
 .../execution/strategy/CarbonDataSourceScan.scala  |   2 +-
 .../apache/spark/sql/CarbonBoundReference.scala    |  23 +-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    | 174 ++++++
 .../apache/spark/sql/MixedFormatHandlerUtil.scala  |   7 +-
 .../execution/strategy/CarbonDataSourceScan.scala  |  14 +-
 .../apache/spark/sql/hive/CarbonOptimizer.scala    |   5 +-
 .../BloomCoarseGrainDataMapFunctionSuite.scala     |   1 +
 .../booleantype/BooleanDataTypesInsertTest.scala   |   3 +
 .../spark/util/AllDictionaryTestCase.scala         |   0
 .../spark/carbondata/CarbonDataSourceSuite.scala   |  20 +-
 .../bucketing/TableBucketingTestCase.scala         |   4 +-
 .../spark/sql/common/util/Spark2QueryTest.scala    |  12 +-
 .../org/apache/spark/util/CarbonCommandSuite.scala |   3 +-
 pom.xml                                            |  73 +--
 89 files changed, 948 insertions(+), 2800 deletions(-)

diff --git a/README.md b/README.md
index a34784d..d54b913 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,8 @@ Visit count: [![HitCount](http://hits.dwyl.io/jackylk/apache/carbondata.svg)](ht
 
 
 ## Status
-Spark2.2:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.3:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.3/lastBuild/testReport)
 [![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
 <a href="https://scan.coverity.com/projects/carbondata">
   <img alt="Coverity Scan Build Status"
diff --git a/build/README.md b/build/README.md
index f361a6e..81f9133 100644
--- a/build/README.md
+++ b/build/README.md
@@ -21,15 +21,13 @@
 * Unix-like environment (Linux, Mac OS X)
 * Git
 * [Apache Maven (Recommend version 3.3 or later)](https://maven.apache.org/download.cgi)
-* [Oracle Java 7 or 8](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
+* [Oracle Java 8](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
 * [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)
 
 ## Build command
-Build with different supported versions of Spark, by default using Spark 2.2.1 to build
+Build with different supported versions of Spark, by default using Spark 2.4.4
 ```
-mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
-mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
-mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
+mvn -DskipTests -Pspark-2.4 clean package
 ```
 
 Note:
@@ -39,5 +37,5 @@ Note:
 ## For contributors : To build the format code after any changes, please follow the below command.
 Note:Need install Apache Thrift 0.9.3
 ```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
 ```
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index d0e5a42..c18298d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -27,11 +27,11 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,12 +42,15 @@ import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
 
 /**
  * Stores datamap schema in disk as json format
  */
 public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
 
+  private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
   private String storePath;
 
   private String mdtFilePath;
@@ -171,17 +174,15 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
     if (!FileFactory.isFileExist(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
-    Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
-    while (iterator.hasNext()) {
-      DataMapSchema schema = iterator.next();
-      if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
-        iterator.remove();
-      }
-    }
+
+    LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
+
+    dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
     touchMDTFile();
     if (!FileFactory.deleteFile(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
     }
+    LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
   }
 
   private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index adff89d..12cad37 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -372,7 +372,8 @@ object MVHelper {
 
   def updateColumnName(attr: Attribute, counter: Int): String = {
     val name = getUpdatedName(attr.name, counter)
-    attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
+    val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
+    if (value.nonEmpty) value.head else name
   }
 
   def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -472,7 +473,7 @@ object MVHelper {
   }
 
   def createAttrReference(ref: NamedExpression, name: String): Alias = {
-    Alias(ref, name)(exprId = ref.exprId, qualifier = None)
+    CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
   }
 
   case class AttributeKey(exp: Expression) {
@@ -536,13 +537,13 @@ object MVHelper {
         case attr: AttributeReference =>
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
-              CarbonToSparkAdapter.createAttributeReference(a.name,
-                a.dataType,
-                a.nullable,
-                a.metadata,
-                a.exprId,
-                attr.qualifier,
-                a)
+              CarbonToSparkAdapter.createAttributeReference(
+                name = a.name,
+                dataType = a.dataType,
+                nullable = a.nullable,
+                metadata = a.metadata,
+                exprId = a.exprId,
+                qualifier = attr.qualifier)
             } else {
               a
             }
@@ -574,9 +575,9 @@ object MVHelper {
     outputSel.zip(subsumerOutputList).map{ case (l, r) =>
       l match {
         case attr: AttributeReference =>
-          Alias(attr, r.name)(r.exprId, None)
+          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
         case a@Alias(attr: AttributeReference, name) =>
-          Alias(attr, r.name)(r.exprId, None)
+          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
         case other => other
       }
     }
@@ -593,13 +594,13 @@ object MVHelper {
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
               CarbonToSparkAdapter
-                .createAttributeReference(a.name,
+                .createAttributeReference(
+                  a.name,
                   a.dataType,
                   a.nullable,
                   a.metadata,
                   a.exprId,
-                  attr.qualifier,
-                  a)
+                  attr.qualifier)
             } else {
               a
             }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 3ddb0fc..cff5c41 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -125,17 +125,18 @@ class MVUtil {
             arrayBuffer += relation
           }
           var qualifier: Option[String] = None
-          if (attr.qualifier.isDefined) {
-            qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
+          if (attr.qualifier.nonEmpty) {
+            qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
               Some(carbonTable.getTableName)
             } else {
-              attr.qualifier
+              attr.qualifier.headOption
             }
           }
           fieldToDataMapFieldMap +=
-          getFieldToDataMapFields(attr.name,
+          getFieldToDataMapFields(
+            attr.name,
             attr.dataType,
-            qualifier,
+            qualifier.headOption,
             "",
             arrayBuffer,
             carbonTable.getTableName)
@@ -248,7 +249,8 @@ class MVUtil {
   /**
    * Below method will be used to get the fields object for mv table
    */
-  private def getFieldToDataMapFields(name: String,
+  private def getFieldToDataMapFields(
+      name: String,
       dataType: DataType,
       qualifier: Option[String],
       aggregateType: String,
@@ -313,7 +315,7 @@ class MVUtil {
     val updatedOutList = outputList.map { col =>
       val duplicateColumn = duplicateNameCols
         .find(a => a.semanticEquals(col))
-      val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+      val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
       if (duplicateColumn.isDefined) {
         val attributesOfDuplicateCol = duplicateColumn.get.collect {
           case a: AttributeReference => a
@@ -329,7 +331,7 @@ class MVUtil {
           attributeOfCol.exists(a => a.semanticEquals(expr)))
         if (!isStrictDuplicate) {
           Alias(col, qualifiedName)(exprId = col.exprId)
-        } else if (col.qualifier.isDefined) {
+        } else if (col.qualifier.nonEmpty) {
           Alias(col, qualifiedName)(exprId = col.exprId)
           // this check is added in scenario where the column is direct Attribute reference and
           // since duplicate columns select is allowed, we should just put alias for those columns
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 7e8eb96..6fbc87f 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -18,11 +18,12 @@
 
 package org.apache.carbondata.mv.rewrite
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Metadata}
 
 import org.apache.carbondata.mv.datamap.MVHelper
 import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -95,9 +96,12 @@ abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
       // Replace all compensation1 attributes with refrences of subsumer attributeset
       val compensationFinal = compensation1.transformExpressions {
         case ref: Attribute if subqueryAttributeSet.contains(ref) =>
-          AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+          CarbonToSparkAdapter.createAttributeReference(
+            ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+            exprId = ref.exprId, qualifier = subsumerName)
         case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
-          Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+          CarbonToSparkAdapter.createAliasRef(
+            alias.child, alias.name, alias.exprId, subsumerName)
       }
       compensationFinal
     } else {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index cb2043e..2b4247e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -17,10 +17,11 @@
 
 package org.apache.carbondata.mv.plans.modular
 
+import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.types.Metadata
 
 import org.apache.carbondata.mv.plans
 import org.apache.carbondata.mv.plans._
@@ -198,18 +199,18 @@ object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with Ag
                                                               .isInstanceOf[Attribute]))
           val aggOutputList = aggTransMap.values.flatMap(t => t._2)
             .map { ref =>
-              AttributeReference(ref.name, ref.dataType)(
-                exprId = ref.exprId,
-                qualifier = Some(hFactName))
+              CarbonToSparkAdapter.createAttributeReference(
+                ref.name, ref.dataType, nullable = true, Metadata.empty,
+                ref.exprId, Some(hFactName))
             }
           val hFactOutputSet = hFact.outputSet
           // Update the outputlist qualifier
           val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
             attr.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(hFactName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(hFactName))
             }
           }.asInstanceOf[Seq[NamedExpression]]
 
@@ -217,9 +218,9 @@ object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with Ag
           val hPredList = s.predicateList.map{ pred =>
             pred.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(hFactName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(hFactName))
             }
           }
           val hSel = s.copy(
@@ -241,9 +242,9 @@ object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with Ag
           val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
           wip.transformExpressions {
             case ref: Attribute if hFactOutputSet.contains(ref) =>
-              AttributeReference(ref.name, ref.dataType)(
-                exprId = ref.exprId,
-                qualifier = Some(hFactName))
+              CarbonToSparkAdapter.createAttributeReference(
+                ref.name, ref.dataType, nullable = true, Metadata.empty,
+                ref.exprId, Some(hFactName))
           }
         }
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index 30857c8..b694e78 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -52,9 +52,11 @@ object SimpleModularizer extends ModularPatterns {
       plan transform {
         case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
           val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
-          val makeupmap = children.zipWithIndex.flatMap {
+          val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
             case (child, i) =>
-              aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+              aq.find(child.outputSet.contains(_))
+                .flatMap(_.qualifier.headOption)
+                .map((i, _))
           }.toMap
           g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 0bbacc4..7068b7e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -110,10 +110,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
         RewriteCorrelatedScalarSubquery,
         EliminateSerialization,
         SparkSQLUtil.getRemoveRedundantAliasesObj(),
-        RemoveRedundantProject,
-        SimplifyCreateStructOps,
-        SimplifyCreateArrayOps,
-        SimplifyCreateMapOps) ++
+        RemoveRedundantProject) ++
                                             extendedOperatorOptimizationRules: _*) ::
     Batch(
       "Check Cartesian Products", Once,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 3b6c725..2033342 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -167,7 +167,9 @@ object ExtractSelectModule extends PredicateHelper {
     val aq = attributeSet.filter(_.qualifier.nonEmpty)
     children.zipWithIndex.flatMap {
       case (child, i) =>
-        aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+        aq.find(child.outputSet.contains(_))
+          .flatMap(_.qualifier.headOption)
+          .map((i, _))
     }.toMap
   }
 
@@ -353,28 +355,13 @@ object ExtractTableModule extends PredicateHelper {
           Seq.empty)
       case l: LogicalRelation =>
         val tableIdentifier = l.catalogTable.map(_.identifier)
-        val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
-        val table = tableIdentifier.map(_.table).getOrElse(null)
+        val database = tableIdentifier.flatMap(_.database).orNull
+        val table = tableIdentifier.map(_.table).orNull
         Some(database, table, l.output, Nil, NoFlags, Seq.empty)
       case l: LocalRelation => // used for unit test
         Some(null, null, l.output, Nil, NoFlags, Seq.empty)
       case _ =>
-        // this check is added as we get MetastoreRelation in spark2.1,
-        // this is removed in later spark version
-        // TODO: this check can be removed once 2.1 support is removed from carbon
-        if (SparkUtil.isSparkVersionEqualTo("2.1") &&
-            plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
-          val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
-            .asInstanceOf[CatalogTable]
-          Some(catalogTable.database,
-            catalogTable.identifier.table,
-            plan.output,
-            Nil,
-            NoFlags,
-            Seq.empty)
-        } else {
           None
-        }
     }
   }
 }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index d3ce38d..366284b 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -204,7 +204,7 @@ trait Printers {
                          s.child match {
                            case a: Alias =>
                              val qualifierPrefix = a.qualifier
-                               .map(_ + ".").getOrElse("")
+                               .map(_ + ".").headOption.getOrElse("")
                              s"$qualifierPrefix${
                                quoteIdentifier(a
                                  .name)
@@ -221,7 +221,7 @@ trait Printers {
                        s.child match {
                          case a: Alias =>
                            val qualifierPrefix = a.qualifier.map(_ + ".")
-                             .getOrElse("")
+                             .headOption.getOrElse("")
                            s"$qualifierPrefix${ quoteIdentifier(a.name) }"
 
                          case other => other.sql
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index b17eea2..c3a3a68 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -21,8 +21,11 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.immutable
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.types.Metadata
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans._
@@ -116,18 +119,19 @@ class SQLBuilder private(
                 if (i > -1) {
                   // this is a walk around for mystery of spark qualifier
                   if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
-                    AttributeReference(
-                      ref.name,
-                      ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
+                    CarbonToSparkAdapter.createAttributeReference(
+                      ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+                      exprId = ref.exprId, qualifier = Some(aliasMap(i)))
                   } else {
                     ref
                   }
                 } else {
                   attrMap.get(ref) match {
                     case Some(alias) =>
-                      AttributeReference(
+                      CarbonToSparkAdapter.createAttributeReference(
                         alias.child.asInstanceOf[AttributeReference].name,
-                        ref.dataType)(exprId = ref.exprId,
+                        ref.dataType, nullable = true, metadata = Metadata.empty,
+                        exprId = ref.exprId,
                         alias.child.asInstanceOf[AttributeReference].qualifier)
                     case None => ref
                   }
@@ -178,13 +182,12 @@ class SQLBuilder private(
                 list = list :+ ((index, subqueryName))
                 newS = newS.transformExpressions {
                   case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
-                    AttributeReference(ref.name, ref.dataType)(
-                      exprId = ref.exprId,
-                      qualifier = Some(subqueryName))
+                    CarbonToSparkAdapter.createAttributeReference(
+                      ref.name, ref.dataType, nullable = true, Metadata.empty,
+                      ref.exprId, Some(subqueryName))
                   case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                    Alias(alias.child, alias.name)(
-                      exprId = alias.exprId,
-                      qualifier = Some(subqueryName))
+                    CarbonToSparkAdapter.createAliasRef(
+                      alias.child, alias.name, alias.exprId, Some(subqueryName))
                 }
 
               case _ =>
@@ -212,13 +215,12 @@ class SQLBuilder private(
             }
             newG.transformExpressions {
               case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(subqueryName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(subqueryName))
               case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                Alias(alias.child, alias.name)(
-                  exprId = alias.exprId,
-                  qualifier = Some(subqueryName))
+                CarbonToSparkAdapter.createAliasRef(
+                  alias.child, alias.name, alias.exprId, Some(subqueryName))
             }.copy(alias = Some(subqueryName))
         }
       }
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
index b1bfeeb..bad1fc0 100644
--- a/docs/alluxio-guide.md
+++ b/docs/alluxio-guide.md
@@ -50,7 +50,7 @@ This tutorial provides a brief introduction to using Alluxio.
 ### Running spark-shell
  - Running the command in spark path
  ```$command
-./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
 ```
  - Testing use alluxio by CarbonSession
  ```$scala
@@ -98,7 +98,7 @@ carbon.sql("select * from carbon_alluxio").show
 --master local \
 --jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
 --class org.apache.carbondata.examples.AlluxioExample \
-${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar \
 false
 ```
 **NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
diff --git a/docs/faq.md b/docs/faq.md
index 16cdfa5..88ca186 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -316,7 +316,7 @@ java.io.FileNotFoundException: hdfs:/localhost:9000/carbon/store/default/hdfstab
   2. Use the following command :
 
   ```
-  mvn -Pspark-2.1 -Dspark.version {yourSparkVersion} clean package
+  mvn -Pspark-2.4 -Dspark.version {yourSparkVersion} clean package
   ```
   
 Note : Refrain from using "mvn clean package" without specifying the profile.
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 483585f..0c49f35 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -237,9 +237,9 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
   $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
   ```
   Replace the spark and hadoop version with the version used in your cluster.
-  For example, if you are using Spark 2.2.1 and Hadoop 2.7.2, you would like to compile using:
+  For example, if you are using Spark 2.4.4, you would like to compile using:
   ```
-  mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 -Dhadoop.version=2.7.2 clean package
+  mvn -DskipTests -Pspark-2.4 -Dspark.version=2.4.4 -Dhadoop.version=2.7.2 clean package
   ```
 
   Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index b66fce4..d007e03 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -37,11 +37,11 @@
     - [CLOSE STREAM](#close-stream)
 
 ## Quick example
-Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
+Download and unzip spark-2.4.4-bin-hadoop2.7.tgz, and export $SPARK_HOME
 
-Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.6.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
 ```shell
-mvn clean package -DskipTests -Pspark-2.2
+mvn clean package -DskipTests -Pspark-2.4
 ```
 
 Start a socket data server in a terminal
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index 73bf941..de3b5bc 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -184,26 +184,6 @@
 
     <profiles>
         <profile>
-            <id>spark-2.2</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.carbondata</groupId>
-                    <artifactId>carbondata-spark2</artifactId>
-                    <version>${project.version}</version>
-                    <scope>test</scope>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>org.apache.hive</groupId>
-                            <artifactId>hive-exec</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
             <id>spark-2.3</id>
             <activation>
                 <activeByDefault>true</activeByDefault>
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index eca20ed..ab267f5 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -24,20 +24,20 @@ import java.util.{Locale, Properties}
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
-import com.facebook.presto.jdbc.{PrestoConnection, PrestoStatement}
+import com.facebook.presto.jdbc.PrestoStatement
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{CarbonToSparkAdapter, DataFrame, Row, SQLContext}
 import org.scalatest.Suite
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.commons.lang.StringUtils
 
+
 class QueryTest extends PlanTest with Suite {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -141,8 +141,7 @@ class QueryTest extends PlanTest with Suite {
 
   val resourcesPath = TestQueryExecutor.resourcesPath
 
-  val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-    .getClient();
+  val hiveClient = CarbonToSparkAdapter.getHiveExternalCatalog(sqlContext.sparkSession).client
 }
 
 object QueryTest {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
index 8947f65..a4d78fa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
@@ -42,8 +42,7 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
 
   test("CARBONDATA-2262: test check results of table with complex data type and bucketing") {
     sql("DROP TABLE IF EXISTS create_source")
-    sql("CREATE TABLE create_source(intField INT, stringField STRING, complexField ARRAY<INT>) " +
-      "USING carbondata")
+    sql("CREATE TABLE create_source(intField INT, stringField STRING, complexField ARRAY<INT>) USING carbondata")
     sql("INSERT INTO create_source VALUES(1,'source',array(1,2,3))")
     checkAnswer(sql("SELECT * FROM create_source"), Row(1, "source", mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3)))
     sql("DROP TABLE IF EXISTS create_source")
@@ -81,13 +80,13 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
     checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
   }
 
-  test("CARBONDATA-2396 Support Create Table As Select with 'USING org.apache.spark.sql.CarbonSource'") {
+  test("CARBONDATA-2396 Support Create Table As Select with 'USING carbondata'") {
     sql("DROP TABLE IF EXISTS src_carbondata3")
     sql("DROP TABLE IF EXISTS src_carbondata4")
-    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING org.apache.spark.sql.CarbonSource")
+    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata")
     sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
     checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
-    sql("CREATE TABLE src_carbondata4 USING org.apache.spark.sql.CarbonSource as select * from src_carbondata3")
+    sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3")
     checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
     sql("DROP TABLE IF EXISTS src_carbondata3")
     sql("DROP TABLE IF EXISTS src_carbondata4")
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 199ff84..0790763 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -191,4 +191,103 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>build-all</id>
+      <properties>
+        <spark.version>2.3.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+    </profile>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.3.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.4</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.4</id>
+      <properties>
+        <spark.version>2.4.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.4</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index f629260..2548110 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,11 +37,12 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -93,9 +94,9 @@ object CsvRDDHelper {
     def closePartition(): Unit = {
       if (currentFiles.nonEmpty) {
         val newPartition =
-          FilePartition(
+          CarbonToSparkAdapter.createFilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq)
+            currentFiles)
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index e70fc24..865d586 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -834,12 +834,4 @@ object CommonUtil {
     displaySize
   }
 
-  def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
-    catalogTable.provider match {
-      case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") ||
-                      x.equalsIgnoreCase("carbondata")
-      case None => false
-    }
-  }
-
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 71577a9..435cb58 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,23 +17,22 @@
 
 package org.apache.spark.sql.util
 
-import java.lang.reflect.Method
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EmptyRule
+import org.apache.spark.sql.catalyst.analysis.EliminateView
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.optimizer.{CheckCartesianProducts, EliminateOuterJoin, NullPropagation, PullupCorrelatedPredicates, RemoveRedundantAliases, ReorderJoin}
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
@@ -53,166 +52,60 @@ object SparkSQLUtil {
   }
 
   def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
-      method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val method: Method = logicalPlanObj.getClass.getMethod("stats")
-      method.invoke(logicalPlanObj).asInstanceOf[Statistics]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    logicalPlanObj.stats
   }
 
   def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
       : NamedExpression = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
-      clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
-        invoke(null, r, input).asInstanceOf[NamedExpression]
-    } else {
-      r
-    }
+    QueryPlan.normalizeExprId(r, input)
   }
 
   def getStatisticsObj(outputList: Seq[NamedExpression],
                        plan: LogicalPlan, stats: Statistics,
                        aliasMap: Option[AttributeMap[Attribute]] = None)
   : Statistics = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val output = outputList.map(_.toAttribute)
-      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-        table => AttributeMap(table.output.zip(output))
-      }
-      val rewrites = mapSeq.head
-      val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
-        getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
-      var attributeStats = AttributeMap(attributes.iterator
-        .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
-      if (aliasMap.isDefined) {
-        attributeStats = AttributeMap(
-          attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
-      }
-      val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
-      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
-        stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
-    } else {
-      val output = outputList.map(_.name)
-      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-        table => table.output.map(_.name).zip(output).toMap
-      }
-      val rewrites = mapSeq.head
-      val colStats = CarbonReflectionUtils.getField("colStats", stats)
-        .asInstanceOf[Map[String, ColumnStat]]
-      var attributeStats = colStats.iterator
-        .map { pair => (rewrites(pair._1), pair._2) }.toMap
-      if (aliasMap.isDefined) {
-        val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
-        attributeStats =
-          attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
-            , pair._2))
-      }
-      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
-        stats.rowCount, attributeStats).asInstanceOf[Statistics]
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes : AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
     }
+    val hints = stats.hints
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, hints)
   }
 
   def getEliminateViewObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    EliminateView
   }
 
   def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    PullupCorrelatedPredicates
   }
 
   def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    RemoveRedundantAliases
   }
 
   def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    ReorderJoin
   }
 
   def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    EliminateOuterJoin
   }
 
   def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    NullPropagation
   }
 
   def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    CheckCartesianProducts
   }
 
   /**
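
For readers following the hunk above: with only the Spark 2.3/2.4 line supported, these optimizer rules are plain Scala objects that can be returned directly, so the Class.forName indirection is gone. A minimal illustrative sketch of that pattern follows; the object and method names below are invented for illustration and are not part of the patch.

import org.apache.spark.sql.catalyst.optimizer.{EliminateOuterJoin, NullPropagation, ReorderJoin}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object DirectRuleAccessSketch {
  // Direct references compile against whichever Spark 2.3/2.4 jars are on the classpath,
  // so no Class.forName / constructor reflection is needed any more.
  def reorderJoin: Rule[LogicalPlan] = ReorderJoin
  def eliminateOuterJoin: Rule[LogicalPlan] = EliminateOuterJoin
  def nullPropagation: Rule[LogicalPlan] = NullPropagation
}
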
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index b35bc4d..9c3e8e1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,24 +17,22 @@
 
 package org.apache.spark.util
 
-import java.lang.reflect.Method
-
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -60,45 +58,19 @@ object CarbonReflectionUtils {
   def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
     val im = rm.reflect(obj)
     im.symbol.typeSignature.members.find(_.name.toString.equals(name))
-      .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
+      .map(l => im.reflectField(l.asTerm).get).orNull
   }
 
   def getUnresolvedRelation(
       tableIdentifier: TableIdentifier,
       tableAlias: Option[String] = None): UnresolvedRelation = {
-    val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        tableIdentifier,
-        tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      createObject(
-        className,
-        tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
-    } else {
-      throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
-    }
+    UnresolvedRelation(tableIdentifier)
   }
 
   def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        alias.getOrElse(""),
-        relation,
-        Option(view))._1.asInstanceOf[SubqueryAlias]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      createObject(
-        className,
-        alias.getOrElse(""),
-        relation)._1.asInstanceOf[SubqueryAlias]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    SubqueryAlias(alias.getOrElse(""), relation)
   }
 
   def getInsertIntoCommand(table: LogicalPlan,
@@ -106,58 +78,23 @@ object CarbonReflectionUtils {
       query: LogicalPlan,
       overwrite: Boolean,
       ifPartitionNotExists: Boolean): InsertIntoTable = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val overwriteOptions = createObject(
-        "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
-        overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
-      createObject(
-        className,
-        table,
-        partition,
-        query,
-        overwriteOptions,
-        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
-      createObject(
-        className,
-        table,
-        partition,
-        query,
-        overwrite.asInstanceOf[Object],
-        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
   }
 
   def getLogicalRelation(relation: BaseRelation,
       expectedOutputAttributes: Seq[Attribute],
       catalogTable: Option[CatalogTable],
       isStreaming: Boolean): LogicalRelation = {
-    val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        relation,
-        Some(expectedOutputAttributes),
-        catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      createObject(
-        className,
-        relation,
-        expectedOutputAttributes,
-        catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      createObject(
-        className,
-        relation,
-        expectedOutputAttributes,
-        catalogTable,
-        isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    new LogicalRelation(
+      relation,
+      expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
+      catalogTable,
+      isStreaming)
   }
 
 
@@ -208,46 +145,28 @@ object CarbonReflectionUtils {
   def getSessionState(sparkContext: SparkContext,
                       carbonSession: Object,
                       useHiveMetaStore: Boolean): Any = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+    if (useHiveMetaStore) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSessionState")
-      createObject(className, carbonSession)._1
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      if (useHiveMetaStore) {
-        val className = sparkContext.conf.get(
-          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-          "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
-        val tuple = createObject(className, carbonSession, None)
-        val method = tuple._2.getMethod("build")
-        method.invoke(tuple._1)
-      } else {
-        val className = sparkContext.conf.get(
-          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-          "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
-        val tuple = createObject(className, carbonSession, None)
-        val method = tuple._2.getMethod("build")
-        method.invoke(tuple._1)
-      }
+        "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+      val tuple = createObject(className, carbonSession, None)
+      val method = tuple._2.getMethod("build")
+      method.invoke(tuple._1)
     } else {
-      throw new UnsupportedOperationException("Spark version not supported")
+      val className = sparkContext.conf.get(
+        CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+        "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+      val tuple = createObject(className, carbonSession, None)
+      val method = tuple._2.getMethod("build")
+      method.invoke(tuple._1)
     }
   }
 
   def hasPredicateSubquery(filterExp: Expression) : Boolean = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
-      val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
-      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
-      hasSubquery
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
-      val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
-      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
-      hasSubquery
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
+    val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
+    val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+    hasSubquery
   }
 
   def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
@@ -265,19 +184,10 @@ object CarbonReflectionUtils {
       rdd: RDD[InternalRow],
       partition: Partitioning,
       metadata: Map[String, String]): RowDataSourceScanExec = {
-    val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
-    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
-      createObject(className, output, rdd, relation.relation,
-        partition, metadata,
-        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      createObject(className, output, output.map(output.indexOf),
-        pushedFilters.toSet, handledFilters.toSet, rdd,
-        relation.relation,
-        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    RowDataSourceScanExec(output, output.map(output.indexOf),
+      pushedFilters.toSet, handledFilters.toSet, rdd,
+      relation.relation,
+      relation.catalogTable.map(_.identifier))
   }
 
   def invokewriteAndReadMethod(dataSourceObj: DataSource,
@@ -287,25 +197,7 @@ object CarbonReflectionUtils {
       mode: SaveMode,
       query: LogicalPlan,
       physicalPlan: SparkPlan): BaseRelation = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = dataSourceObj.getClass
-        .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
-      method.invoke(dataSourceObj, mode, dataFrame)
-        .asInstanceOf[BaseRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      val method: Method = dataSourceObj.getClass
-        .getMethod("writeAndRead",
-          classOf[SaveMode],
-          classOf[LogicalPlan],
-          classOf[Seq[String]],
-          classOf[SparkPlan])
-      // since spark 2.3.2 version (SPARK-PR#22346),
-      // change 'query.output' to 'query.output.map(_.name)'
-      method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
-        .asInstanceOf[BaseRelation]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
   }
 
   /**
@@ -316,9 +208,7 @@ object CarbonReflectionUtils {
    */
   def invokeAlterTableAddColumn(table: TableIdentifier,
       colsToAdd: Seq[StructField]): Object = {
-    val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
-    CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
-      ._1.asInstanceOf[RunnableCommand]
+    AlterTableAddColumnsCommand(table, colsToAdd)
   }
 
   def createSingleObject(className: String): Any = {
@@ -395,16 +285,6 @@ object CarbonReflectionUtils {
 
   def invokeAnalyzerExecute(analyzer: Analyzer,
       plan: LogicalPlan): LogicalPlan = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = analyzer.getClass
-        .getMethod("execute", classOf[LogicalPlan])
-      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      val method: Method = analyzer.getClass
-        .getMethod("executeAndCheck", classOf[LogicalPlan])
-      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    analyzer.executeAndCheck(plan)
   }
 }
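
Most of CarbonReflectionUtils collapses the same way: the catalyst case classes (UnresolvedRelation, SubqueryAlias, InsertIntoTable, LogicalRelation) are now constructed directly instead of through createObject. A small hedged sketch of building an unresolved INSERT plan with those constructors; the helper object and method name are hypothetical and not part of the patch.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}

object InsertPlanSketch {
  // Hypothetical helper: builds an unresolved INSERT plan for a target table,
  // using the same direct constructors as getInsertIntoCommand above.
  def insertPlanFor(target: TableIdentifier, query: LogicalPlan): InsertIntoTable = {
    InsertIntoTable(
      UnresolvedRelation(target),   // table to insert into
      Map.empty,                    // no static partition spec
      query,                        // source plan
      overwrite = false,
      ifPartitionNotExists = false)
  }
}
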
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
similarity index 65%
rename from integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
rename to integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
index 4abf189..a2ca9e6 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
+++ b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.carbondata.spark.adapter
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import scala.collection.mutable.ArrayBuffer
 
-/**
-  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
-  * recognize a subquery as such, and it allows us to write subquery aware transformations.
-  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
\ No newline at end of file
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+
+object CarbonToSparkAdapter {
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+}
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
similarity index 51%
rename from integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
index 605df66..dc5c14b 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
+++ b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -15,33 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql;
+package org.apache.carbondata.spark.adapter
 
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.{DataType, Metadata}
 
-public class CarbonDictionaryWrapper extends Dictionary {
-
-  private Binary[] binaries;
-
-  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
-    super(encoding);
-    binaries = new Binary[dictionary.getDictionarySize()];
-    for (int i = 0; i < binaries.length; i++) {
-      binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
-    }
-  }
-
-  @Override
-  public int getMaxId() {
-    return binaries.length - 1;
-  }
-
-  @Override
-  public Binary decodeToBinary(int id) {
-    return binaries[id];
+object CarbonToSparkAdapter {
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]): FilePartition = {
+    FilePartition(index, files.toArray)
   }
 }
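
The two CarbonToSparkAdapter variants above give the shared spark-common module a single entry point for building FilePartition instances, with the per-version constructor difference hidden behind the profile-specific source directories. A hedged usage sketch; the surrounding object and the empty buffer are placeholders, not code from this patch.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter

object FilePartitionSketch {
  // Placeholder: in CarbonData this buffer is populated while splitting input files.
  val partitionedFiles = new ArrayBuffer[PartitionedFile]()

  // The same call compiles against whichever adapter the active Maven profile adds
  // (src/main/spark2.3 or src/main/spark2.4), hiding the constructor difference.
  val partition: FilePartition = CarbonToSparkAdapter.createFilePartition(0, partitionedFiles)
}
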
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index cda1954..1f1cac3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -192,86 +192,6 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
-      <properties>
-        <spark.version>2.1.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.3plus</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.1andspark2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.2</id>
-      <properties>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.3plus</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.1andspark2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -286,30 +206,6 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.1andspark2.2</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.3plus</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
           </plugin>
         </plugins>
       </build>
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7d23d7c..0000000
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import java.lang.reflect.Field;
-import java.math.BigInteger;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
-
-import org.apache.parquet.column.Encoding;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-  private ColumnarBatch columnarBatch;
-  private ColumnVectorProxy[] columnVectorProxies;
-
-
-  private void updateColumnVectors() {
-    try {
-      Field field = columnarBatch.getClass().getDeclaredField("columns");
-      field.setAccessible(true);
-      field.set(columnarBatch, columnVectorProxies);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Adapter class which handles the columnar vector reading of the carbondata
-   * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-   * handles the complexity of spark 2.3 version related api changes since
-   * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-   *
-   * @param memMode       which represent the type onheap or offheap vector.
-   * @param outputSchema, metadata related to current schema of table.
-   * @param rowNum        rows number for vector reading
-   * @param useLazyLoad   Whether to use lazy load while getting the data.
-   */
-  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
-      boolean useLazyLoad) {
-    columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
-    for (int i = 0; i < columnVectorProxies.length; i++) {
-      if (useLazyLoad) {
-        columnVectorProxies[i] =
-            new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
-      } else {
-        columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
-      }
-    }
-    updateColumnVectors();
-  }
-
-  public ColumnVectorProxy getColumnVector(int ordinal) {
-    return columnVectorProxies[ordinal];
-  }
-
-  /**
-   * Returns the number of rows for read, including filtered rows.
-   */
-  public int numRows() {
-    return columnarBatch.capacity();
-  }
-
-  /**
-   * This API will return a columnvector from a batch of column vector rows
-   * based on the ordinal
-   *
-   * @param ordinal
-   * @return
-   */
-  public ColumnVector column(int ordinal) {
-    return columnarBatch.column(ordinal);
-  }
-
-  /**
-   * Resets this column for writing. The currently stored values are no longer accessible.
-   */
-  public void reset() {
-    for (int i = 0; i < columnarBatch.numCols(); i++) {
-      ((ColumnVectorProxy) columnarBatch.column(i)).reset();
-    }
-  }
-
-  public void resetDictionaryIds(int ordinal) {
-    (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
-  }
-
-  /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-   */
-  public InternalRow getRow(int rowId) {
-    return columnarBatch.getRow(rowId);
-  }
-
-  /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-   */
-  public Object getColumnarBatch() {
-    return columnarBatch;
-  }
-
-  /**
-   * Called to close all the columns in this batch. It is not valid to access the data after
-   * calling this. This must be called at the end to clean up memory allocations.
-   */
-  public void close() {
-    columnarBatch.close();
-  }
-
-  /**
-   * Sets the number of rows in this batch.
-   */
-  public void setNumRows(int numRows) {
-    columnarBatch.setNumRows(numRows);
-  }
-
-  public DataType dataType(int ordinal) {
-    return columnarBatch.column(ordinal).dataType();
-  }
-
-  public static class ColumnVectorProxy extends ColumnVector {
-
-    private ColumnVector vector;
-
-    public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
-      super(capacity, columnVector.dataType(), mode);
-      try {
-        Field childColumns =
-            columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
-        childColumns.setAccessible(true);
-        Object o = childColumns.get(columnVector);
-        childColumns.set(this, o);
-        Field resultArray =
-            columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
-        resultArray.setAccessible(true);
-        Object o1 = resultArray.get(columnVector);
-        resultArray.set(this, o1);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      vector = columnVector;
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value) {
-      org.apache.spark.sql.types.DataType t = dataType();
-      if (null == value) {
-        putNull(rowId);
-      } else {
-        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-          putBoolean(rowId, (boolean) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-          putByte(rowId, (byte) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-          putShort(rowId, (short) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-          putInt(rowId, (int) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-          putLong(rowId, (long) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-          putFloat(rowId, (float) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-          putDouble(rowId, (double) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-          UTF8String v = (UTF8String) value;
-          putByteArray(rowId, v.getBytes());
-        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-          DecimalType dt = (DecimalType) t;
-          Decimal d = Decimal.fromDecimal(value);
-          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-            putInt(rowId, (int) d.toUnscaledLong());
-          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-            putLong(rowId, d.toUnscaledLong());
-          } else {
-            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-            byte[] bytes = integer.toByteArray();
-            putByteArray(rowId, bytes, 0, bytes.length);
-          }
-        } else if (t instanceof CalendarIntervalType) {
-          CalendarInterval c = (CalendarInterval) value;
-          vector.getChildColumn(0).putInt(rowId, c.months);
-          vector.getChildColumn(1).putLong(rowId, c.microseconds);
-        } else if (t instanceof org.apache.spark.sql.types.DateType) {
-          putInt(rowId, (int) value);
-        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-          putLong(rowId, (long) value);
-        }
-      }
-    }
-
-    public void putBoolean(int rowId, boolean value) {
-      vector.putBoolean(rowId, value);
-    }
-
-    public void putByte(int rowId, byte value) {
-      vector.putByte(rowId, value);
-    }
-
-    public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putBytes(rowId, count, src, srcIndex);
-    }
-
-    public void putShort(int rowId, short value) {
-      vector.putShort(rowId, value);
-    }
-
-    public void putInt(int rowId, int value) {
-      vector.putInt(rowId, value);
-    }
-
-    public void putFloat(int rowId, float value) {
-      vector.putFloat(rowId, value);
-    }
-
-    public void putFloats(int rowId, int count, float[] src, int srcIndex) {
-      vector.putFloats(rowId, count, src, srcIndex);
-    }
-
-    public void putLong(int rowId, long value) {
-      vector.putLong(rowId, value);
-    }
-
-    public void putDouble(int rowId, double value) {
-      vector.putDouble(rowId, value);
-    }
-
-    public void putInts(int rowId, int count, int value) {
-      vector.putInts(rowId, count, value);
-    }
-
-    public void putInts(int rowId, int count, int[] src, int srcIndex) {
-      vector.putInts(rowId, count, src, srcIndex);
-    }
-
-    public void putShorts(int rowId, int count, short value) {
-      vector.putShorts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-      vector.putShorts(rowId, count, src, srcIndex);
-    }
-
-    public void putLongs(int rowId, int count, long value) {
-      vector.putLongs(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-      vector.putLongs(rowId, count, src, srcIndex);
-    }
-
-    public void putDoubles(int rowId, int count, double value) {
-      vector.putDoubles(rowId, count, value);
-    }
-
-    public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-      vector.putDoubles(rowId, count, src, srcIndex);
-    }
-
-    public DataType dataType(int ordinal) {
-      return vector.dataType();
-    }
-
-    public void putNotNull(int rowId) {
-      vector.putNotNull(rowId);
-    }
-
-    public void putNotNulls(int rowId, int count) {
-      vector.putNotNulls(rowId, count);
-    }
-
-    public void putDictionaryInt(int rowId, int value) {
-      vector.getDictionaryIds().putInt(rowId, value);
-    }
-
-    public void setDictionary(CarbonDictionary dictionary) {
-      if (null != dictionary) {
-        CarbonDictionaryWrapper dictionaryWrapper =
-            new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
-        vector.setDictionary(dictionaryWrapper);
-        this.dictionary = dictionaryWrapper;
-      } else {
-        this.dictionary = null;
-        vector.setDictionary(null);
-      }
-    }
-
-    public void putNull(int rowId) {
-      vector.putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count) {
-      vector.putNulls(rowId, count);
-    }
-
-    public boolean hasDictionary() {
-      return vector.hasDictionary();
-    }
-
-    public ColumnVector reserveDictionaryIds(int capacity) {
-      this.dictionaryIds = vector.reserveDictionaryIds(capacity);
-      return dictionaryIds;
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-      return vector.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-      return vector.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-      return vector.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-      return vector.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-      return vector.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-      return vector.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-      return vector.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-      return vector.getDouble(i);
-    }
-
-    @Override
-    protected void reserveInternal(int capacity) {
-    }
-
-    @Override
-    public void reserve(int requiredCapacity) {
-      vector.reserve(requiredCapacity);
-    }
-
-    @Override
-    public long nullsNativeAddress() {
-      return vector.nullsNativeAddress();
-    }
-
-    @Override
-    public long valuesNativeAddress() {
-      return vector.valuesNativeAddress();
-    }
-
-    @Override
-    public void putBooleans(int rowId, int count, boolean value) {
-      vector.putBooleans(rowId, count, value);
-    }
-
-    @Override
-    public void putBytes(int rowId, int count, byte value) {
-      vector.putBytes(rowId, count, value);
-    }
-
-    @Override
-    public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putIntsLittleEndian(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public int getDictId(int rowId) {
-      return vector.getDictId(rowId);
-    }
-
-    @Override
-    public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putLongsLittleEndian(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putFloats(int rowId, int count, float value) {
-      vector.putFloats(rowId, count, value);
-    }
-
-    @Override
-    public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putFloats(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putDoubles(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putArray(int rowId, int offset, int length) {
-      vector.putArray(rowId, offset, length);
-    }
-
-    @Override
-    public int getArrayLength(int rowId) {
-      return vector.getArrayLength(rowId);
-    }
-
-    @Override
-    public int getArrayOffset(int rowId) {
-      return vector.getArrayOffset(rowId);
-    }
-
-    @Override
-    public void loadBytes(Array array) {
-      vector.loadBytes(array);
-    }
-
-    @Override
-    public int putByteArray(int rowId, byte[] value, int offset, int count) {
-      return vector.putByteArray(rowId, value, offset, count);
-    }
-
-    /**
-     * It keeps all binary data of all rows to it.
-     * Should use along with @{putArray(int rowId, int offset, int length)} to keep lengths
-     * and offset.
-     */
-    public void putAllByteArray(byte[] data, int offset, int length) {
-      vector.arrayData().appendBytes(length, data, offset);
-    }
-
-    @Override
-    public void close() {
-      vector.close();
-    }
-
-    public void reset() {
-      if (isConstant) {
-        return;
-      }
-      vector.reset();
-    }
-
-    public void setLazyPage(LazyPageLoader lazyPage) {
-      lazyPage.loadPage();
-    }
-
-    public ColumnVector getVector() {
-      return vector;
-    }
-  }
-
-  public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
-
-    private ColumnVector vector;
-
-    private LazyPageLoader pageLoad;
-
-    private boolean isLoaded;
-
-    public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
-      super(columnVector, capacity, mode);
-      vector = columnVector;
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-      checkPageLoaded();
-      return vector.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-      checkPageLoaded();
-      return vector.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-      checkPageLoaded();
-      return vector.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-      checkPageLoaded();
-      return vector.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-      checkPageLoaded();
-      return vector.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-      checkPageLoaded();
-      return vector.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-      checkPageLoaded();
-      return vector.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-      checkPageLoaded();
-      return vector.getDouble(i);
-    }
-
-    @Override
-    public int getArrayLength(int rowId) {
-      checkPageLoaded();
-      return vector.getArrayLength(rowId);
-    }
-
-    @Override
-    public int getArrayOffset(int rowId) {
-      checkPageLoaded();
-      return vector.getArrayOffset(rowId);
-    }
-
-    private void checkPageLoaded() {
-      if (!isLoaded) {
-        if (pageLoad != null) {
-          pageLoad.loadPage();
-        }
-        isLoaded = true;
-      }
-    }
-
-    public void reset() {
-      if (isConstant) {
-        return;
-      }
-      isLoaded = false;
-      vector.reset();
-    }
-
-    public void setLazyPage(LazyPageLoader lazyPage) {
-      this.pageLoad = lazyPage;
-    }
-
-  }
-}
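
The deleted CarbonVectorProxy above was the Spark 2.1/2.2 flavour of the vector adapter; the spark2.3plus flavour is kept and moved under src/main/scala (see the renames earlier in this patch). A hedged sketch of driving it, assuming the retained class keeps the constructor and accessors shown in the deleted code; that assumption is not confirmed by this section.

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.CarbonVectorProxy
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object VectorProxySketch {
  // Assumes the retained CarbonVectorProxy exposes the same constructor as the deleted
  // 2.1/2.2 variant: (MemoryMode, output schema, row capacity, lazy-load flag).
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  val proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4 * 1024, false)
  val capacity: Int = proxy.numRows()   // batch capacity, including filtered rows
  proxy.setNumRows(0)                   // no rows are actually filled in this sketch
  proxy.close()                         // releases the underlying column vectors
}
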
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 7b65e0b..dfebf6f 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -253,9 +253,12 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
+      <id>spark-2.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.version>2.1.0</spark.version>
+        <spark.version>2.3.4</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -266,9 +269,7 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
               <excludes>
-                <exclude>src/main/spark2.2</exclude>
-                <exclude>src/main/spark2.3</exclude>
-                <exclude>src/main/commonTo2.2And2.3</exclude>
+                <exclude>src/main/spark2.4</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -285,8 +286,7 @@
                 </goals>
                 <configuration>
                   <sources>
-                    <source>src/main/spark2.1</source>
-                    <source>src/main/commonTo2.1And2.2</source>
+                    <source>src/main/spark2.3</source>
                   </sources>
                 </configuration>
               </execution>
@@ -296,9 +296,9 @@
       </build>
     </profile>
     <profile>
-      <id>spark-2.2</id>
+      <id>spark-2.4</id>
       <properties>
-        <spark.version>2.2.1</spark.version>
+        <spark.version>2.4.4</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -309,7 +309,6 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
               <excludes>
-                <exclude>src/main/spark2.1</exclude>
                 <exclude>src/main/spark2.3</exclude>
               </excludes>
             </configuration>
@@ -327,55 +326,7 @@
                 </goals>
                 <configuration>
                   <sources>
-                    <source>src/main/spark2.2</source>
-                    <source>src/main/commonTo2.2And2.3</source>
-                    <source>src/main/commonTo2.1And2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.version>2.3.4</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.1</exclude>
-                <exclude>src/main/spark2.2</exclude>
-                <exclude>src/main/commonTo2.1And2.2</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.3</source>
-                    <source>src/main/commonTo2.2And2.3</source>
+                    <source>src/main/spark2.4</source>
                   </sources>
                 </configuration>
               </execution>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index aa650e0..78d6a46 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -17,13 +17,8 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
 
 case class CastExpr(expr: Expression) extends Filter {
   override def references: Array[String] = null
@@ -33,26 +28,6 @@ case class FalseExpr() extends Filter {
   override def references: Array[String] = null
 }
 
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifier: Option[String] = null
-
-  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
-
 case class CarbonEndsWith(expr: Expression) extends Filter {
   override def references: Array[String] = null
 }
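
CarbonBoundReference is dropped from this shared file, presumably because it implements NamedExpression, whose API (notably qualifier) differs between Spark 2.3 and 2.4, so a per-version definition is needed; the relocated definition is not shown in this section. A hedged, 2.4-flavoured sketch that mirrors the removed class, with the Seq-typed qualifier as the assumed difference.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.DataType

import org.apache.carbondata.core.scan.expression.ColumnExpression

// Assumed 2.4-flavoured shape; only the qualifier signature differs from the removed code.
case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
  extends LeafExpression with NamedExpression with CodegenFallback {

  type EvaluatedType = Any

  override def toString: String = s"input[${colExp.getColIndex}]"

  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)

  override def name: String = colExp.getColumnName

  override def toAttribute: Attribute = throw new UnsupportedOperationException

  override def exprId: ExprId = throw new UnsupportedOperationException

  // Spark 2.4 declares qualifier as Seq[String] (it was Option[String] in 2.3)
  override def qualifier: Seq[String] = Seq.empty

  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
}
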
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index e020a99..23c078a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -258,41 +258,67 @@ case class CarbonDictionaryDecoder(
             s"""
                |org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
                  """.stripMargin
-            ExprCode(code, s"$value.getIsNull()",
-              s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
+            CarbonToSparkAdapter.createExprCode(
+              code,
+              s"$value.getIsNull()",
+              s"((org.apache.spark.sql.types.Decimal)$value.getValue())",
+              expr.dataType)
           } else {
             getDictionaryColumnIds(index)._3.getDataType match {
               case CarbonDataTypes.INT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Integer)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.SHORT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Short)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.DOUBLE => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Double)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.LONG => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Long)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.BOOLEAN => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Boolean)$value.getValue())",
+                  expr.dataType)
               case _ => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
-
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((UTF8String)$value.getValue())",
+                  expr.dataType)
             }
           }
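
The decoder now builds ExprCode through CarbonToSparkAdapter.createExprCode because the codegen API changed shape between Spark versions. A minimal sketch of what the Spark 2.3 side of that adapter could look like; the body below is an assumption for illustration, not the committed adapter code.

import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
import org.apache.spark.sql.types.DataType

object ExprCodeAdapterSketch {
  // Sketch of the Spark 2.3 overload only: ExprCode still takes plain generated-code
  // strings there, so the adapter can forward them; dataType is unused on this side
  // (the 2.4 variant would wrap the strings in the typed Block/ExprValue codegen API).
  def createExprCode(code: String, isNull: String, value: String, dataType: DataType): ExprCode = {
    ExprCode(code, isNull, value)
  }
}
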
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
index 096aa19..3ca4538 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
@@ -79,33 +79,3 @@ case class OptimizerRule(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
-class OptimizerProxy(
-    session: SparkSession,
-    catalog: SessionCatalog,
-    optimizer: Optimizer) extends Optimizer(catalog) {
-
-  private lazy val firstBatchRules = Seq(Batch("First Batch Optimizers", Once,
-      Seq(CarbonMVRules(session), new CarbonPreOptimizerRule()): _*))
-
-  private lazy val LastBatchRules = Batch("Last Batch Optimizers", fixedPoint,
-    Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonLateDecodeRule()): _*)
-
-  override def batches: Seq[Batch] = {
-    firstBatchRules ++ convertedBatch() :+ LastBatchRules
-  }
-
-  def convertedBatch(): Seq[Batch] = {
-    optimizer.batches.map { batch =>
-      Batch(
-        batch.name,
-        batch.strategy match {
-          case optimizer.Once =>
-            Once
-          case _: optimizer.FixedPoint =>
-            fixedPoint
-        },
-        batch.rules: _*
-      )
-    }
-  }
-}
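
With OptimizerProxy removed, the remaining OptimizerRule above is expected to be registered through Spark's session extensions rather than by wrapping the Optimizer. A hedged sketch of that wiring; CarbonExtensions' actual registration code is not part of this hunk, so the object below is illustrative only.

import org.apache.spark.sql.{OptimizerRule, SparkSessionExtensions}

object ExtensionsWiringSketch {
  // Assumed wiring: register the session-scoped rule via the extensions API
  // instead of overriding Optimizer.batches as the deleted OptimizerProxy did.
  def inject(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule(session => OptimizerRule(session))
  }
}
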
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 231d21c..e08fc16 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,16 +25,10 @@ import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.CarbonParserUtil
-import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.hive.CarbonMetaStore
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
-import org.apache.spark.sql.parser.{CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -44,12 +38,10 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil}
 import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
@@ -149,13 +141,6 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       Option(dataSchema))
   }
 
-  private def addLateDecodeOptimization(ss: SparkSession): Unit = {
-    if (ss.sessionState.experimentalMethods.extraStrategies.isEmpty) {
-      ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
-      ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
-    }
-  }
-
   /**
    * Returns the path of the table
    *
@@ -243,127 +228,110 @@ object CarbonSource {
 
   lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
 
-  def createTableInfoFromParams(
-      parameters: Map[String, String],
-      dataSchema: StructType,
-      identifier: AbsoluteTableIdentifier,
-      query: Option[LogicalPlan],
-      sparkSession: SparkSession): TableModel = {
-    val sqlParser = new CarbonSpark2SqlParser
-    val map = scala.collection.mutable.Map[String, String]()
-    parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
-    val options = new CarbonOption(parameters)
-    val fields = query match {
-      case Some(q) =>
-        // if query is provided then it is a CTAS flow
-        if (sqlParser.getFields(dataSchema).nonEmpty) {
-          throw new AnalysisException(
-            "Schema cannot be specified in a Create Table As Select (CTAS) statement")
-        }
-        sqlParser
-          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
-            .getSchemaFromUnresolvedRelation(sparkSession, q))
-      case None =>
-        sqlParser.getFields(dataSchema)
-    }
-    val bucketFields = sqlParser.getBucketFields(map, fields, options)
-    CarbonParserUtil.prepareTableModel(
-      ifNotExistPresent = false,
-      Option(identifier.getDatabaseName),
-      identifier.getTableName,
-      fields,
-      Nil,
-      map,
-      bucketFields)
-  }
-
   /**
-   * Update spark catalog table with schema information in case of schema storage is hive metastore
-   * @param tableDesc
-   * @param sparkSession
-   * @return
+   * Create TableInfo object from the CatalogTable
    */
-  def updateCatalogTableWithCarbonSchema(
-      tableDesc: CatalogTable,
-      sparkSession: SparkSession,
-      ignoreIfExists: Boolean,
-      query: Option[LogicalPlan] = None): CatalogTable = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-    val storageFormat = tableDesc.storage
-    val properties = CarbonSparkSqlParserUtil.getProperties(tableDesc)
-    if (!properties.contains("carbonSchemaPartsNo")) {
-      val (map, tablePath) = updateAndCreateTable(
-        sparkSession,
-        metaStore,
-        properties,
-        query,
-        tableDesc,
-        ignoreIfExists)
-      // updating params
-      val updatedFormat = CarbonToSparkAdapter
-        .getUpdatedStorageFormat(storageFormat, map, tablePath)
-      val updatedSchema = if (tableDesc.tableType == CatalogTableType.EXTERNAL) {
-        tableDesc.schema
-      } else {
-        CarbonSparkUtil.updateStruct(tableDesc.schema)
-      }
-      tableDesc.copy(
-        storage = updatedFormat,
-        schema = updatedSchema,
-        partitionColumnNames = tableDesc.partitionColumnNames.map(_.toLowerCase)
-      )
-    } else {
-      val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
-      val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
-      tableInfo.setTransactionalTable(isTransactionalTable)
-      val isExternal = properties.getOrElse("isExternal", "true").contains("true")
-      val updatedTableType = if (isExternal) {
-        tableDesc.tableType
-      } else {
-        CatalogTableType.MANAGED
-      }
-      if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
-        // save to disk
-        metaStore.saveToDisk(tableInfo, properties("tablePath"))
-        // remove schema string from map as we don't store carbon schema to hive metastore
-        val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
-        val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
-        tableDesc.copy(storage = updatedFormat, tableType = updatedTableType)
-      } else {
-        tableDesc.copy(tableType = updatedTableType)
-      }
-    }
-  }
-
-  def updateAndCreateTable(
-      sparkSession: SparkSession,
-      metaStore: CarbonMetaStore,
-      properties: Map[String, String],
-      query: Option[LogicalPlan],
-      tableDesc: CatalogTable,
-      ignoreIfExists: Boolean): (Map[String, String], String) = {
+  private def createTableInfo(sparkSession: SparkSession, table: CatalogTable): TableInfo = {
     val tableInfo = CarbonSparkSqlParserUtil.buildTableInfoFromCatalogTable(
-      tableDesc, true, sparkSession, query)
-    val tableLocation = if (tableDesc.storage.locationUri.isDefined) {
-      Some(tableDesc.storage.locationUri.get.toString)
+      table,
+      ifNotExists = true,
+      sparkSession)
+    val tableLocation = if (table.storage.locationUri.isDefined) {
+      Some(table.storage.locationUri.get.toString)
     } else {
       None
     }
-    val isExternal = tableDesc.tableType == CatalogTableType.EXTERNAL
     val tablePath = CarbonEnv.createTablePath(
       Some(tableInfo.getDatabaseName),
       tableInfo.getFactTable.getTableName,
       tableInfo.getFactTable.getTableId,
       tableLocation,
-      tableDesc.tableType == CatalogTableType.EXTERNAL,
+      table.tableType == CatalogTableType.EXTERNAL,
       tableInfo.isTransactionalTable)(sparkSession)
     tableInfo.setTablePath(tablePath)
     CarbonSparkSqlParserUtil.validateTableProperties(tableInfo)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    tableInfo.getFactTable
+      .getSchemaEvolution
+      .getSchemaEvolutionEntryList
+      .add(schemaEvolutionEntry)
+    tableInfo
+  }
+
+  /**
+   * This function updates the catalog table with new table properties and the table path.
+   *
+   * A carbon table needs extra information in its table properties, including the table name,
+   * database name and table path, because Spark does not pass this information when calling the
+   * data source interface (see [[CarbonSource]]). So we add it to the table properties and read
+   * it back in [[CarbonSource]].
+   */
+  private def createCatalogTableForCarbonExtension(
+      table: CatalogTable,
+      tableInfo: TableInfo,
+      properties: Map[String, String],
+      metaStore: CarbonMetaStore): CatalogTable = {
+    val storageFormat = table.storage
+    val isExternal = table.tableType == CatalogTableType.EXTERNAL
+    val updatedTableProperties = updateTableProperties(
+      tableInfo,
+      metaStore,
+      properties,
+      isExternal)
+    val updatedFormat = CarbonToSparkAdapter
+      .getUpdatedStorageFormat(storageFormat, updatedTableProperties, tableInfo.getTablePath)
+    val updatedSchema = if (isExternal) {
+      table.schema
+    } else {
+      CarbonSparkUtil.updateStruct(table.schema)
+    }
+    table.copy(
+      storage = updatedFormat,
+      schema = updatedSchema,
+      partitionColumnNames = table.partitionColumnNames.map(_.toLowerCase)
+    )
+  }
+
+  private def createCatalogTableForCarbonSession(
+      table: CatalogTable,
+      tableInfo: TableInfo,
+      properties: Map[String, String],
+      metaStore: CarbonMetaStore): CatalogTable = {
+    val storageFormat = table.storage
+    val isTransactionalTable = tableInfo.isTransactionalTable
+    val isExternal = properties.getOrElse("isExternal", "true").contains("true")
+    val updatedTableType = if (isExternal) {
+      table.tableType
+    } else {
+      CatalogTableType.MANAGED
+    }
+    if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
+      // remove schema string from map as we don't store carbon schema to hive metastore
+      val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
+      val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+      table.copy(storage = updatedFormat, tableType = updatedTableType)
+    } else {
+      table.copy(tableType = updatedTableType)
+    }
+  }
+
+  private def isCreatedByCarbonExtension(properties: Map[String, String]): Boolean = {
+    // if the table was created by CarbonSession, a special property stores the serialized
+    // TableInfo; otherwise it was created through CarbonExtensions
+    !properties.contains("carbonSchemaPartsNo")
+  }
+
+  /**
+   * Update and return new table properties by adding the parameters required for
+   * relation creation in [[CarbonSource]]
+   */
+  private def updateTableProperties(
+      tableInfo: TableInfo,
+      metaStore: CarbonMetaStore,
+      properties: Map[String, String],
+      isExternalTable: Boolean): Map[String, String] = {
     val map = if (!metaStore.isReadFromHiveMetaStore && tableInfo.isTransactionalTable) {
-      saveToDisk(metaStore, ignoreIfExists, tableInfo)
       new java.util.HashMap[String, String]()
     } else {
       CarbonUtil.convertToMultiStringMap(tableInfo)
@@ -380,26 +348,71 @@ object CarbonSource {
     }
     map.put("tableName", tableInfo.getFactTable.getTableName)
     map.put("isTransactional", tableInfo.isTransactionalTable.toString)
-    map.put("isExternal", isExternal.toString)
-    (map.asScala.toMap, tableInfo.getTablePath)
+    map.put("isExternal", isExternalTable.toString)
+    map.asScala.toMap
+  }
+
+  /**
+   * Create the metadata for a carbon table: the TableInfo object that will be persisted in
+   * step 3, and a new CatalogTable with the updated information needed by [[CarbonSource]].
+   * (Spark does not pass the information carbon needs when calling the create relation API,
+   * so we update the [[CatalogTable]] to add it: table name, database name, table path, etc.)
+   */
+  def createTableMeta(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      metaStore: CarbonMetaStore
+  ): (TableInfo, CatalogTable) = {
+    val properties = CarbonSparkSqlParserUtil.getProperties(table)
+    if (isCreatedByCarbonExtension(properties)) {
+      // Table is created by SparkSession with CarbonExtension,
+      // There is no TableInfo yet, so create it from CatalogTable
+      val tableInfo = createTableInfo(sparkSession, table)
+      val catalogTable = createCatalogTableForCarbonExtension(
+        table, tableInfo, properties, metaStore)
+      (tableInfo, catalogTable)
+    } else {
+      // Legacy code path (table is created by CarbonSession)
+      // Get the table info from the property
+      val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
+      val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
+      tableInfo.setTransactionalTable(isTransactionalTable)
+      val catalogTable = createCatalogTableForCarbonSession(table, tableInfo, properties, metaStore)
+      (tableInfo, catalogTable)
+    }
   }
 
-  private def saveToDisk(
+  /**
+   * Save the carbon schema file in the metastore. The file is written only when
+   * CarbonFileMetaStore is used.
+   */
+  def saveCarbonSchemaFile(
       metaStore: CarbonMetaStore, ignoreIfExists: Boolean, tableInfo: TableInfo): Unit = {
-    try {
-      metaStore.saveToDisk(tableInfo, tableInfo.getTablePath)
-    } catch {
-      case ex: IOException if ignoreIfExists =>
-        val schemaFile = CarbonTablePath.getMetadataPath(tableInfo.getTablePath) +
-                         CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SCHEMA_FILE
-        if (FileFactory.isFileExist(schemaFile)) {
-          LOGGER.error(ex)
-        } else {
-          throw ex
-        }
-      case ex => throw ex
+    if (!metaStore.isReadFromHiveMetaStore && tableInfo.isTransactionalTable) {
+      try {
+        metaStore.saveToDisk(tableInfo, tableInfo.getTablePath)
+      } catch {
+        case ex: IOException if ignoreIfExists =>
+          val schemaFile = CarbonTablePath.getMetadataPath(tableInfo.getTablePath) +
+                           CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SCHEMA_FILE
+          if (FileFactory.isFileExist(schemaFile)) {
+            LOGGER.error(ex)
+          } else {
+            throw ex
+          }
+        case ex => throw ex
+      }
     }
   }
+
+  def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
+    catalogTable.provider match {
+      case Some(x) => x.equalsIgnoreCase("carbondata")
+      case None => false
+    }
+  }
+
 }
 
 /**
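
To make the refactored CarbonSource helpers above easier to follow, here is a minimal, self-contained sketch (an illustration, not part of the patch) of the two checks they rely on: the data-source provider name and the legacy marker property. Everything outside Spark's CatalogTable API is a made-up name.

import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Sketch only: mirrors the provider check and the "created by CarbonExtensions
// vs. legacy CarbonSession" dispatch shown in the diff above.
object CarbonSourceChecksSketch {

  // A table is treated as a carbon data source table when its provider
  // is literally "carbondata" (case-insensitive).
  def isCarbonProvider(table: CatalogTable): Boolean =
    table.provider.exists(_.equalsIgnoreCase("carbondata"))

  // Legacy CarbonSession tables carry the serialized schema in their properties
  // ("carbonSchemaPartsNo"); tables created through CarbonExtensions do not,
  // so their TableInfo must be built from the CatalogTable instead.
  def createdByCarbonExtension(properties: Map[String, String]): Boolean =
    !properties.contains("carbonSchemaPartsNo")
}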
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 8a37989..233f28d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -38,5 +38,5 @@ case class CustomDeterministicExpression(nonDt: Expression ) extends Expression
 
   override def genCode(ctx: CodegenContext): ExprCode = nonDt.genCode(ctx)
 
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy()
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 1577664..7546ac4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -159,6 +159,7 @@ case class CarbonDropDataMapCommand(
   }
 
   private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
+    LOGGER.info("Trying to drop DataMap from system folder")
     try {
       if (dataMapSchema == null) {
         dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 00871cb..0a4ddc9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -695,13 +695,13 @@ case class CarbonLoadDataCommand(
             attr.dataType
         }
       }
-      CarbonToSparkAdapter.createAttributeReference(attr.name,
+      CarbonToSparkAdapter.createAttributeReference(
+        attr.name,
         updatedDataType,
         attr.nullable,
         attr.metadata,
         attr.exprId,
-        attr.qualifier,
-        attr)
+        attr.qualifier)
     }
     // Only select the required columns
     var output = if (finalPartition.nonEmpty) {
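
The CarbonToSparkAdapter.createAttributeReference call above hides a Spark API difference: in Spark 2.4 the qualifier of AttributeReference is a Seq[String], while older Spark versions used Option[String]. Below is a hedged sketch of what such an adapter method can look like when targeting Spark 2.4; it is an illustration with assumed names, not the adapter shipped in this patch.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.{DataType, Metadata}

object SparkAdapterSketch {
  // Keeps a stable signature for callers while adapting the qualifier to the
  // Seq[String] form expected by Spark 2.4's AttributeReference constructor.
  def createAttributeReference(
      name: String,
      dataType: DataType,
      nullable: Boolean,
      metadata: Metadata,
      exprId: ExprId,
      qualifier: Option[String]): AttributeReference = {
    AttributeReference(name, dataType, nullable, metadata)(exprId, qualifier.toSeq)
  }
}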
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 17e628f..18190e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -42,7 +42,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Command to register carbon table from existing carbon table data
@@ -70,7 +69,7 @@ case class RefreshCarbonTableCommand(
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
     val isCarbonDataSource = try {
-      CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
+      CarbonSource.isCarbonDataSource(sparkSession.sessionState.catalog
         .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
     } catch {
       case _: NoSuchTableException =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index c2cda37..c526387 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -97,12 +97,12 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
-      // In case of spark2.2 and above and , when we call
+      // When we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of add column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+      val cols = if (carbonTable.isHivePartitionTable) {
         val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
         (col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 01c5aa9..16025fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -162,7 +162,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       if (isDataTypeChange) {
         // if column datatype change operation is on partition column, then fail the datatype change
         // operation
-        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+        if (null != carbonTable.getPartitionInfo) {
           val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
           partitionColumns.asScala.foreach {
             col =>
@@ -285,14 +285,13 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
-    // In case of spark2.2 and above and , when we call
+    // When we call
     // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
     // in case of rename column or change datatype, spark gets the catalog table and then it itself
     // adds the partition columns if the table is partition table for all the new data schema sent
     // by carbon, so there will be duplicate partition columns, so send the columns without
     // partition columns
-    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
-                      carbonTable.isHivePartitionTable) {
+    val columns = if (carbonTable.isHivePartitionTable) {
       val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
       Some(carbonColumns.filterNot(col => partitionColumns.contains(col)))
     } else {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index e2bd789..9370396 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -151,13 +151,12 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       val cols = carbonTable.getCreateOrderColumn().asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
         .filterNot(column => delCols.contains(column))
-      // In case of spark2.2 and above and , when we call
+      // When we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of drop column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
-                        carbonTable.isHivePartitionTable) {
+      val columns = if (carbonTable.isHivePartitionTable) {
         val partitionColumns = partitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
         Some(carbonColumnsWithoutPartition)
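
The comment above (repeated for add column, rename/datatype change, and drop column) describes the same workaround: Spark re-appends partition columns itself when the external catalog is updated, so carbon must send the schema without them. A tiny generic sketch of that filtering follows; the stand-in ColumnSketch type is illustrative, the real code filters ColumnSchema objects against carbonTable.getPartitionInfo.

// Illustrative only, not the patch's code.
case class ColumnSketch(name: String, isPartitionColumn: Boolean)

def columnsForCatalogUpdate(
    allColumns: Seq[ColumnSketch],
    isHivePartitionTable: Boolean): Seq[ColumnSketch] = {
  if (isHivePartitionTable) {
    // drop partition columns here, Spark adds them back by itself
    allColumns.filterNot(_.isPartitionColumn)
  } else {
    allColumns
  }
}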
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
index 32011fe..389a8da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command.table
 
-import org.apache.spark.sql.{AnalysisException, CarbonSource, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 
@@ -56,9 +56,29 @@ case class CarbonCreateDataSourceTableCommand(
         throw new AnalysisException(s"Table ${ table.identifier.unquotedString } already exists.")
       }
     }
-    try {
-      new CreateDataSourceTableCommand(
-        CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession, ignoreIfExists),
+
+    // Step 1:
+    // Create the table metadata required by the carbondata table: the TableInfo object that
+    // will be persisted in step 3, and a new CatalogTable with the updated information needed
+    // by CarbonSource. (Spark does not pass the information carbon needs when calling the
+    // create relation API, so we update the CatalogTable to add it: table name, database name,
+    // table path, etc.)
+    //
+    // Step 2:
+    // Create a new CatalogTable containing the updated table properties.
+    // We need to update the table properties because carbon needs extra information when
+    // creating the Relation in CarbonSource, but Spark does not pass it: table name,
+    // database name, table path, etc.
+    //
+    // Step 3:
+    // Persist the TableInfo object in the table path as the schema file.
+
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val (tableInfo, catalogTable) = CarbonSource.createTableMeta(sparkSession, table, metaStore)
+
+    val rows = try {
+      CreateDataSourceTableCommand(
+        catalogTable,
         ignoreIfExists
       ).run(sparkSession)
     } catch {
@@ -68,5 +88,23 @@ case class CarbonCreateDataSourceTableCommand(
       case ex =>
         throw ex
     }
+
+    try {
+      CarbonSource.saveCarbonSchemaFile(metaStore, ignoreIfExists, tableInfo)
+    } catch {
+      case ex: Throwable =>
+        // drop the table if anything goes wrong
+        LOGGER.error(s"save carbon table schema file failed for table " +
+                     s"${table.database}.${table.identifier.table}, dropping the table")
+        DropTableCommand(
+          table.identifier,
+          ifExists = true,
+          isView = false,
+          purge = false
+        ).run(sparkSession)
+        throw ex
+    }
+    rows
   }
+
 }
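
The three steps above boil down to a create-then-persist-then-rollback pattern: register the table in the Spark catalog first, then write the carbon schema file, and drop the table again if writing the schema fails. Here is a generic sketch of that pattern; the helper function parameters are hypothetical and only illustrate the control flow of CarbonCreateDataSourceTableCommand.

import scala.util.control.NonFatal

// Sketch of the rollback pattern used above.
def createWithRollback[T](
    registerInCatalog: () => T,
    persistSchemaFile: () => Unit,
    dropTable: () => Unit): T = {
  val rows = registerInCatalog()   // step 2: create the table in the Spark catalog
  try {
    persistSchemaFile()            // step 3: write the carbon schema file
  } catch {
    case NonFatal(e) =>
      dropTable()                  // undo the catalog entry so no orphan table remains
      throw e
  }
  rows
}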
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 5104420..dc0187c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -197,8 +197,9 @@ case class CarbonCreateTableCommand(
             } catch {
               case _: Exception => // No operation
             }
             val msg = s"Create table'$tableName' in database '$dbName' failed"
-            throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
+            throwMetadataException(dbName, tableName, s"$msg, ${e.getMessage}")
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 68ea3bf..bb687e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -134,6 +134,7 @@ case class CarbonDropTableCommand(
       }
       val indexDatamapSchemas =
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+      LOGGER.info(s"Dropping DataMaps in table $tableName, size: ${indexDatamapSchemas.size()}")
       if (!indexDatamapSchemas.isEmpty) {
         childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 2e1f91f..8acc749 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -142,10 +142,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
       logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
     val attrs = projectExprsNeedToDecode.map { attr =>
-      val newAttr = AttributeReference(attr.name,
+      val newAttr = CarbonToSparkAdapter.createAttributeReference(
+        attr.name,
         attr.dataType,
         attr.nullable,
-        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+        attr.metadata,
+        attr.exprId,
+        Option(table.carbonRelation.tableName))
       relation.addAttribute(newAttr)
       newAttr
     }
@@ -194,8 +197,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
               attr.nullable,
               attr.metadata,
               attr.exprId,
-              attr.qualifier,
-              attr)
+              attr.qualifier)
         }
       }
       partitions =
@@ -403,7 +405,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             newProjectList :+= reference
             a.transform {
               case s: ScalaUDF =>
-                ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+                CarbonToSparkAdapter.createScalaUDF(s, reference)
             }
           case other => other
       }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 68f7442..b07f9a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand}
@@ -27,12 +28,10 @@ import org.apache.spark.sql.execution.command.mutation.CarbonTruncateCommand
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDropTableCommand, CarbonShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, RefreshResource, RefreshTable}
-import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand, MatchResetCommand}
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand, MatchResetCommand}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Carbon strategies for ddl commands
@@ -115,9 +114,11 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       // create/describe/drop table
       case createTable: CreateTableCommand
         if isCarbonHiveTable(createTable.table) =>
+        // CREATE TABLE STORED AS carbondata
         ExecutedCommandExec(DDLHelper.createHiveTable(createTable, sparkSession)) :: Nil
       case createTable: CreateTableCommand
         if isCarbonFileHiveTable(createTable.table) =>
+        // CREATE TABLE STORED AS carbon
         if (EnvHelper.isCloud(sparkSession)) {
           Nil
         } else {
@@ -125,11 +126,13 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         }
       case ctas: CreateHiveTableAsSelectCommand
         if isCarbonHiveTable(ctas.tableDesc) =>
+        // CREATE TABLE STORED AS carbondata AS SELECT
         ExecutedCommandExec(
           DDLHelper.createHiveTableAsSelect(ctas, sparkSession)
         ) :: Nil
       case ctas: CreateHiveTableAsSelectCommand
         if isCarbonFileHiveTable(ctas.tableDesc) =>
+        // CREATE TABLE STORED AS carbon AS SELECT
         if (EnvHelper.isCloud(sparkSession)) {
           Nil
         } else {
@@ -149,20 +152,24 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if isCarbonTable(truncateTable.tableName) =>
         ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
       case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
-        if CommonUtil.isCarbonDataSource(createTable.tableDesc) =>
+        if CarbonSource.isCarbonDataSource(createTable.tableDesc) =>
+        // CREATE TABLE USING carbondata
         ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) :: Nil
       case MatchCreateDataSourceTable(tableDesc, mode, query)
-        if CommonUtil.isCarbonDataSource(tableDesc) =>
+        if CarbonSource.isCarbonDataSource(tableDesc) =>
+        // CREATE TABLE USING carbondata AS SELECT
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
         ) :: Nil
       case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
-        if CommonUtil.isCarbonDataSource(tableDesc) =>
+        if CarbonSource.isCarbonDataSource(tableDesc) =>
+        // CREATE TABLE USING carbondata AS SELECT
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
         ) :: Nil
       case createTable@CreateDataSourceTableCommand(table, _)
-        if CommonUtil.isCarbonDataSource(table) =>
+        if CarbonSource.isCarbonDataSource(table) =>
+        // CREATE TABLE USING carbondata
         ExecutedCommandExec(
           DDLHelper.createDataSourceTable(createTable, sparkSession)
         ) :: Nil
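
The new inline comments above spell out which CREATE TABLE variant each strategy branch handles. For reference, a hedged sketch of the corresponding SQL issued through a SparkSession; table and column names are made up, and the carbondata jars are assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

def createTableVariants(spark: SparkSession): Unit = {
  // CREATE TABLE STORED AS carbondata
  spark.sql("CREATE TABLE t1 (id INT, name STRING) STORED AS carbondata")
  // CREATE TABLE USING carbondata
  spark.sql("CREATE TABLE t2 (id INT, name STRING) USING carbondata")
  // CREATE TABLE STORED AS carbondata AS SELECT
  spark.sql("CREATE TABLE t3 STORED AS carbondata AS SELECT * FROM t1")
  // CREATE TABLE USING carbondata AS SELECT
  spark.sql("CREATE TABLE t4 USING carbondata AS SELECT * FROM t1")
}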
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index fd7defa..5e82eb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
-import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 1be3af3..78d69a8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -84,9 +84,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
             "Update operation is not supported for mv datamap table")
         }
       }
-      val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-        relation
-      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val tableRelation =
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -96,9 +94,6 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
               Some(table.tableIdentifier))
           case _ => relation
         }
-      } else {
-        throw new UnsupportedOperationException("Unsupported Spark version.")
-      }
 
       CarbonReflectionUtils.getSubqueryAlias(
         sparkSession,
@@ -232,21 +227,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           }
         }
         // include tuple id in subquery
-        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-          Project(projList, relation)
-        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-          alias match {
-            case Some(_) =>
-              val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
-                sparkSession,
-                alias,
-                relation,
-                Some(table.tableIdentifier))
-              Project(projList, subqueryAlias)
-            case _ => Project(projList, relation)
-          }
-        } else {
-          throw new UnsupportedOperationException("Unsupported Spark version.")
+        alias match {
+          case Some(_) =>
+            val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
+              sparkSession,
+              alias,
+              relation,
+              Some(table.tableIdentifier))
+            Project(projList, subqueryAlias)
+          case _ => Project(projList, relation)
         }
     }
     CarbonProjectForDeleteCommand(
@@ -325,13 +314,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
         }
       }
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
-        } else {
-          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
-        }
+        throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
       } else {
         Project(newChildOutput, child)
       }
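
With Spark 2.1/2.2 support removed, the rule above always wraps the relation in a subquery alias when the statement supplies one. A hedged sketch of that shape written directly against Spark 2.4's SubqueryAlias is shown below; the patch itself goes through CarbonReflectionUtils.getSubqueryAlias to keep the call site version-agnostic.

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}

// Sketch only: project the required columns, adding a SubqueryAlias when the
// statement supplied a table alias.
def projectWithOptionalAlias(
    projList: Seq[NamedExpression],
    relation: LogicalPlan,
    alias: Option[String]): LogicalPlan = {
  alias match {
    case Some(aliasName) => Project(projList, SubqueryAlias(aliasName, relation))
    case None => Project(projList, relation)
  }
}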
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
similarity index 99%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
index 8f4d45e..04beea7 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -47,4 +47,4 @@ class CarbonAnalyzer(catalog: SessionCatalog,
       logicalPlan
     }
   }
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 15b2a51..686fa2c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -18,23 +18,21 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
-import java.net.URI
-import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, EnvHelper, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, EnvHelper, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -47,12 +45,12 @@ import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // use to lock the carbonTables
@@ -209,14 +207,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
-           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+        if (!CarbonSource.isCarbonDataSource(catalogTable)) {
           CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
           throw new NoSuchTableException(database, tableIdentifier.table)
         }
@@ -529,15 +526,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
-           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+        if (!CarbonSource.isCarbonDataSource(catalogTable)) {
           throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
         }
         val tableLocation = catalogTable.storage.locationUri match {
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
similarity index 68%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
index a3d149b..d110133 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.CarbonDatasourceHadoopRelation
@@ -29,7 +46,7 @@ object CarbonOptimizerUtil {
             }
             Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
 
-          case In(value, Seq(l:ListQuery)) =>
+          case In(value, Seq(l: ListQuery)) =>
             val tPlan = l.plan.transform {
               case lr: LogicalRelation
                 if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 9b3ff87..68c293b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -18,11 +18,11 @@ package org.apache.spark.sql.hive
 
 import java.util.LinkedHashSet
 
-import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}
@@ -119,7 +119,8 @@ case class CarbonRelation(
             val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
             CarbonMetastoreTypes.toDataType(dataType)
         }
-        AttributeReference(column.getColName, output, nullable = true )(
+        CarbonToSparkAdapter.createAttributeReference(
+          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
           qualifier = Option(tableName + "." + column.getColName))
       } else {
         val output = CarbonMetastoreTypes.toDataType {
@@ -129,7 +130,8 @@ case class CarbonRelation(
             case others => others
           }
         }
-        AttributeReference(column.getColName, output, nullable = true)(
+        CarbonToSparkAdapter.createAttributeReference(
+          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
           qualifier = Option(tableName + "." + column.getColName))
       }
     }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
similarity index 96%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3e23fd8..4fd9c20 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.hive
 
 import java.util.concurrent.Callable
@@ -24,15 +25,14 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand}
-import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 
 object CarbonSessionCatalogUtil {
 
@@ -135,7 +135,7 @@ object CarbonSessionCatalogUtil {
     //    using sparkSession.sharedState which internally contains all required carbon rules,
     //    optimizers pluged-in through SessionStateBuilder in spark-defaults.conf.
     //    spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
-    sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
   }
 
   def alterAddColumns(tableIdentifier: TableIdentifier,
@@ -195,9 +195,10 @@ object CarbonSessionCatalogUtil {
    * @param identifier
    * @return
    */
-  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+  def getPartitionsAlternate(
+      partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
     CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
   }
 
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
similarity index 84%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index 968738a..be0ae04 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -1,40 +1,36 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql.hive
 
-import java.util.concurrent.Callable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.util.SparkTypeConverter
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -61,7 +57,7 @@ object CarbonSessionUtil {
      * Set the stats to none in case of carbontable
      */
     def setStatsNone(catalogTable: CatalogTable): Unit = {
-      if (CommonUtil.isCarbonDataSource(catalogTable)) {
+      if (CarbonSource.isCarbonDataSource(catalogTable)) {
         // Update stats to none in case of carbon table as we are not expecting any stats from
         // Hive. Hive gives wrong stats for carbon table.
         catalogTable.stats match {
@@ -175,8 +171,7 @@ object CarbonSessionUtil {
 
   def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case sa@SubqueryAlias(_,
-      MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
+      case sa@SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
         sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
       case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
         plan.asInstanceOf[LogicalRelation].copy()
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 93%
rename from integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 73b6790..36ec2c8 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -38,8 +38,8 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
         fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
-        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
-        Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
+        ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
       helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
   }
 
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
+    visitAddTableColumns(parser, ctx)
   }
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
similarity index 98%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index ee9fb0f..0335b36 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,12 +19,12 @@ package org.apache.spark.sql.hive
 
 import java.net.URI
 
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand}
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.util.CarbonReflectionUtils
 
 /**
@@ -49,7 +48,7 @@ case class CreateCarbonSourceTableAsSelectCommand(
     Seq.empty
   }
 
-  override def processData(sparkSession: SparkSession): Seq[Row] ={
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 1183350..278b781 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -310,15 +310,13 @@ object CarbonFilters {
    * @return
    */
   def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
-    var isCompatible = true
-    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
-        .asInstanceOf[Option[Expression]]
-      if (trimStr.isDefined) {
-        isCompatible = false
-      }
+    val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+      .asInstanceOf[Option[Expression]]
+    if (trimStr.isDefined) {
+      false
+    } else {
+      true
     }
-    isCompatible
   }
 
   def transformExpression(expr: Expression): CarbonExpression = {
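
As an aside (illustrative only, not part of the patch): with Spark 2.3 now the minimum supported version, the isStringTrimCompatibleWithCarbon check above drops its version guard and reduces to a reflective lookup of the optional trim character. A minimal, self-contained Scala sketch of that reflective-field pattern, using plain Java reflection and a hypothetical stand-in for Spark's StringTrim instead of CarbonReflectionUtils:

    // FakeStringTrim is a made-up stand-in for Spark's StringTrim; only the optional trimStr matters here.
    case class FakeStringTrim(srcStr: String, trimStr: Option[String])

    object ReflectiveFieldCheck {
      // Read a declared (private case-class) field by name, mirroring a getField("trimStr", expr) call.
      def getField[T](name: String, obj: Any): T = {
        val field = obj.getClass.getDeclaredField(name)
        field.setAccessible(true)
        field.get(obj).asInstanceOf[T]
      }

      // The expression is treated as compatible only when no explicit trim string is supplied.
      def isCompatible(trim: FakeStringTrim): Boolean =
        getField[Option[String]]("trimStr", trim).isEmpty

      def main(args: Array[String]): Unit = {
        println(isCompatible(FakeStringTrim(" abc ", None)))      // true
        println(isCompatible(FakeStringTrim(" abc ", Some("x")))) // false
      }
    }
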
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index ad4d6fa..1e8f4ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -152,7 +152,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     val mapOfNonCarbonPlanNodes = new java.util.HashMap[LogicalPlan, ExtraNodeInfo]
     fillNodeInfo(plan, mapOfNonCarbonPlanNodes)
     val aliasMap = CarbonAliasDecoderRelation()
-    // collect alias information before hand.
+    // collect alias information beforehand.
     collectInformationOnAttributes(plan, aliasMap)
 
     def hasCarbonRelation(currentPlan: LogicalPlan): Boolean = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 102a3df..79410e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -306,7 +306,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           }
         }
         val (selectStmt, relation) =
-          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+          if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty ||
               !StringUtils.isEmpty(subQueryResults)) {
            // if subQueryResults is not empty, it means it is not a join with the main table.
             // so use subQueryResults in update with value flow.
@@ -349,8 +349,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         rel
     }
 
-
-
   private def updateRelation(
       r: UnresolvedRelation,
       tableIdent: Seq[String],
@@ -363,8 +361,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           case Seq(dbName, tableName) => Some(tableName)
           case Seq(tableName) => Some(tableName)
         }
-        // Use Reflection to choose between Spark2.1 and Spark2.2
-        // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
         CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
     }
   }
@@ -387,8 +383,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
         val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
 
-        // Use Reflection to choose between Spark2.1 and Spark2.2
-        // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
         val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
 
         (unresolvedRelation, tableIdent, alias, tableIdentifier)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 3f9853e..290bfda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -309,7 +309,7 @@ object CarbonSparkSqlParserUtil {
       table: CatalogTable,
       ifNotExists: Boolean,
       sparkSession: SparkSession,
-      selectQuery: Option[LogicalPlan]): TableInfo = {
+      selectQuery: Option[LogicalPlan] = None): TableInfo = {
     val tableProperties = normalizeProperties(getProperties(table))
     val options = new CarbonOption(tableProperties)
     // validate streaming property
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index 79a6240..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-  def getPartitionKeyFilter(
-      partitionSet: AttributeSet,
-      filterPredicates: Seq[Expression]): ExpressionSet = {
-    ExpressionSet(
-      ExpressionSet(filterPredicates)
-        .filter(_.references.subsetOf(partitionSet)))
-  }
-
-  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
-      map: Map[String, String],
-      tablePath: String): CatalogStorageFormat = {
-    storageFormat.copy(properties = map, locationUri = Some(tablePath))
-  }
-
-  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
-    Seq(OptimizeCodegen(conf))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
deleted file mode 100644
index eb3e88d..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.catalog
-
-import com.google.common.base.Objects
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
-  * A `LogicalPlan` that represents a hive table.
-  *
-  * TODO: remove this after we completely make hive as a data source.
-  */
-case class HiveTableRelation(
-                              tableMeta: CatalogTable,
-                              dataCols: Seq[AttributeReference],
-                              partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
-  assert(tableMeta.identifier.database.isDefined)
-  assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
-  assert(tableMeta.schema.sameType(dataCols.toStructType))
-
-  // The partition column should always appear after data columns.
-  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
-
-  def isPartitioned: Boolean = partitionCols.nonEmpty
-
-  override def equals(relation: Any): Boolean = relation match {
-    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Objects.hashCode(tableMeta.identifier, output)
-  }
-
-  override def newInstance(): HiveTableRelation = copy(
-    dataCols = dataCols.map(_.newInstance()),
-    partitionCols = partitionCols.map(_.newInstance()))
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
deleted file mode 100644
index 9a88255..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
-import org.apache.spark.sql.catalyst.expressions.aggregate.First
-import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-class MigrateOptimizer {
-
-}
-
-/**
-  * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
-  */
-object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Deduplicate(keys, child, streaming) if !streaming =>
-      val keyExprIds = keys.map(_.exprId)
-      val aggCols = child.output.map { attr =>
-        if (keyExprIds.contains(attr.exprId)) {
-          attr
-        } else {
-          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
-        }
-      }
-      Aggregate(keys, aggCols, child)
-  }
-}
-
-/** A logical plan for `dropDuplicates`. */
-case class Deduplicate(
-                        keys: Seq[Attribute],
-                        child: LogicalPlan,
-                        streaming: Boolean) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
-  * Remove projections from the query plan that do not make any modifications.
-  */
-object RemoveRedundantProject extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case p @ Project(_, child) if p.output == child.output => child
-  }
-}
-
-/**
-  * push down operations into [[CreateNamedStructLike]].
-  */
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      // push down field extraction
-      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
-        createNamedStructLike.valExprs(ordinal)
-    }
-  }
-}
-
-/**
-  * push down operations into [[CreateArray]].
-  */
-object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      // push down field selection (array of structs)
-      case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
-        // instead of selecting the field on the entire array,
-        // select it from each member of the array.
-        // pushing down the operation this way opens other optimization opportunities
-        // (i.e. struct(...,x,...).x)
-        CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
-      // push down item selection.
-      case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
-        // instead of creating the array and then selecting one row,
-        // remove array creation altogether.
-        if (idx >= 0 && idx < elems.size) {
-          // valid index
-          elems(idx)
-        } else {
-          // out of bounds, mimic the runtime behavior and return null
-          Literal(null, ga.dataType)
-        }
-    }
-  }
-}
-
-/**
-  * push down operations into [[CreateMap]].
-  */
-object SimplifyCreateMapOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
-    }
-  }
-}
-
-
-/**
-  * Removes MapObjects when the following conditions are satisfied
-  *   1. MapObjects(... LambdaVariable(..., false) ...), which means the input and output types
-  *      are non-nullable primitive types
-  *   2. no custom collection class is specified as the representation of the data item.
-  */
-object EliminateMapObjects extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
-    case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
-  }
-}
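
For readers unfamiliar with the removed MigrateOptimizer rules (they back-ported Spark 2.2+ optimizer behavior to Spark 2.1 and are obsolete now): the core idea of ReplaceDeduplicateWithAggregate is to turn dropDuplicates into a group-by in which every non-key column is wrapped in first(). A toy, self-contained Scala sketch of that rewrite, using made-up plan classes rather than Catalyst's and ignoring the streaming flag:

    // Toy plan nodes; these are NOT Spark's Catalyst classes.
    sealed trait Plan
    case class Relation(output: Seq[String]) extends Plan
    case class Deduplicate(keys: Seq[String], child: Relation) extends Plan
    case class Aggregate(groupBy: Seq[String], aggExprs: Seq[String], child: Relation) extends Plan

    object ReplaceDeduplicateWithAggregateSketch {
      // Keep key columns as-is and wrap every other column in first(...), so each
      // group contributes exactly one row, which is what dropDuplicates needs.
      def rewrite(plan: Plan): Plan = plan match {
        case Deduplicate(keys, child) =>
          val aggCols = child.output.map { col =>
            if (keys.contains(col)) col else s"first($col) AS $col"
          }
          Aggregate(keys, aggCols, child)
        case other => other
      }

      def main(args: Array[String]): Unit = {
        val plan = Deduplicate(Seq("id"), Relation(Seq("id", "name", "ts")))
        println(rewrite(plan))
        // Aggregate(List(id), List(id, first(name) AS name, first(ts) AS ts), Relation(...))
      }
    }
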
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
deleted file mode 100644
index 8eb05fc..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize default values for dynamic params
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-        SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BAD_RECORD_PATH =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * To set the default values of the dynamic properties
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}
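
A brief note on the pattern the removed spark2.1 CarbonSQLConf implements: at session start it copies values from the carbon properties store into the session-level SQL configuration so they become the runtime defaults. A toy, Map-based Scala sketch of that pattern (the property keys and the two collections are stand-ins, not the Spark or Carbon APIs):

    import scala.collection.mutable

    object SessionDefaultsSketch {
      // Stand-in for CarbonProperties; the key names here are illustrative.
      val carbonProperties: Map[String, String] = Map(
        "enable.unsafe.sort"        -> "true",
        "carbon.options.sort.scope" -> "LOCAL_SORT")

      // Stand-in for the session's runtime SQL configuration.
      val sessionConf: mutable.Map[String, String] = mutable.Map.empty

      // Mirror of addDefaultCarbonSessionParams: push every default into the session conf.
      def addDefaultCarbonSessionParams(): Unit =
        carbonProperties.foreach { case (k, v) => sessionConf(k) = v }

      def main(args: Array[String]): Unit = {
        addDefaultCarbonSessionParams()
        println(sessionConf("carbon.options.sort.scope")) // LOCAL_SORT
      }
    }
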
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index 6b94806..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.spark.util.CarbonReflectionUtils
-
-/**
- * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
- * table in the carbon catalog is not the same as the cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    sparkSession: SparkSession,
-    functionResourceLoader: FunctionResourceLoader,
-    functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hadoopConf: Configuration)
-  extends HiveSessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    sparkSession,
-    functionResourceLoader,
-    functionRegistry,
-    conf,
-    hadoopConf) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-  /**
-   * returns the carbonEnv instance
-   * @return
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.init
-
-  /**
-   * This method will invalidate the carbon relation in the cache if the carbon table is
-   * updated in the carbon catalog
-   *
-   * @param name
-   * @param alias
-   * @return
-   */
-  override def lookupRelation(name: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name, alias)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
-        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
-      super.lookupRelation(name, alias)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * returns hive client from session state
-   *
-   * @return
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is an alternate way of getting partition information. It first fetches all partitions
-   * from Hive and then applies the filters, instead of querying Hive with the filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
-    val partitionSchema = catalogTable.partitionSchema
-    if (partitionFilters.nonEmpty) {
-      val boundPredicate =
-        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
-      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
-    } else {
-      allPartitions
-    }
-  }
-
-  /**
-   * Update the storage format with the new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toString))
-  }
-}
-
-/**
- * Session state implementation to override the SQL parser and add strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies = extraStrategies
-
-  experimentalMethods.extraOptimizations = extraOptimizations
-
-  def extraStrategies: Seq[Strategy] = {
-    Seq(
-      new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  }
-
-  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
-    Seq(new CarbonIUDRule,
-      new CarbonUDFTransformRule,
-      new CarbonLateDecodeRule)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
-  def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
-    catalog.ParquetConversions ::
-    catalog.OrcConversions ::
-    CarbonPreInsertionCasts(sparkSession) ::
-    CarbonIUDAnalysisRule(sparkSession) ::
-    AnalyzeCreateTable(sparkSession) ::
-    PreprocessTableInsertion(conf) ::
-    DataSourceAnalysis(conf) ::
-    (if (conf.runSQLonFile) {
-      new ResolveDataSource(sparkSession) :: Nil
-    } else {  Nil })
-  }
-
-  override lazy val analyzer: Analyzer =
-    new CarbonAnalyzer(catalog, conf, sparkSession,
-      new Analyzer(catalog, conf) {
-        override val extendedResolutionRules =
-          if (extendedAnalyzerRules.nonEmpty) {
-            extendedAnalyzerRules ++ internalAnalyzerRules
-          } else {
-            internalAnalyzerRules
-          }
-        override val extendedCheckRules = Seq(
-          PreWriteCheck(conf, catalog))
-      }
-  )
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-    new CarbonHiveSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      sparkSession,
-      functionResourceLoader,
-      functionRegistry,
-      conf,
-      newHadoopConf())
-  }
-}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: CatalystConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
-  val mvPlan = try {
-    CarbonReflectionUtils.createObject(
-      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
-      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
-  } catch {
-    case e: Exception =>
-      null
-  }
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val logicalPlan = analyzer.execute(plan)
-    if (mvPlan != null) {
-      mvPlan.apply(logicalPlan)
-    } else {
-      logicalPlan
-    }
-  }
-}
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
-    super.execute(transFormedPlan)
-  }
-}
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
-    // In case of a scalar subquery, add a flag in the relation to skip the decoder plan in the
-    // optimizer rule, and optimize the whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case p: PredicateSubquery =>
-            val tPlan = p.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
-        }
-    }
-    transFormedPlan
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
-        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
-        Option(ctx.STRING()).map(string),
-        ctx.AS, ctx.query, fileStorage)
-        helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateTable(ctx)
-    }
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-
-  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
-    CarbonExplainCommand(super.visitExplain(ctx))
-  }
-}
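
One piece of the removed spark2.1 session state worth calling out is getPartitionsAlternate, which prunes partitions on the driver by listing every partition from Hive and then applying the filter predicates locally instead of pushing them to Hive. A rough, self-contained Scala sketch of that fetch-then-filter idea, with toy types standing in for Catalyst expressions and CatalogTablePartition:

    // Toy stand-in; the real code binds Catalyst predicates against the partition schema.
    case class PartitionSpec(values: Map[String, String])

    object ClientSidePartitionPruning {
      // Each filter is a predicate over the partition column values; apply all of them locally.
      def prune(all: Seq[PartitionSpec],
                filters: Seq[PartitionSpec => Boolean]): Seq[PartitionSpec] =
        if (filters.isEmpty) all
        else all.filter(p => filters.forall(f => f(p)))

      def main(args: Array[String]): Unit = {
        val parts = Seq(
          PartitionSpec(Map("dt" -> "2020-01-01")),
          PartitionSpec(Map("dt" -> "2020-01-02")))
        val onlyJan2: PartitionSpec => Boolean = _.values("dt") == "2020-01-02"
        println(prune(parts, Seq(onlyJan2))) // keeps only the dt=2020-01-02 partition
      }
    }
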
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index dd690e4..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{
-  CatalogRelation, CatalogTable, CatalogTableType,
-  SimpleCatalogRelation
-}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{
-  AlterTableRecoverPartitionsCommand, DDLUtils,
-  RunnableCommand
-}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- * @param table the Catalog Table
- * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
-    table: CatalogTable,
-    mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
-    assert(table.schema.isEmpty)
-
-    val provider = table.provider.get
-    val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
-
-    var createMetastoreTable = false
-    var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
-      // Check if we need to throw an exception or just return.
-      mode match {
-        case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
-                                      s"If you are using saveAsTable, you can set SaveMode to " +
-                                      s"SaveMode.Append to " +
-                                      s"insert data into the table or set SaveMode to SaveMode" +
-                                      s".Overwrite to overwrite" +
-                                      s"the existing data. " +
-                                      s"Or, if you are using SQL CREATE TABLE, you need to drop " +
-                                      s"$tableName first.")
-        case SaveMode.Ignore =>
-          // Since the table already exists and the save mode is Ignore, we will just return.
-          return Seq.empty[Row]
-        case SaveMode.Append =>
-          // Check if the specified data source match the data source of the existing table.
-          val existingProvider = DataSource.lookupDataSource(provider)
-          // TODO: Check that options from the resolved relation match the relation that we are
-          // inserting into (i.e. using the same compression).
-
-          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
-          // views unexpectedly.
-          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
-            case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
-              // check if the file formats match
-              l.relation match {
-                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
-                  throw new AnalysisException(
-                    s"The file format of the existing table $tableName is " +
-                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
-                    s"format `$provider`")
-                case _ =>
-              }
-              if (query.schema.size != l.schema.size) {
-                throw new AnalysisException(
-                  s"The column number of the existing schema[${ l.schema }] " +
-                  s"doesn't match the data schema[${ query.schema }]'s")
-              }
-              existingSchema = Some(l.schema)
-            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-              existingSchema = Some(s.metadata.schema)
-            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
-              throw new AnalysisException("Saving data in the Hive serde table " +
-                                          s"${ c.catalogTable.identifier } is not supported yet. " +
-                                          s"Please use the insertInto() API as an alternative..")
-            case o =>
-              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
-          }
-        case SaveMode.Overwrite =>
-          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-          // Need to create the table again.
-          createMetastoreTable = true
-      }
-    } else {
-      // The table does not exist. We need to create it in metastore.
-      createMetastoreTable = true
-    }
-
-    val data = Dataset.ofRows(sparkSession, query)
-    val df = existingSchema match {
-      // If we are inserting into an existing table, just use the existing schema.
-      case Some(s) => data.selectExpr(s.fieldNames: _*)
-      case None => data
-    }
-
-    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-      Some(sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.locationUri
-    }
-
-    // Create the relation based on the data of df.
-    val pathOption = tableLocation.map("path" -> _)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = Some(table))
-
-    val result = try {
-      dataSource.write(mode, df)
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table $tableName in $mode mode", ex)
-        throw ex
-    }
-    result match {
-      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-                                   sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-        // Need to recover partitions into the metastore so our saved data is visible.
-        sparkSession.sessionState.executePlan(
-          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
-      case _ =>
-    }
-
-    // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-    Seq.empty[Row]
-  }
-}
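
The removed 2.1 command above, like the unified CreateCarbonSourceTableAsSelectCommand that replaces it, spends most of its run method deciding what the SaveMode means when the target table already exists. A simplified, self-contained Scala sketch of just that dispatch, with hypothetical types rather than Spark's SaveMode and catalog:

    sealed trait Mode
    case object ErrorIfExists extends Mode
    case object Ignore extends Mode
    case object Append extends Mode
    case object Overwrite extends Mode

    object SaveModeDispatchSketch {
      // Returns Some(createMetastoreTable) when the write should proceed, None to return early.
      def resolve(tableExists: Boolean, mode: Mode): Option[Boolean] =
        if (!tableExists) Some(true)                 // table absent: always create it
        else mode match {
          case ErrorIfExists => throw new IllegalStateException("Table already exists")
          case Ignore        => None                 // nothing to do, return early
          case Append        => Some(false)          // reuse the existing table and schema
          case Overwrite     => Some(true)           // drop and recreate the table
        }

      def main(args: Array[String]): Unit = {
        println(resolve(tableExists = false, Append)) // Some(true)
        println(resolve(tableExists = true, Ignore))  // None
      }
    }
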
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
deleted file mode 100644
index 446b5a5..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-  def getPartitionKeyFilter(
-      partitionSet: AttributeSet,
-      filterPredicates: Seq[Expression]): ExpressionSet = {
-    ExpressionSet(
-      ExpressionSet(filterPredicates)
-        .filter(_.references.subsetOf(partitionSet)))
-  }
-
-  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
-    Seq(OptimizeCodegen(conf))
-  }
-
-  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
-      map: Map[String, String],
-      tablePath: String): CatalogStorageFormat = {
-    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
deleted file mode 100644
index 7177f1a..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
-        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
-        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
-      helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
-  }
-}
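
Both the removed 2.2 builder above and the surviving CarbonSqlAstBuilder route CREATE TABLE statements by the declared file format, handing only carbon formats to the Carbon helper and everything else back to the stock Spark builder. A small Scala sketch of that predicate, using the same format strings that appear in the diff (the object and method names are illustrative):

    object CreateTableDispatchSketch {
      private val carbonFormats =
        Set("'carbondata'", "carbondata", "'carbonfile'", "'org.apache.carbondata.format'")

      // True when the CREATE TABLE statement names a carbon storage format.
      def isCarbonFormat(fileStorage: String): Boolean =
        carbonFormats.exists(_.equalsIgnoreCase(fileStorage))

      def main(args: Array[String]): Unit = {
        println(isCarbonFormat("'CarbonData'")) // true, matched case-insensitively
        println(isCarbonFormat("'parquet'"))    // false, falls through to the default builder
      }
    }
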
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
similarity index 77%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
copy to integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
index aa650e0..8060441 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
@@ -18,21 +18,12 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
 import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.core.scan.expression.ColumnExpression
 
-case class CastExpr(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
-
-case class FalseExpr() extends Filter {
-  override def references: Array[String] = null
-}
-
 case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
   extends LeafExpression with NamedExpression with CodegenFallback {
 
@@ -52,11 +43,3 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu
 
   override def newInstance(): NamedExpression = throw new UnsupportedOperationException
 }
-
-case class CarbonEndsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
-
-case class CarbonContainsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 9094dfe..0d0e9c7 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -22,12 +22,15 @@ import java.net.URI
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.{CarbonMVRules, CarbonPreOptimizerRule, HiveExternalCatalog, HiveSessionCatalog}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.types.{DataType, Metadata}
 
 object CarbonToSparkAdapter {
@@ -41,8 +44,8 @@ object CarbonToSparkAdapter {
   }
 
   def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
+      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
     AttributeReference(
       name,
       dataType,
@@ -50,14 +53,23 @@ object CarbonToSparkAdapter {
       metadata)(exprId, qualifier)
   }
 
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
+    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+  }
+
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
+  ): ExprCode = {
+    ExprCode(code, isNull, value)
+  }
+
   def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr : Option[NamedExpression] = None ) : Alias = {
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Option[String] = None,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr : Option[NamedExpression] = None ) : Alias = {
 
-      Alias(child, name)(exprId, qualifier, explicitMetadata)
+    Alias(child, name)(exprId, qualifier, explicitMetadata)
   }
 
   def getExplainCommandObj() : ExplainCommand = {
@@ -90,4 +102,40 @@ object CarbonToSparkAdapter {
       tablePath: String): CatalogStorageFormat = {
     storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
   }
+
+  def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = {
+    sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog]
+  }
+}
+
+
+class OptimizerProxy(
+    session: SparkSession,
+    catalog: SessionCatalog,
+    optimizer: Optimizer) extends Optimizer(catalog) {
+
+  private lazy val firstBatchRules = Seq(Batch("First Batch Optimizers", Once,
+    Seq(CarbonMVRules(session), new CarbonPreOptimizerRule()): _*))
+
+  private lazy val LastBatchRules = Batch("Last Batch Optimizers", fixedPoint,
+    Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonLateDecodeRule()): _*)
+
+  override def batches: Seq[Batch] = {
+    firstBatchRules ++ convertedBatch() :+ LastBatchRules
+  }
+
+  def convertedBatch(): Seq[Batch] = {
+    optimizer.batches.map { batch =>
+      Batch(
+        batch.name,
+        batch.strategy match {
+          case optimizer.Once =>
+            Once
+          case _: optimizer.FixedPoint =>
+            fixedPoint
+        },
+        batch.rules: _*
+      )
+    }
+  }
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 99%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
rename to integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index d180cd3..cfc7755 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 5435f04..25d5543 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
similarity index 75%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
copy to integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
index aa650e0..5a8b11f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
@@ -18,21 +18,12 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.core.scan.expression.ColumnExpression
 
-case class CastExpr(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
-
-case class FalseExpr() extends Filter {
-  override def references: Array[String] = null
-}
-
 case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
   extends LeafExpression with NamedExpression with CodegenFallback {
 
@@ -48,15 +39,7 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu
 
   override def exprId: ExprId = throw new UnsupportedOperationException
 
-  override def qualifier: Option[String] = null
+  override def qualifier: Seq[String] = null
 
   override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
-
-case class CarbonEndsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
-
-case class CarbonContainsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..bea162f
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.hive.{CarbonMVRules, CarbonPreOptimizerRule, HiveExternalCatalog}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
+    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qf)
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Seq[String]): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
+    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
+  }
+
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType) = {
+    ExprCode(
+      code"$code",
+      JavaCode.isNullVariable(isNull),
+      JavaCode.variable(value, dataType))
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Seq[String] = Seq.empty,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr: Option[NamedExpression] = None) : Alias = {
+    Alias(child, name)(exprId, qualifier, explicitMetadata)
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId,
+      qualifier: Option[String]) : Alias = {
+    Alias(child, name)(exprId,
+      if (qualifier.isEmpty) Seq.empty else Seq(qualifier.get),
+      None)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation())
+  }
+
+  /**
+   * As a part of SPARK-24085, Hive tables support scalar subqueries for
+   * partition tables, so Carbon also needs to support them.
+   * @param partitionSet
+   * @param filterPredicates
+   * @return
+   */
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filterNot(SubqueryExpression.hasSubquery)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  // As per SPARK-22520, OptimizeCodegen was removed in Spark 2.3.1, so no rule is returned here
+  def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq.empty
+  }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
+
+  def getHiveExternalCatalog(sparkSession: SparkSession) =
+    sparkSession.sessionState.catalog.externalCatalog
+      .asInstanceOf[ExternalCatalogWithListener]
+      .unwrapped
+      .asInstanceOf[HiveExternalCatalog]
+}
+
+class OptimizerProxy(
+    session: SparkSession,
+    catalog: SessionCatalog,
+    optimizer: Optimizer) extends Optimizer(catalog) {
+
+  private lazy val firstBatchRules = Seq(Batch("First Batch Optimizers", Once,
+    Seq(CarbonMVRules(session), new CarbonPreOptimizerRule()): _*))
+
+  private lazy val LastBatchRules = Batch("Last Batch Optimizers", fixedPoint,
+    Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonLateDecodeRule()): _*)
+
+  override def defaultBatches: Seq[Batch] = {
+    firstBatchRules ++ convertedBatch() :+ LastBatchRules
+  }
+
+  def convertedBatch(): Seq[Batch] = {
+    optimizer.batches.map { batch =>
+      Batch(
+        batch.name,
+        batch.strategy match {
+          case optimizer.Once =>
+            Once
+          case _: optimizer.FixedPoint =>
+            fixedPoint
+        },
+        batch.rules: _*
+      )
+    }
+  }
+}
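
Call sites do not need to care which adapter is on the classpath: the Spark 2.3 variant forwards an Option[String] qualifier straight to AttributeReference, while the Spark 2.4 variant converts it to the Seq[String] form that 2.4 expects. A minimal usage sketch (column "c1" and qualifier "t1" are made-up names):

    import org.apache.spark.sql.CarbonToSparkAdapter
    import org.apache.spark.sql.catalyst.expressions.NamedExpression
    import org.apache.spark.sql.types.{Metadata, StringType}

    object AdapterUsageSketch {
      // The same call compiles against either version-specific adapter; only
      // the adapter knows how the qualifier is represented internally.
      val attr = CarbonToSparkAdapter.createAttributeReference(
        name = "c1",
        dataType = StringType,
        nullable = true,
        metadata = Metadata.empty,
        exprId = NamedExpression.newExprId,
        qualifier = Some("t1"))
    }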
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 88%
rename from integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index 69ed477..7b20c06 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 
 object MixedFormatHandlerUtil {
@@ -32,13 +33,13 @@ object MixedFormatHandlerUtil {
       dataFilters: Seq[Expression],
       tableIdentifier: Option[TableIdentifier]
   ): FileSourceScanExec = {
-    val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
     FileSourceScanExec(
       relation,
       output,
       outputSchema,
       partitionFilters,
-      pushedDownFilters,
+      None,
+      dataFilters,
       tableIdentifier)
   }
 }
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
similarity index 81%
rename from integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 7605574..60ee7ea 100644
--- a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
@@ -32,7 +32,7 @@ class CarbonDataSourceScan(
     val rdd: RDD[InternalRow],
     @transient override val relation: HadoopFsRelation,
     val partitioning: Partitioning,
-    override val metadata: Map[String, String],
+    val md: Map[String, String],
     identifier: Option[TableIdentifier],
     @transient private val logicalRelation: LogicalRelation)
   extends FileSourceScanExec(
@@ -40,14 +40,20 @@ class CarbonDataSourceScan(
     output,
     relation.dataSchema,
     Seq.empty,
+    None,
     Seq.empty,
     identifier) {
 
-  override val supportsBatch: Boolean = true
+  // made lazy since Spark 2.3.2 (SPARK-PR#21815)
+  override lazy val supportsBatch: Boolean = true
 
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+  // made lazy since Spark 2.3.2 (SPARK-PR#21815)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
     (partitioning, Nil)
 
+  // made lazy since Spark 2.3.2 (SPARK-PR#21815)
+  override lazy val metadata: Map[String, String] = md
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
 
 }
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
similarity index 95%
rename from integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 2f9ad3e..808099d 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.internal.SQLConf
 
-
 class CarbonOptimizer(
     catalog: SessionCatalog,
     conf: SQLConf,
     experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+  extends SparkOptimizer(catalog, experimentalMethods) {
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
     val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
     super.execute(transFormedPlan)
   }
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index 1c115e2..5d02082 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -24,6 +24,7 @@ class BloomCoarseGrainDataMapFunctionSuite  extends QueryTest with BeforeAndAfte
   val dataMapName = "bloom_dm"
 
   override protected def beforeAll(): Unit = {
+    sqlContext.sparkContext.setLogLevel("info")
     deleteFile(bigFile)
     new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     createFile(bigFile, line = 2000)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
index 8010bb9..b30ebc1 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
@@ -500,6 +500,9 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
         "from hive_table where carbon_table.intField=hive_table.intField)"),
       Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
     )
+
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
   }
 
   test("Inserting into carbon table from Hive table: support boolean data type and other format") {
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
deleted file mode 100644
index e69de29..0000000
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 5731261..b9812a1 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -46,7 +46,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
          |    stringField string,
          |    decimalField decimal(13, 0),
          |    timestampField string)
-         | USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='carbon_testtable')
+         | USING carbondata OPTIONS('tableName'='carbon_testtable')
        """.stripMargin)
 
     sql(
@@ -187,15 +187,14 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
           "\nL_RETURNFLAG string,\nL_RECEIPTDATE string,\nL_ORDERKEY string,\nL_PARTKEY string," +
           "\nL_SUPPKEY string,\nL_LINENUMBER int,\nL_QUANTITY decimal,\nL_EXTENDEDPRICE decimal," +
           "\nL_DISCOUNT decimal,\nL_TAX decimal,\nL_LINESTATUS string,\nL_COMMITDATE string," +
-          "\nL_COMMENT string \n) \nUSING org.apache.spark.sql.CarbonSource\nOPTIONS (tableName " +
-          "\"car\", DICTIONARY_EXCLUDE \"L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_COMMENT\")")
+          "\nL_COMMENT string \n) \nUSING carbondata\nOPTIONS (DICTIONARY_EXCLUDE \"L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_COMMENT\")")
     }
     assert(ex.getMessage.contains("dictionary_exclude is deprecated in CarbonData 2.0"))
   }
 
   test("test create table with complex datatype") {
     sql("drop table if exists create_source")
-    sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='create_source')")
+    sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata")
     sql("drop table create_source")
   }
 
@@ -207,7 +206,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
          | intField INT,
          | stringField STRING,
          | complexField ARRAY<STRING>)
-         | USING org.apache.spark.sql.CarbonSource
+         | USING carbondata
        """.stripMargin)
     sql("DROP TABLE create_source")
   }
@@ -221,8 +220,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
          | intField INT,
          | stringField STRING,
          | complexField ARRAY<STRING>)
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS('tableName'='create_source_test2')
+         | USING carbondata
        """.stripMargin)
     checkExistence(sql("show tables"), true, "create_source_test")
     checkExistence(sql("show tables"), false, "create_source_test2")
@@ -233,21 +231,21 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
   test("test to create bucket columns with int field") {
     sql("drop table if exists create_source")
     intercept[Exception] {
-      sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='intField','tableName'='create_source')")
+      sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='intField')")
     }
   }
 
   test("test to create bucket columns with complex data type field") {
     sql("drop table if exists create_source")
     intercept[Exception] {
-      sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='complexField','tableName'='create_source')")
+      sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='complexField')")
     }
   }
 
   test("test check results of table with complex data type and bucketing") {
     sql("drop table if exists create_source")
     sql("create table create_source(intField int, stringField string, complexField array<int>) " +
-        "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='stringField', 'tableName'='create_source')")
+        "USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='stringField')")
     sql("insert into create_source values(1,'source',array(1,2,3))")
     checkAnswer(sql("select * from create_source"), Row(1,"source", mutable.WrappedArray.newBuilder[Int].+=(1,2,3)))
     sql("drop table if exists create_source")
@@ -260,7 +258,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
          | CREATE TABLE carbon_test(
          |    stringField string,
          |    intField int)
-         | USING org.apache.spark.sql.CarbonSource
+         | USING carbondata
       """.
         stripMargin
     )
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 7239f49..156fb75 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -83,8 +83,8 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
            CREATE TABLE t9
            (ID Int, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int)
-           USING org.apache.spark.sql.CarbonSource
-           OPTIONS("bucketnumber"="-1", "bucketcolumns"="name", "tableName"="t9")
+           USING carbondata
+           OPTIONS("bucketnumber"="-1", "bucketcolumns"="name")
         """)
       assert(false)
     }
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
index 3e6f402..74aa510 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.common.util
 
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.test.util.QueryTest
 
 class Spark2QueryTest extends QueryTest {
-  val hiveClient = sqlContext
-    .sparkSession
-    .sessionState
-    .catalog
-    .externalCatalog
-    .asInstanceOf[HiveExternalCatalog]
-    .client
+
+  val hiveClient = CarbonToSparkAdapter.getHiveExternalCatalog(sqlContext.sparkSession).client
+
 }
\ No newline at end of file
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index b37c356..584f64b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -75,8 +75,7 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
          |    dateField date,
          |    charField char(5)
          | )
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS ('tableName' '$tableName')
+         | USING carbondata
        """.stripMargin)
     sql(
       s"""
diff --git a/pom.xml b/pom.xml
index e149727..a1854b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -486,7 +486,7 @@
 
   <profiles>
     <profile>
-      <!--This profile does not build spark module, so user should explicitly give spark profile also like spark-2.1 -->
+      <!--This profile does not build spark module, so user should explicitly give spark profile -->
       <id>build-with-format</id>
       <modules>
         <module>format</module>
@@ -500,57 +500,12 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
-      <properties>
-        <spark.version>2.1.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.eluder.coveralls</groupId>
-            <artifactId>coveralls-maven-plugin</artifactId>
-            <version>4.3.0</version>
-            <configuration>
-              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
-              <sourceEncoding>UTF-8</sourceEncoding>
-              <jacocoReports>
-                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
-                </jacocoReport>
-              </jacocoReports>
-              <sourceDirectories>
-                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
-              </sourceDirectories>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.2</id>
+      <id>spark-2.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
-        <spark.version>2.2.1</spark.version>
+        <spark.version>2.3.4</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -573,12 +528,11 @@
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
@@ -597,12 +551,9 @@
       </build>
     </profile>
     <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
+      <id>spark-2.4</id>
       <properties>
-        <spark.version>2.3.4</spark.version>
+        <spark.version>2.4.4</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -625,11 +576,11 @@
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.4</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.4</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>

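With this profile layout, spark-2.3 stays the default build profile and a Spark 2.4 build has to be requested explicitly, e.g. mvn -DskipTests -Pspark-2.4 clean package (the flags shown are illustrative; build/README.md carries the documented build commands).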
