carbondata-commits mailing list archives

From gvram...@apache.org
Subject [6/6] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support Spark 2.3 version, compatibility issues
Date Tue, 10 Jul 2018 15:01:15 GMT
[CARBONDATA-2532][Integration] Carbon to support Spark 2.3 version, compatibility issues

Changes continued from the previous parts.
All compatibility issues found while adding Spark 2.3 support have been addressed.
Supported pom profile: -P"spark-2.3"
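
For reference, the new profile can be activated at build time; an illustrative
Maven invocation (exact module list and extra flags may vary by setup):

    mvn clean package -DskipTests -Pspark-2.3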

This closes #2366


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d0fa5239
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d0fa5239
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d0fa5239

Branch: refs/heads/carbonstore
Commit: d0fa52396687ccc1a5d029006e7204771c04a9eb
Parents: 2b8ae26
Author: sandeep-katta <sandeep.katta2007@gmail.com>
Authored: Fri Jul 6 10:01:29 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Jul 10 20:29:41 2018 +0530

----------------------------------------------------------------------
 examples/spark2/pom.xml                         |  20 ++
 integration/spark-common-test/pom.xml           |  32 +++
 .../preaggregate/TestPreAggregateDrop.scala     |   1 +
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  10 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  29 +-
 .../filterexpr/AllDataTypesTestCaseFilter.scala |   7 +-
 .../sql/commands/StoredAsCarbondataSuite.scala  |   3 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   4 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   2 +-
 .../apache/spark/sql/test/util/PlanTest.scala   |   5 -
 .../spark/util/CarbonReflectionUtils.scala      | 115 ++++++--
 .../scala/org/apache/spark/util/SparkUtil.scala |  36 ++-
 .../org/apache/spark/util/SparkUtilTest.scala   |  58 ++++
 integration/spark2/pom.xml                      |  81 ++++++
 .../apache/spark/sql/hive/CarbonAnalyzer.scala  |  51 ++++
 .../sql/hive/CarbonInMemorySessionState.scala   | 276 ++++++++++++++++++
 .../spark/sql/hive/CarbonOptimizerUtil.scala    |  44 +++
 .../spark/sql/hive/CarbonSessionState.scala     | 269 ++++++++++++++++++
 .../spark/sql/hive/CarbonSessionUtil.scala      |  96 +++++++
 .../apache/spark/sql/hive/CarbonSqlConf.scala   | 148 ++++++++++
 ...CreateCarbonSourceTableAsSelectCommand.scala | 130 +++++++++
 .../spark/sql/hive/SqlAstBuilderHelper.scala    | 110 ++++++++
 .../org/apache/spark/CarbonInputMetrics.scala   |   1 -
 .../spark/sql/CarbonCatalystOperators.scala     |  57 ++--
 .../org/apache/spark/sql/CarbonCountStar.scala  |  10 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   7 +-
 .../sql/CustomDeterministicExpression.scala     |  42 ---
 .../execution/BatchedDataSourceScanExec.scala   | 147 ----------
 .../management/CarbonLoadDataCommand.scala      |  19 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |  39 +--
 .../sql/execution/strategy/DDLStrategy.scala    |  17 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |  17 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  26 +-
 .../sql/hive/CarbonPreAggregateRules.scala      | 147 +++++++---
 .../spark/sql/optimizer/CarbonFilters.scala     |  26 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  16 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   4 +-
 .../spark/sql/CarbonToSparkAdapater.scala       |  68 +++++
 .../sql/CustomDeterministicExpression.scala     |  42 +++
 .../execution/BatchedDataSourceScanExec.scala   | 147 ++++++++++
 ...CreateCarbonSourceTableAsSelectCommand.scala |   2 -
 .../spark/sql/CarbonToSparkAdapater.scala       |  68 +++++
 .../sql/CustomDeterministicExpression.scala     |  42 +++
 .../execution/BatchedDataSourceScanExec.scala   | 147 ++++++++++
 .../apache/spark/sql/hive/CarbonAnalyzer.scala  |  51 ----
 .../sql/hive/CarbonInMemorySessionState.scala   | 276 ------------------
 .../apache/spark/sql/hive/CarbonOptimizer.scala |  44 +--
 .../spark/sql/hive/CarbonSessionState.scala     | 277 -------------------
 .../spark/sql/hive/CarbonSessionUtil.scala      |  97 -------
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    |  85 +-----
 .../apache/spark/sql/hive/CarbonSqlConf.scala   | 148 ----------
 ...CreateCarbonSourceTableAsSelectCommand.scala | 122 --------
 .../spark/sql/CarbonToSparkAdapater.scala       |  63 +++++
 .../sql/CustomDeterministicExpression.scala     |  42 +++
 .../execution/BatchedDataSourceScanExec.scala   | 142 ++++++++++
 .../apache/spark/sql/hive/CarbonOptimizer.scala |  37 +++
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    |  52 ++++
 .../stream/CarbonStreamRecordReaderTest.java    | 100 +++++++
 .../BooleanDataTypesFilterTest.scala            |   2 +-
 .../booleantype/BooleanDataTypesLoadTest.scala  |   4 +-
 .../bucketing/TableBucketingTestCase.scala      |  26 +-
 .../carbondata/query/SubQueryTestSuite.scala    |   1 -
 pom.xml                                         |  55 ++++
 .../streaming/CarbonStreamInputFormatTest.java  |  99 -------
 64 files changed, 2748 insertions(+), 1593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 7a55333..c824bb4 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -77,6 +77,26 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- In Spark 2.3, spark-catalyst added a dependency on spark-core -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <!-- Need to exclude the net.jpountz jar from this project.
+         Spark has moved this dependency to org.lz4:lz4-java, and
+         net.jpountz and org.lz4 contain the same class names -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index b8629bf..a9c7748 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -126,12 +126,27 @@
       <artifactId>carbondata-lucene</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <!-- Need to exclude the net.jpountz jar from this project.
+         Spark has moved this dependency to org.lz4:lz4-java, and
+         net.jpountz and org.lz4 contain the same class names -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-bloom</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
@@ -145,6 +160,23 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <!-- spark-catalyst added a runtime dependency on spark-core, so
+      spark-core must be present while executing the test cases;
+      otherwise they fail to execute -->
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <!-- Need to exclude the Avro jar from this project; spark-core uses
+        version 1.7.4, which is not compatible with Carbon -->
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 2a9fdcd..189814b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -91,6 +91,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
     sql(
       "create datamap preagg2 on table maintable using 'preaggregate' as select" +
       " a,sum(c) from maintable group by a")
+    sql("drop table if exists maintable")
     checkExistence(sql("show tables").select("database", "tableName"), false, "defaultmaintable_preagg1", "defaultmaintable", "defaultmaintable_preagg2")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 937aee9..20c858d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -70,7 +70,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select min(salary) from carbonTable"),
       sql("select min(salary) from hiveTable"))
   }
-  
   test("test min datatype on big decimal column") {
     val output = sql("select min(salary) from carbonTable").collectAsList().get(0).get(0)
     assert(output.isInstanceOf[java.math.BigDecimal])
@@ -80,7 +79,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     val output = sql("select max(salary) from carbonTable").collectAsList().get(0).get(0)
     assert(output.isInstanceOf[java.math.BigDecimal])
   }
-  
   test("test count function on big decimal column") {
     checkAnswer(sql("select count(salary) from carbonTable"),
       sql("select count(salary) from hiveTable"))
@@ -149,8 +147,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test sum*10 aggregation on big decimal column with high precision") {
-    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
-      sql("select sum(salary)*10 from hiveBigDecimal"))
+    checkAnswer(sql("select cast(sum(salary)*10 as double) from carbonBigDecimal_2"),
+      sql("select cast(sum(salary)*10 as double) from hiveBigDecimal"))
   }
 
   test("test sum/10 aggregation on big decimal column with high precision") {
@@ -164,8 +162,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test sum-distinct*10 aggregation on big decimal column with high precision") {
-    checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"),
-      sql("select sum(distinct(salary))*10 from hiveBigDecimal"))
+    checkAnswer(sql("select cast(sum(distinct(salary))*10 as decimal)from carbonBigDecimal_2"),
+      sql("select cast(sum(distinct(salary))*10 as decimal) from hiveBigDecimal"))
   }
 
   test("test sum-distinct/10 aggregation on big decimal column with high precision") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index d7e500e..c62af8e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -111,10 +112,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
-    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       //data source file format
       sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       sql(
         s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -159,10 +160,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
-    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       //data source file format
       sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       sql(
         s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -188,10 +189,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
 
-    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       //data source file format
       sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       sql(
         s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -221,10 +222,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
 
     val exception = intercept[org.apache.spark.SparkException] {
       //    data source file format
-      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      if (SparkUtil.isSparkVersionEqualToX("2.1")) {
         //data source file format
         sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         sql(
           s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -251,10 +252,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
 
     val exception = intercept[org.apache.spark.SparkException] {
       //data source file format
-      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      if (SparkUtil.isSparkVersionEqualToX("2.1")) {
         //data source file format
         sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         sql(
           s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -281,10 +282,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     //data source file format
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
-    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       //data source file format
       sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       sql(
         s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
@@ -330,10 +331,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     assert(new File(filePath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
-    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       //data source file format
       sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       sql(
         s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 15ac1f4..c0f0d06 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.testsuite.filterexpr
 
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -56,13 +57,15 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll {
   test("verify like query ends with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like '%nandh'").queryExecution
       .sparkPlan
-    assert(df.metadata.get("PushedFilters").get.contains("CarbonEndsWith"))
+    assert(df.asInstanceOf[BatchedDataSourceScanExec].metadata
+      .get("PushedFilters").get.contains("CarbonEndsWith"))
   }
 
   test("verify like query contains with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like '%nand%'").queryExecution
       .sparkPlan
-    assert(df.metadata.get("PushedFilters").get.contains("CarbonContainsWith"))
+    assert(df.asInstanceOf[BatchedDataSourceScanExec].metadata
+      .get("PushedFilters").get.contains("CarbonContainsWith"))
   }
   
   override def afterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
index 2029e93..6400ed1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
@@ -87,7 +87,8 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach {
       sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS  ")
     } catch {
       case e: Exception =>
-        assert(e.getMessage.contains("no viable alternative at input"))
+        assert(e.getMessage.contains("no viable alternative at input") ||
+        e.getMessage.contains("mismatched input '<EOF>' expecting "))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 0e8f660..a6e16b2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -163,11 +163,11 @@ class StreamHandoffRDD[K, V](
     val format = new CarbonTableInputFormat[Array[Object]]()
     val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
-    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-      .asInstanceOf[RecordReader[Void, Any]]
     inputFormat.setVectorReader(false)
     inputFormat.setModel(model)
     inputFormat.setUseRawRow(true)
+    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+      .asInstanceOf[RecordReader[Void, Any]]
     streamReader.initialize(inputSplit, attemptContext)
     val iteratorList = new util.ArrayList[RawResultIterator](1)
     iteratorList.add(new StreamingRawResultIterator(streamReader))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 44f96bd..ec7dd92 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -127,7 +127,7 @@ class CarbonAppendableStreamSink(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
         jobId = batchId.toString,
         outputPath = fileLogPath,
-        isAppend = false)
+        false)
 
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
index 9883607..00156e9 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
@@ -26,11 +26,6 @@ import org.apache.spark.sql.catalyst.util._
  */
 class PlanTest extends CarbonFunSuite {
 
-  /** Fails the test if the two expressions do not match */
-  protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
-    comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
-  }
-
   /** Fails the test if the two plans do not match */
   protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
     val normalized1 = normalizeExprIds(plan1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 4264aa1..b08eee3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,22 +17,25 @@
 
 package org.apache.spark.util
 
+import java.lang.reflect.Method
+
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 /**
@@ -60,12 +63,12 @@ object CarbonReflectionUtils {
       tableIdentifier: TableIdentifier,
       tableAlias: Option[String] = None): UnresolvedRelation = {
     val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       createObject(
         className,
         tableIdentifier,
         tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
@@ -78,13 +81,13 @@ object CarbonReflectionUtils {
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
     val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       createObject(
         className,
         alias.getOrElse(""),
         relation,
         Option(view))._1.asInstanceOf[SubqueryAlias]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         alias.getOrElse(""),
@@ -100,7 +103,7 @@ object CarbonReflectionUtils {
       overwrite: Boolean,
       ifPartitionNotExists: Boolean): InsertIntoTable = {
     val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       val overwriteOptions = createObject(
         "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
         overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
@@ -111,7 +114,7 @@ object CarbonReflectionUtils {
         query,
         overwriteOptions,
         ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         table,
@@ -126,20 +129,28 @@ object CarbonReflectionUtils {
 
   def getLogicalRelation(relation: BaseRelation,
       expectedOutputAttributes: Seq[Attribute],
-      catalogTable: Option[CatalogTable]): LogicalRelation = {
+      catalogTable: Option[CatalogTable],
+      isStreaming: Boolean): LogicalRelation = {
     val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       createObject(
         className,
         relation,
         Some(expectedOutputAttributes),
         catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionEqualToX("2.2")) {
       createObject(
         className,
         relation,
         expectedOutputAttributes,
         catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SparkUtil.isSparkVersionEqualToX("2.3")) {
+      createObject(
+        className,
+        relation,
+        expectedOutputAttributes,
+        catalogTable,
+        isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
     } else {
       throw new UnsupportedOperationException("Unsupported Spark version")
     }
@@ -182,27 +193,23 @@ object CarbonReflectionUtils {
   def getAstBuilder(conf: Object,
       sqlParser: Object,
       sparkSession: SparkSession): AstBuilder = {
-    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      val className = sparkSession.sparkContext.conf.get(
-        CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
-      createObject(className,
-        conf,
-        sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    val className = sparkSession.sparkContext.conf.get(
+      CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
+      "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+    createObject(className,
+      conf,
+      sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
   }
 
   def getSessionState(sparkContext: SparkContext,
       carbonSession: Object,
       useHiveMetaStore: Boolean): Any = {
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
         "org.apache.spark.sql.hive.CarbonSessionState")
       createObject(className, carbonSession)._1
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       if (useHiveMetaStore) {
         val className = sparkContext.conf.get(
           CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
@@ -224,12 +231,12 @@ object CarbonReflectionUtils {
   }
 
   def hasPredicateSubquery(filterExp: Expression) : Boolean = {
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualToX("2.1")) {
       val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
       val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
       val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
       hasSubquery
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
       val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
       val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
@@ -247,6 +254,54 @@ object CarbonReflectionUtils {
     isFormatted
   }
 
+  def getRowDataSourceScanExecObj(relation: LogicalRelation,
+      output: Seq[Attribute],
+      pushedFilters: Seq[Filter],
+      handledFilters: Seq[Filter],
+      rdd: RDD[InternalRow],
+      partition: Partitioning,
+      metadata: Map[String, String]): RowDataSourceScanExec = {
+    val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
+    if (SparkUtil.isSparkVersionEqualToX("2.1") || SparkUtil.isSparkVersionEqualToX("2.2")) {
+      createObject(className, output, rdd, relation.relation,
+        partition, metadata,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      createObject(className, output, output.map(output.indexOf),
+        pushedFilters.toSet, handledFilters.toSet, rdd,
+        relation.relation,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def invokewriteAndReadMethod(dataSourceObj: DataSource,
+      dataFrame: DataFrame,
+      data: LogicalPlan,
+      session: SparkSession,
+      mode: SaveMode,
+      query: LogicalPlan,
+      physicalPlan: SparkPlan): BaseRelation = {
+    if (SparkUtil.isSparkVersionEqualToX("2.2")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
+      method.invoke(dataSourceObj, mode, dataFrame)
+        .asInstanceOf[BaseRelation]
+    } else if (SparkUtil.isSparkVersionEqualToX("2.3")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead",
+          classOf[SaveMode],
+          classOf[LogicalPlan],
+          classOf[Seq[Attribute]],
+          classOf[SparkPlan])
+      method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
+        .asInstanceOf[BaseRelation]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
   def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
     val clazz = Utils.classForName(className)
     val ctor = clazz.getConstructors.head
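
The createObject helper shown above (truncated in this hunk) is the backbone of
all the version-specific construction in CarbonReflectionUtils: it loads a class
by name and invokes its first public constructor with the given arguments. A
minimal standalone sketch of that pattern, assuming a target class with a single
public constructor (Greeting and ReflectionSketch are illustrative names, not
from the patch):

    // Toy target with exactly one public constructor.
    class Greeting(who: String) {
      override def toString: String = s"hello, $who"
    }

    object ReflectionSketch {
      // Same shape as CarbonReflectionUtils.createObject: resolve the class,
      // take its lone constructor, and invoke it with the supplied arguments.
      def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
        val clazz = Class.forName(className)
        val ctor = clazz.getConstructors.head // assumes one public constructor
        (ctor.newInstance(conArgs: _*), clazz)
      }

      def main(args: Array[String]): Unit = {
        val (obj, _) = createObject("Greeting", "spark 2.3")
        println(obj) // prints: hello, spark 2.3
      }
    }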

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 4635fc7..cd0965b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -17,15 +17,7 @@
 
 package org.apache.spark.util
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
-
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails
+import org.apache.spark.{SPARK_VERSION, TaskContext}
 
 /*
 * this object is used to handle file splits
@@ -39,4 +31,30 @@ object SparkUtil {
     }
   }
 
+  /**
+   * Utility method to compare Spark versions.
+   * This API ignores the patch version and compares only the major.minor version.
+   * The version passed in should be of the format x.y, e.g. 2.2 or 2.3, while
+   * SPARK_VERSION is of the format x.y.z, e.g. 2.3.0 or 2.2.1.
+   */
+  def isSparkVersionXandAbove(xVersion: String, isEqualComparision: Boolean = false): Boolean = {
+    val tmpArray = SPARK_VERSION.split("\\.")
+    // convert to float
+    val sparkVersion = if (tmpArray.length >= 2) {
+      (tmpArray(0) + "." + tmpArray(1)).toFloat
+    } else {
+      (tmpArray(0) + ".0").toFloat
+    }
+    // compare the versions
+    if (isEqualComparision) {
+      sparkVersion == xVersion.toFloat
+    } else {
+      sparkVersion >= xVersion.toFloat
+    }
+  }
+
+  def isSparkVersionEqualToX(xVersion: String): Boolean = {
+    isSparkVersionXandAbove(xVersion, true)
+  }
+
 }
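
Both helpers reduce SPARK_VERSION to a major.minor float before comparing, so
"2.3.1" and "2.3.0" behave identically. A standalone sketch of that reduction
(assumed names, inlined here so it runs without a Spark dependency):

    object VersionCheckSketch {
      // Mirror of the reduction done in SparkUtil.isSparkVersionXandAbove:
      // keep only major.minor and parse it as a float.
      def toMajorMinor(version: String): Float = {
        val parts = version.split("\\.")
        if (parts.length >= 2) (parts(0) + "." + parts(1)).toFloat
        else (parts(0) + ".0").toFloat
      }

      def main(args: Array[String]): Unit = {
        println(toMajorMinor("2.3.1") >= "2.2".toFloat) // true: 2.3 is "2.2 and above"
        println(toMajorMinor("2.3.1") == "2.3".toFloat) // true: equal to 2.3
        println(toMajorMinor("2.2.1") >= "2.3".toFloat) // false
      }
    }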

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
new file mode 100644
index 0000000..9de1c08
--- /dev/null
+++ b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SPARK_VERSION
+import org.scalatest.FunSuite
+
+class SparkUtilTest extends FunSuite {
+
+  test("Test Spark Version API with X and Above") {
+    if (SPARK_VERSION.startsWith("2.1")) {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.3"))
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.3"))
+    } else {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.3"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.4"))
+    }
+  }
+
+  test("Test Spark Version API Equal to X") {
+    if (SPARK_VERSION.startsWith("2.1")) {
+      assert(SparkUtil.isSparkVersionEqualToX("2.1"))
+      assert(!SparkUtil.isSparkVersionEqualToX("2.2"))
+      assert(!SparkUtil.isSparkVersionEqualToX("2.3"))
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      assert(!SparkUtil.isSparkVersionEqualToX("2.1"))
+      assert(SparkUtil.isSparkVersionEqualToX("2.2"))
+      assert(!SparkUtil.isSparkVersionEqualToX("2.3"))
+    } else {
+      assert(!SparkUtil.isSparkVersionEqualToX("2.1"))
+      assert(!SparkUtil.isSparkVersionEqualToX("2.2"))
+      assert(SparkUtil.isSparkVersionEqualToX("2.3"))
+      assert(!SparkUtil.isSparkVersionEqualToX("2.4"))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 6b05800..d72cec2 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -53,11 +53,26 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-lucene</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <!-- need to Exclude net.jpountz jar from this project.
+         Spark has changed this jar to org.lz4:lz4-java
+         net.jpountz and org.lz4 has same class Name -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-bloom</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -74,6 +89,25 @@
       <scope>${spark.deps.scope}</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <!-- The transitive dependency com.univocity:univocity-parsers:2.5.9
+        is pulled in by org.apache.spark:spark-sql_2.11, so this version must
+        be excluded; Carbon uses version 2.2.1 -->
+        <exclusion>
+          <groupId>com.univocity</groupId>
+          <artifactId>univocity-parsers</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -232,6 +266,8 @@
             <configuration>
               <excludes>
                 <exclude>src/main/spark2.2</exclude>
+                <exclude>src/main/spark2.3</exclude>
+                <exclude>src/main/commonTo2.2And2.3</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -275,6 +311,8 @@
             <configuration>
               <excludes>
                 <exclude>src/main/spark2.1</exclude>
+                <exclude>src/main/spark2.3</exclude>
+                <exclude>src/main/commonTo2.2And2.3</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -292,6 +330,49 @@
                 <configuration>
                   <sources>
                     <source>src/main/spark2.2</source>
+                    <source>src/main/commonTo2.2And2.3</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <properties>
+        <spark.version>2.3.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.1</exclude>
+                <exclude>src/main/spark2.2</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3</source>
+                    <source>src/main/commonTo2.2And2.3</source>
                   </sources>
                 </configuration>
               </execution>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
new file mode 100644
index 0000000..dfb89fd
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  val mvPlan = try {
+    CarbonReflectionUtils.createObject(
+      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+  } catch {
+    case e: Exception =>
+      null
+  }
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+    if (mvPlan != null) {
+      mvPlan.apply(logicalPlan)
+    } else {
+      logicalPlan
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
new file mode 100644
index 0000000..8b8d27c
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the
+ * carbon table in the carbon catalog is not the same as the cached carbon relation's table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    // Not required in case of the in-memory catalog
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val structType = catalogTable.schema
+    var newStructType = structType
+    newColumns.get.foreach {cols =>
+      newStructType = structType
+        .add(cols.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val fields = catalogTable.schema.fields.filterNot { field =>
+      dropCols.get.exists { col =>
+        col.getColumnName.equalsIgnoreCase(field.name)
+      }
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val a = catalogTable.schema.fields.flatMap { field =>
+      columns.get.map { col =>
+        if (col.getColumnName.equalsIgnoreCase(field.name)) {
+          StructField(col.getColumnName,
+            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+        } else {
+          field
+        }
+      }
+    }
+    alterSchema(new StructType(a), catalogTable, tableIdentifier)
+  }
+
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val copy = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(copy)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the hive client from HiveExternalCatalog. Not applicable for the
+   * in-memory catalog, so this implementation returns null.
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from hive and then applies the filter, instead of querying hive along with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storage format with the new location information
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends SessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: InMemorySessionCatalog = {
+    val catalog = new InMemorySessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: ExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+  override protected lazy val resourceLoader: SessionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+    }
+  )
+  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+
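
A minimal usage sketch for the session-state wiring above (not part of the
diff): creating a CarbonSession is what ultimately instantiates one of these
builders, presumably CarbonSessionStateBuilder when the Hive catalog is
enabled and CarbonInMemorySessionStateBuilder otherwise. The store path and
app name below are placeholders.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._

    // Placeholder store path; any writable location works.
    val carbon = SparkSession
      .builder()
      .master("local[2]")
      .appName("CarbonSessionSketch")
      .getOrCreateCarbonSession("/tmp/carbon.store")

    carbon.sql(
      "CREATE TABLE IF NOT EXISTS t1 (id INT, name STRING) STORED BY 'carbondata'")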

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
new file mode 100644
index 0000000..72d3ae2
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -0,0 +1,44 @@
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    // For scalar subqueries, set a flag on the relation so the optimizer rule
+    // skips the decoder plan and the whole plan is optimized at once.
+    val transformedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l: ListQuery)) =>
+            val tPlan = l.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children, l.exprId)))
+        }
+    }
+    transformedPlan
+  }
+}
\ No newline at end of file
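
A short sketch of how transformForScalarSubQuery is meant to be applied (the
query and helper name are illustrative, not part of the commit): it takes an
analyzed plan whose filters contain scalar/EXISTS/IN subqueries over Carbon
relations and flags those relations so the late-decode rule can optimize the
whole plan at once.

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.hive.CarbonOptimizerUtil

    // `analyzedPlan` is assumed to come from something like:
    //   spark.sql("SELECT * FROM t1 WHERE c1 = (SELECT max(c1) FROM t2)")
    //     .queryExecution.analyzed
    def markCarbonSubqueries(analyzedPlan: LogicalPlan): LogicalPlan =
      CarbonOptimizerUtil.transformForScalarSubQuery(analyzedPlan)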

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..f3168d7
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache
+ * if the carbon table in the carbon catalog is not the same as the cached
+ * carbon relation's carbon table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog (
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+  /**
+   * Returns the carbonEnv instance.
+   * @return
+   */
+  override def getCarbonEnv(): CarbonEnv = {
+    carbonEnv
+  }
+
+  // Register all listeners with the operation bus.
+  CarbonEnv.initListeners()
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * returns hive client from HiveExternalCatalog
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case _: Exception =>
+        // Fall back to the original partition specs when the carbon table
+        // cannot be resolved or the partitions cannot be updated.
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches
+   * all partitions from Hive and then applies the filter, instead of querying
+   * Hive together with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storage format with the new location information.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+        new CarbonLateDecodeStrategy,
+        new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        new DetermineTableStats(session) +:
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+    }
+  )
+
+  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+}
\ No newline at end of file
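
To make alterTableRename above concrete, here is a hedged sketch of a call and
the two Hive statements it would issue (table names and path are invented; the
database part of the identifier must be set, since the method calls
oldTableIdentifier.database.get):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // `catalog` is assumed to be the session's CarbonHiveSessionCatalog.
    // The call issues, via runSqlHive:
    //   ALTER TABLE db1.t1 RENAME TO db1.t1_renamed
    //   ALTER TABLE db1.t1_renamed SET SERDEPROPERTIES(
    //     'tableName'='t1_renamed', 'dbName'='db1',
    //     'tablePath'='/store/db1/t1_renamed')
    catalog.alterTableRename(
      TableIdentifier("t1", Some("db1")),
      TableIdentifier("t1_renamed", Some("db1")),
      "/store/db1/t1_renamed")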

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
new file mode 100644
index 0000000..1a22e99
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * This object refreshes the relation from the cache if the carbon table in the
+ * carbon catalog is not the same as the cached carbon relation's carbon table.
+ */
+object CarbonSessionUtil {
+
+  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
+
+  /**
+   * The method refreshes the cache entry
+   *
+   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
+   * @param name        tableName
+   * @param sparkSession
+   * @return
+   */
+  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    var isRelationRefreshed = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
+      ) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName
+        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
+      ) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation
+          ).asInstanceOf[CatalogTable]
+        isRelationRefreshed =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+    isRelationRefreshed
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches
+   * all partitions from Hive and then applies the filter, instead of querying
+   * Hive together with the filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone
+    )
+  }
+
+}
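
A hedged sketch of prunePartitionsByFilter in use (the table, column, and
helper name are assumptions): all partitions of the table are listed from the
session catalog and then pruned client-side with the given expressions.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.hive.CarbonSessionUtil
    import org.apache.spark.sql.types.StringType

    // Assumes db1.sales is partitioned by a string column `country`.
    def usPartitions(spark: SparkSession): Seq[CatalogTablePartition] = {
      val filter = EqualTo(AttributeReference("country", StringType)(), Literal("US"))
      CarbonSessionUtil.prunePartitionsByFilter(
        Seq(filter), spark, TableIdentifier("sales", Some("db1")))
    }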

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0fa5239/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
new file mode 100644
index 0000000..2128ffd
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes the default values of Carbon's dynamic session parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * Registers the dynamic parameter defaults along with their usage docs.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To set carbon task distribution.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    val BAD_RECORDS_LOGGER_ENABLE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = buildConf(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * Sets the default values of the dynamic properties on the session.
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}
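
A minimal sketch of how CarbonSQLConf would be wired into a session (the helper
name is an assumption; the two methods are the ones defined above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.hive.CarbonSQLConf

    // `spark` is assumed to be a CarbonSession with carbon.properties available.
    def installCarbonDefaults(spark: SparkSession): Unit = {
      val conf = new CarbonSQLConf(spark)
      conf.addDefaultCarbonParams()        // register conf entries plus usage docs
      conf.addDefaultCarbonSessionParams() // seed session conf from CarbonProperties
    }

    // Afterwards the dynamic options can be tuned per session, e.g.:
    //   spark.sql("SET carbon.options.sort.scope=LOCAL_SORT")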

