carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [6/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version(Make API changes in carbon to be compatible with spark 2.3)
Date Wed, 05 Sep 2018 12:40:14 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
new file mode 100644
index 0000000..0b3d35b
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.{SPARK_VERSION, TaskContext}
+
+/*
+ * Utility object for Spark version comparison and task context handling.
+ */
+object SparkUtil {
+
+  def setTaskContext(context: TaskContext): Unit = {
+    val localThreadContext = TaskContext.get()
+    if (localThreadContext == null) {
+      TaskContext.setTaskContext(context)
+    }
+  }
+
+  /**
+   * Utility method to compare Spark versions.
+   * This API ignores the patch version and compares only the major.minor version.
+   * The version passed in should be of the format x.y, e.g. 2.2 or 2.3, whereas
+   * SPARK_VERSION is of the format x.y.z, e.g. 2.3.0 or 2.2.1.
+   */
+  def isSparkVersionXandAbove(xVersion: String, isEqualComparision: Boolean = false): Boolean = {
+    val tmpArray = SPARK_VERSION.split("\\.")
+    // convert to float
+    val sparkVersion = if (tmpArray.length >= 2) {
+      (tmpArray(0) + "." + tmpArray(1)).toFloat
+    } else {
+      (tmpArray(0) + ".0").toFloat
+    }
+    // compare the versions
+    if (isEqualComparision) {
+      sparkVersion == xVersion.toFloat
+    } else {
+      sparkVersion >= xVersion.toFloat
+    }
+  }
+
+  def isSparkVersionEqualTo(xVersion: String): Boolean = {
+    isSparkVersionXandAbove(xVersion, true)
+  }
+
+}
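
For illustration, a minimal sketch of how this utility replaces the direct
spark.sparkContext.version.startsWith(...) checks used in the test changes further
below (the table name and path are hypothetical, not part of this commit):

    import org.apache.spark.util.SparkUtil

    val filePath = "/tmp/carbondata/sdk_output" // hypothetical path
    val createTableSql =
      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
        // Spark 2.1 exposes the data source path through OPTIONS
        s"CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath')"
      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
        // Spark 2.2 and above accept LOCATION
        s"CREATE TABLE sdkOutputTable USING carbon LOCATION '$filePath'"
      } else {
        sys.error("unsupported Spark version")
      }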

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..f39bc93
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import java.math.BigInteger;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles the columnar vector reading of CarbonData
+ * based on the Spark ColumnVector and ColumnarBatch APIs. This proxy class
+ * hides the complexity of the Spark 2.1/2.2 specific APIs, since the
+ * ColumnVector and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+    private ColumnarBatch columnarBatch;
+
+    /**
+     * Creates a proxy backed by a Spark 2.1/2.2 ColumnarBatch allocated for the
+     * given schema and row count.
+     *
+     * @param memMode      memory mode, i.e. whether the vectors are allocated on-heap or off-heap.
+     * @param rowNum       number of rows to allocate for vector reading.
+     * @param structFileds metadata describing the current schema of the table.
+     */
+    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
+        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
+    }
+
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+    }
+
+    public ColumnVector getColumnVector(int ordinal) {
+        return columnarBatch.column(ordinal);
+    }
+
+    /**
+     * Sets the number of rows in this batch.
+     */
+    public void setNumRows(int numRows) {
+        columnarBatch.setNumRows(numRows);
+    }
+
+    public Object reserveDictionaryIds(int capacity , int ordinal) {
+        return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
+    }
+
+    /**
+     * Returns the number of rows for read, including filtered rows.
+     */
+    public int numRows() {
+        return columnarBatch.capacity();
+    }
+
+    public void setDictionary(Object dictionary, int ordinal) {
+        if (dictionary instanceof Dictionary) {
+            columnarBatch.column(ordinal).setDictionary((Dictionary) dictionary);
+        } else {
+            columnarBatch.column(ordinal).setDictionary(null);
+        }
+    }
+
+    public void putNull(int rowId, int ordinal) {
+        columnarBatch.column(ordinal).putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count, int ordinal) {
+        columnarBatch.column(ordinal).putNulls(rowId, count);
+    }
+
+    /**
+     * Called to close all the columns in this batch. It is not valid to access the data after
+     * calling this. This must be called at the end to clean up memory allocations.
+     */
+    public void close() {
+        columnarBatch.close();
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public InternalRow getRow(int rowId) {
+        return columnarBatch.getRow(rowId);
+    }
+
+    /**
+     * Returns the underlying ColumnarBatch instance.
+     */
+    public Object getColumnarBatch() {
+        return columnarBatch;
+    }
+
+    public void resetDictionaryIds(int ordinal) {
+        columnarBatch.column(ordinal).getDictionaryIds().reset();
+    }
+
+    /**
+     * Returns the column vector of this batch at the given ordinal.
+     *
+     * @param ordinal column index in the batch
+     * @return the column vector for that ordinal
+     */
+    public ColumnVector column(int ordinal) {
+        return columnarBatch.column(ordinal);
+    }
+
+    public boolean hasDictionary(int ordinal) {
+        return columnarBatch.column(ordinal).hasDictionary();
+    }
+
+    /**
+     * Resets this column for writing. The currently stored values are no longer accessible.
+     */
+    public void reset() {
+        columnarBatch.reset();
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value, int offset) {
+        org.apache.spark.sql.types.DataType t = dataType(offset);
+        if (null == value) {
+            putNull(rowId, offset);
+        } else {
+            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                putBoolean(rowId, (boolean) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                putByte(rowId, (byte) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                putShort(rowId, (short) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                putLong(rowId, (long) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                putFloat(rowId, (float) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                putDouble(rowId, (double) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                UTF8String v = (UTF8String) value;
+                putByteArray(rowId, v.getBytes(), offset);
+            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+                DecimalType dt = (DecimalType) t;
+                Decimal d = Decimal.fromDecimal(value);
+                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                    putInt(rowId, (int) d.toUnscaledLong(), offset);
+                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                    putLong(rowId, d.toUnscaledLong(), offset);
+                } else {
+                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                    byte[] bytes = integer.toByteArray();
+                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+                }
+            } else if (t instanceof CalendarIntervalType) {
+                CalendarInterval c = (CalendarInterval) value;
+                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
+                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
+            } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                putLong(rowId, (long) value, offset);
+            }
+        }
+    }
+
+    public void putBoolean(int rowId, boolean value, int ordinal) {
+        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
+    }
+
+    public void putByte(int rowId, byte value, int ordinal) {
+        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
+    }
+
+    public void putShort(int rowId, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShort(rowId, (short) value);
+    }
+
+    public void putInt(int rowId, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInt(rowId, (int) value);
+    }
+
+    public void putFloat(int rowId, float value, int ordinal) {
+        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
+    }
+
+    public void putLong(int rowId, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLong(rowId, (long) value);
+    }
+
+    public void putDouble(int rowId, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
+    }
+
+    public void putInts(int rowId, int count, int value, int ordinal) {
+        columnarBatch.column(ordinal).putInts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short value, int ordinal) {
+        columnarBatch.column(ordinal).putShorts(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long value, int ordinal) {
+        columnarBatch.column(ordinal).putLongs(rowId, count, value);
+    }
+
+    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
+
+    }
+
+    public void putDoubles(int rowId, int count, double value, int ordinal) {
+        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
+    }
+
+    public boolean isNullAt(int rowId, int ordinal) {
+        return columnarBatch
+                .column(ordinal).isNullAt(rowId);
+    }
+
+    public DataType dataType(int ordinal) {
+        return columnarBatch.column(ordinal).dataType();
+    }
+
+    public void putNotNull(int rowId, int ordinal) {
+        columnarBatch.column(ordinal).putNotNull(rowId);
+    }
+
+    public void putNotNulls(int rowId, int count, int ordinal) {
+        columnarBatch.column(ordinal).putNotNulls(rowId, count);
+    }
+
+    public void putDictionaryInt(int rowId, int value, int ordinal) {
+        columnarBatch.column(ordinal).getDictionaryIds().putInt(rowId, (int) value);
+    }
+}
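
As a usage illustration (a hedged sketch with assumed column names and values, not part
of this commit), caller code fills a batch through the proxy without touching the
version-specific ColumnarBatch API; the same calls also compile against the spark2.3plus
variant further below:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.CarbonVectorProxy
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // Two-column schema used only for this illustration.
    val schema = new StructType()
      .add(StructField("id", DataTypes.IntegerType))
      .add(StructField("name", DataTypes.StringType))

    val proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4)
    // Write one row through the type-dispatching helper.
    proxy.putRowToColumnBatch(0, Integer.valueOf(100), 0)
    proxy.putRowToColumnBatch(0, UTF8String.fromString("carbon"), 1)
    proxy.setNumRows(1)
    val row = proxy.getRow(0) // InternalRow, reused across calls
    proxy.close()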

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..0f23294
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import java.math.BigInteger;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles the columnar vector reading of CarbonData
+ * based on the Spark ColumnVector and ColumnarBatch APIs. This proxy class
+ * hides the complexity of the Spark 2.3 specific APIs, since the
+ * ColumnVector and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+    private ColumnarBatch columnarBatch;
+    private WritableColumnVector[] columnVectors;
+
+    /**
+     * Creates a proxy backed by Spark 2.3 WritableColumnVectors wrapped in a
+     * ColumnarBatch, allocated for the given schema and row count.
+     *
+     * @param memMode      memory mode, i.e. whether the vectors are allocated on-heap or off-heap.
+     * @param rowNum       number of rows to allocate for vector reading.
+     * @param structFileds metadata describing the current schema of the table.
+     */
+    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
+        columnVectors = ColumnVectorFactory
+                .getColumnVector(memMode, new StructType(structFileds), rowNum);
+        columnarBatch = new ColumnarBatch(columnVectors);
+        columnarBatch.setNumRows(rowNum);
+    }
+
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+        columnVectors = ColumnVectorFactory
+                .getColumnVector(memMode, outputSchema, rowNum);
+        columnarBatch = new ColumnarBatch(columnVectors);
+        columnarBatch.setNumRows(rowNum);
+    }
+
+    /**
+     * Returns the number of rows for read, including filtered rows.
+     */
+    public int numRows() {
+        return columnarBatch.numRows();
+    }
+
+    public Object reserveDictionaryIds(int capacity, int ordinal) {
+        return columnVectors[ordinal].reserveDictionaryIds(capacity);
+    }
+
+    /**
+     * Returns the column vector of this batch at the given ordinal.
+     *
+     * @param ordinal column index in the batch
+     * @return the column vector for that ordinal
+     */
+    public WritableColumnVector column(int ordinal) {
+        return (WritableColumnVector) columnarBatch.column(ordinal);
+    }
+
+    public WritableColumnVector getColumnVector(int ordinal) {
+        return columnVectors[ordinal];
+    }
+
+    /**
+     * Resets this column for writing. The currently stored values are no longer accessible.
+     */
+    public void reset() {
+        for (WritableColumnVector col : columnVectors) {
+            col.reset();
+        }
+    }
+
+    public void resetDictionaryIds(int ordinal) {
+        columnVectors[ordinal].getDictionaryIds().reset();
+    }
+
+    /**
+     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+     */
+    public InternalRow getRow(int rowId) {
+        return columnarBatch.getRow(rowId);
+    }
+
+
+    /**
+     * Returns the underlying ColumnarBatch instance.
+     */
+    public Object getColumnarBatch() {
+        return columnarBatch;
+    }
+
+    /**
+     * Called to close all the columns in this batch. It is not valid to access the data after
+     * calling this. This must be called at the end to clean up memory allocations.
+     */
+    public void close() {
+        columnarBatch.close();
+    }
+
+    /**
+     * Sets the number of rows in this batch.
+     */
+    public void setNumRows(int numRows) {
+        columnarBatch.setNumRows(numRows);
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value, int offset) {
+        org.apache.spark.sql.types.DataType t = dataType(offset);
+        if (null == value) {
+            putNull(rowId, offset);
+        } else {
+            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                putBoolean(rowId, (boolean) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                putByte(rowId, (byte) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                putShort(rowId, (short) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                putLong(rowId, (long) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                putFloat(rowId, (float) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                putDouble(rowId, (double) value, offset);
+            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                UTF8String v = (UTF8String) value;
+                putByteArray(rowId, v.getBytes(), offset);
+            } else if (t instanceof DecimalType) {
+                DecimalType dt = (DecimalType) t;
+                Decimal d = Decimal.fromDecimal(value);
+                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                    putInt(rowId, (int) d.toUnscaledLong(), offset);
+                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                    putLong(rowId, d.toUnscaledLong(), offset);
+                } else {
+                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                    byte[] bytes = integer.toByteArray();
+                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+                }
+            } else if (t instanceof CalendarIntervalType) {
+                CalendarInterval c = (CalendarInterval) value;
+                columnVectors[offset].getChild(0).putInt(rowId, c.months);
+                columnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
+            } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                putInt(rowId, (int) value, offset);
+            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                putLong(rowId, (long) value, offset);
+            }
+        }
+    }
+
+    public void putBoolean(int rowId, boolean value, int ordinal) {
+        columnVectors[ordinal].putBoolean(rowId, (boolean) value);
+    }
+
+    public void putByte(int rowId, byte value, int ordinal) {
+        columnVectors[ordinal].putByte(rowId, (byte) value);
+    }
+
+    public void putShort(int rowId, short value, int ordinal) {
+        columnVectors[ordinal].putShort(rowId, (short) value);
+    }
+
+    public void putInt(int rowId, int value, int ordinal) {
+        columnVectors[ordinal].putInt(rowId, (int) value);
+    }
+
+    public void putDictionaryInt(int rowId, int value, int ordinal) {
+        columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value);
+    }
+
+    public void putFloat(int rowId, float value, int ordinal) {
+        columnVectors[ordinal].putFloat(rowId, (float) value);
+    }
+
+    public void putLong(int rowId, long value, int ordinal) {
+        columnVectors[ordinal].putLong(rowId, (long) value);
+    }
+
+    public void putDouble(int rowId, double value, int ordinal) {
+        columnVectors[ordinal].putDouble(rowId, (double) value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int ordinal) {
+        columnVectors[ordinal].putByteArray(rowId, (byte[]) value);
+    }
+
+    public void putInts(int rowId, int count, int value, int ordinal) {
+        columnVectors[ordinal].putInts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short value, int ordinal) {
+        columnVectors[ordinal].putShorts(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long value, int ordinal) {
+        columnVectors[ordinal].putLongs(rowId, count, value);
+    }
+
+    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+        columnVectors[ordinal].putDecimal(rowId, value, precision);
+
+    }
+
+    public void putDoubles(int rowId, int count, double value, int ordinal) {
+        columnVectors[ordinal].putDoubles(rowId, count, value);
+    }
+
+    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+        columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
+    }
+
+    public void putNull(int rowId, int ordinal) {
+        columnVectors[ordinal].putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count, int ordinal) {
+        columnVectors[ordinal].putNulls(rowId, count);
+    }
+
+    public void putNotNull(int rowId, int ordinal) {
+        columnVectors[ordinal].putNotNull(rowId);
+    }
+
+    public void putNotNulls(int rowId, int count, int ordinal) {
+        columnVectors[ordinal].putNotNulls(rowId, count);
+    }
+
+    public boolean isNullAt(int rowId, int ordinal) {
+        return columnVectors[ordinal].isNullAt(rowId);
+    }
+
+    public boolean hasDictionary(int ordinal) {
+        return columnVectors[ordinal].hasDictionary();
+    }
+
+    public void setDictionary(Object dictionary, int ordinal) {
+        if (dictionary instanceof Dictionary) {
+            columnVectors[ordinal].setDictionary((Dictionary) dictionary);
+        } else {
+            columnVectors[ordinal].setDictionary(null);
+        }
+    }
+
+    public DataType dataType(int ordinal) {
+        return columnVectors[ordinal].dataType();
+    }
+}
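
The read side is symmetric. A minimal sketch (assumed single-column schema and values,
not part of this commit) of fetching typed values back out of the proxy:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.CarbonVectorProxy
    import org.apache.spark.sql.types.{DataTypes, StructField}

    val proxy = new CarbonVectorProxy(
      MemoryMode.ON_HEAP, 4, Array(StructField("id", DataTypes.IntegerType)))
    proxy.putInt(0, 42, 0)                    // rowId 0, value 42, column ordinal 0
    proxy.setNumRows(1)
    val columnType = proxy.dataType(0)        // IntegerType
    val isNull = proxy.isNullAt(0, 0)         // false
    val firstId = proxy.getRow(0).getInt(0)   // 42, the row object is reused across calls
    proxy.close()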

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
new file mode 100644
index 0000000..6fe5ede
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class ColumnVectorFactory {
+
+
+    public static WritableColumnVector[] getColumnVector(MemoryMode memMode, StructType outputSchema, int rowNums) {
+
+
+        WritableColumnVector[] writableColumnVectors = null;
+        switch (memMode) {
+            case ON_HEAP:
+                writableColumnVectors = OnHeapColumnVector
+                        .allocateColumns(rowNums, outputSchema);
+                break;
+            case OFF_HEAP:
+                writableColumnVectors = OffHeapColumnVector
+                        .allocateColumns(rowNums, outputSchema);
+                break;
+        }
+        return writableColumnVectors;
+    }
+}
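
A small sketch (assumed schema and on-heap mode, not part of this commit) of what the
factory hands back; the Spark 2.3 proxy constructor above wraps the same vectors into a
ColumnarBatch:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.ColumnVectorFactory
    import org.apache.spark.sql.types.{DataTypes, StructType}

    val schema = new StructType().add("id", DataTypes.IntegerType)
    // One writable vector per field, each sized for 1024 rows.
    val vectors = ColumnVectorFactory.getColumnVector(MemoryMode.ON_HEAP, schema, 1024)
    vectors(0).putInt(0, 7)
    println(vectors(0).getInt(0)) // prints 7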

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 12b5cbc..a0c4a0b 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -110,10 +111,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
-    if (spark.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
       spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
@@ -158,10 +159,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
-    if (spark.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
       spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
@@ -180,7 +181,6 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     cleanTestData()
   }
 
-
   // TODO: Make the sparkCarbonFileFormat to work without index file
   test("Read sdk writer output file without Carbondata file should fail") {
     buildTestData(false)
@@ -190,10 +190,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
 
     val exception = intercept[Exception] {
       //    data source file format
-      if (spark.sparkContext.version.startsWith("2.1")) {
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         //data source file format
         spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-      } else if (spark.sparkContext.version.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         spark.sql(
           s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
@@ -220,10 +220,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
 
     val exception = intercept[Exception] {
       //data source file format
-      if (spark.sparkContext.version.startsWith("2.1")) {
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         //data source file format
         spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-      } else if (spark.sparkContext.version.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         spark.sql(
           s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
@@ -250,10 +250,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     //data source file format
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
-    if (spark.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
       spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
@@ -299,10 +299,10 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     assert(new File(filePath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
-    if (spark.sparkContext.version.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
       spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$filePath') """)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 24af8ec..5af4fbe 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -54,11 +54,26 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-lucene</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <!-- Exclude the net.jpountz lz4 jar from this project.
+         Spark has switched this dependency to org.lz4:lz4-java, and
+         net.jpountz and org.lz4 contain classes with the same names. -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-bloom</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -75,6 +90,25 @@
       <scope>${spark.deps.scope}</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <!-- The transitive dependency com.univocity:univocity-parsers:2.5.9
+        is pulled in by org.apache.spark:spark-sql_2.11 and must be excluded;
+        Carbon uses version 2.2.1. -->
+        <exclusion>
+          <groupId>com.univocity</groupId>
+          <artifactId>univocity-parsers</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -233,6 +267,8 @@
             <configuration>
               <excludes>
                 <exclude>src/main/spark2.2</exclude>
+                <exclude>src/main/spark2.3</exclude>
+                <exclude>src/main/commonTo2.2And2.3</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -276,6 +312,7 @@
             <configuration>
               <excludes>
                 <exclude>src/main/spark2.1</exclude>
+                <exclude>src/main/spark2.3</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -293,6 +330,49 @@
                 <configuration>
                   <sources>
                     <source>src/main/spark2.2</source>
+                    <source>src/main/commonTo2.2And2.3</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <properties>
+        <spark.version>2.3.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.1</exclude>
+                <exclude>src/main/spark2.2</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3</source>
+                    <source>src/main/commonTo2.2And2.3</source>
                   </sources>
                 </configuration>
               </execution>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
new file mode 100644
index 0000000..dfb89fd
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  val mvPlan = try {
+    CarbonReflectionUtils.createObject(
+      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+  } catch {
+    case e: Exception =>
+      null
+  }
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+    if (mvPlan != null) {
+      mvPlan.apply(logicalPlan)
+    } else {
+      logicalPlan
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
new file mode 100644
index 0000000..ba6aae5
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the
+ * carbon table in the carbon catalog is not the same as the carbon table of the cached
+ * carbon relation.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    // Not required in the case of the in-memory catalog
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val structType = catalogTable.schema
+    var newStructType = structType
+    newColumns.get.foreach {cols =>
+      newStructType = structType
+        .add(cols.getColumnName,
+          CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(cols.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val fields = catalogTable.schema.fields.filterNot { field =>
+      dropCols.get.exists { col =>
+        col.getColumnName.equalsIgnoreCase(field.name)
+      }
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val a = catalogTable.schema.fields.flatMap { field =>
+      columns.get.map { col =>
+        if (col.getColumnName.equalsIgnoreCase(field.name)) {
+          StructField(col.getColumnName,
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
+        } else {
+          field
+        }
+      }
+    }
+    alterSchema(new StructType(a), catalogTable, tableIdentifier)
+  }
+
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val copy = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(copy)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the hive client. The in-memory catalog is not backed by Hive,
+   * so there is no client to return.
+   *
+   * @return always null for the in-memory catalog
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from hive and then applies the filter, instead of pushing the filters down to hive.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Updates the storage format with the new location information.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends SessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: InMemorySessionCatalog = {
+    val catalog = new InMemorySessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: ExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+  override protected lazy val resourceLoader: SessionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+    }
+  )
+  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
new file mode 100644
index 0000000..72d3ae2
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -0,0 +1,44 @@
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    // In the case of a scalar subquery, set a flag on the relation to skip the decoder plan in
+    // the optimizer rule, and optimize the whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l:ListQuery)) =>
+            val tPlan = l.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children, l.exprId)))
+        }
+    }
+    transFormedPlan
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..f3168d7
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
+ * table in the carbon catalog is not the same as the cached carbon relation's carbon table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog (
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+  /**
+   * Returns the carbonEnv instance.
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the Hive client from HiveExternalCatalog.
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filters locally, instead of pushing the filters to Hive.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storage format with the new location information.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * Session state implementation that overrides the SQL parser and adds Carbon strategies.
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+        new CarbonLateDecodeStrategy,
+        new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   * Creates a [[CarbonHiveSessionCatalog]] and copies the parent state into it, if any.
+   */
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck :: HiveOnlyCheck :: Nil
+
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        new DetermineTableStats(session) +:
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+    }
+  )
+
+  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+}
\ No newline at end of file
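
For reference, the extra planner strategies and optimizer rules that CarbonSessionStateBuilder
registers can also be attached to an ordinary SparkSession through experimentalMethods; the
sketch below mirrors that wiring and is only an illustration, since CarbonSession normally
performs it during session-state construction.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}

val spark = SparkSession.builder().appName("carbon-strategy-sketch").getOrCreate()

// Same strategies and rules as CarbonSessionStateBuilder, registered by hand.
spark.experimental.extraStrategies =
  Seq(new StreamingTableStrategy(spark), new CarbonLateDecodeStrategy, new DDLStrategy(spark))
spark.experimental.extraOptimizations =
  Seq(new CarbonIUDRule, new CarbonUDFTransformRule, new CarbonLateDecodeRule)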

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
new file mode 100644
index 0000000..1a22e99
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * This object refreshes the relation from the cache if the carbon table in the
+ * carbon catalog is not the same as the cached carbon relation's carbon table.
+ */
+object CarbonSessionUtil {
+
+  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
+
+  /**
+   * The method refreshes the cache entry
+   *
+   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
+   * @param name        tableName
+   * @param sparkSession
+   * @return
+   */
+  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    var isRelationRefreshed = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
+      ) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName
+        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
+      ) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation
+          ).asInstanceOf[CatalogTable]
+        isRelationRefreshed =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+    isRelationRefreshed
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filters locally, instead of pushing the filters to Hive.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone
+    )
+  }
+
+}
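
A small sketch of calling prunePartitionsByFilter directly, assuming the partition filter
expressions have already been resolved elsewhere (for example by a command handling a partition
spec); the database and table names below are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hive.CarbonSessionUtil

// Fetch all partitions of db.tbl from the session catalog and prune them
// client-side with the given filters.
def prunedPartitions(spark: SparkSession,
    filters: Seq[Expression]): Seq[CatalogTablePartition] =
  CarbonSessionUtil.prunePartitionsByFilter(filters, spark, TableIdentifier("tbl", Some("db")))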

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
new file mode 100644
index 0000000..2128ffd
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes the default values of the dynamic parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * Initializes the dynamic parameter defaults along with their usage docs.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To set carbon task distribution.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    val BAD_RECORDS_LOGGER_ENABLE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = buildConf(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * to set the dynamic properties default values
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}
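
A usage sketch for the class above: registering the dynamic Carbon SQL properties and seeding
their session defaults on an existing SparkSession; the application name is a placeholder.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.CarbonSQLConf

val spark = SparkSession.builder().appName("carbon-sqlconf-sketch").getOrCreate()

// Register the dynamic properties with their documented defaults, then seed
// the current session with the same default values.
val carbonSqlConf = new CarbonSQLConf(spark)
carbonSqlConf.addDefaultCarbonParams()
carbonSqlConf.addDefaultCarbonSessionParams()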

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..74c0f97
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,130 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+
+/**
+ * Create a table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+
+      saveDataIntoTable(
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+    } else {
+      assert(table.schema.isEmpty)
+
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+      result match {
+        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          // Need to recover partitions into the metastore so our saved data is visible.
+          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        case _ =>
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[URI],
+      data: LogicalPlan,
+      mode: SaveMode,
+      tableExists: Boolean): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = if (tableExists) {
+        Some(table)
+      } else {
+        None
+      })
+
+    try {
+      val physicalPlan = session.sessionState.executePlan(data).executedPlan
+      CarbonReflectionUtils.invokewriteAndReadMethod(dataSource,
+        Dataset.ofRows(session, query),
+        data,
+        session,
+        mode,
+        query,
+        physicalPlan)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+        throw ex
+    }
+  }
+}
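
Statements of the following shape are what this command ends up executing when issued through a
Carbon-enabled session; the table and column names are placeholders, and routing through this
command assumes the Carbon parser and strategies are installed.

// Create a carbondata table and fill it from the query result in one step.
spark.sql(
  """
    |CREATE TABLE target_table
    |USING carbondata
    |AS SELECT id, name FROM source_table
  """.stripMargin)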

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
new file mode 100644
index 0000000..8f1477c
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.DecimalType
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
+
+
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+
+  def visitAddTableColumns(parser: CarbonSpark2SqlParser,
+      ctx: AddTableColumnsContext): LogicalPlan = {
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+
+  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
+    CarbonExplainCommand(super.visitExplain(ctx))
+  }
+}
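
For orientation, the kinds of statements the visitor methods above translate into Carbon commands
look roughly like the sketch below; the database, table and column names are placeholders, and
visitChangeColumn requires the old and new column names to match.

// Change only the data type of an existing column (same column name on both sides).
spark.sql("ALTER TABLE db.tbl CHANGE price price DECIMAL(18,2)")

// Add new columns to an existing carbon table.
spark.sql("ALTER TABLE db.tbl ADD COLUMNS (comment STRING)")

// SHOW TABLES is routed through CarbonShowTablesCommand unless showing datamaps is enabled.
spark.sql("SHOW TABLES IN db")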

