carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-2163][CARBONDATA-2164] Remove spark dependency in core and processing modules
Date Fri, 23 Mar 2018 14:21:51 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 5725b7eb1 -> 982d03fea


[CARBONDATA-2163][CARBONDATA-2164] Remove spark dependency in core and processing modules

This closes #2070


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/982d03fe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/982d03fe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/982d03fe

Branch: refs/heads/master
Commit: 982d03fea2a3b0869b81ce1a0f896b522a25af64
Parents: 5725b7e
Author: Jacky Li <jacky.likun@qq.com>
Authored: Mon Mar 19 00:11:49 2018 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Fri Mar 23 22:21:22 2018 +0800

----------------------------------------------------------------------
 core/pom.xml                                    | 39 +++++++---
 ...feVariableLengthDimensionDataChunkStore.java |  4 +-
 ...afeVariableLengthDimesionDataChunkStore.java |  4 +-
 .../carbondata/core/memory/MemoryBlock.java     |  8 --
 .../impl/AbstractScannedResultCollector.java    |  6 +-
 ...structureBasedDictionaryResultCollector.java | 10 ++-
 .../RestructureBasedRawResultCollector.java     |  7 +-
 .../RestructureBasedVectorResultCollector.java  |  9 +--
 .../core/scan/complextypes/ArrayQueryType.java  | 11 +--
 .../scan/complextypes/PrimitiveQueryType.java   | 26 -------
 .../core/scan/complextypes/StructQueryType.java | 18 +----
 .../scan/executor/util/RestructureUtil.java     |  7 +-
 .../core/scan/filter/GenericQueryType.java      |  4 -
 .../apache/carbondata/core/util/CarbonUtil.java |  3 +-
 .../carbondata/core/util/DataTypeConverter.java | 11 ++-
 .../core/util/DataTypeConverterImpl.java        | 41 +++++++++-
 .../carbondata/core/util/DataTypeUtil.java      |  6 +-
 .../complextypes/PrimitiveQueryTypeTest.java    | 29 -------
 .../scan/complextypes/StructQueryTypeTest.java  |  5 --
 .../conditional/EqualToExpressionUnitTest.java  |  3 +-
 .../GreaterThanEqualToExpressionUnitTest.java   |  3 +-
 .../GreaterThanExpressionUnitTest.java          |  5 +-
 .../conditional/InExpressionUnitTest.java       |  5 +-
 .../LessThanEqualToExpressionUnitTest.java      |  5 +-
 .../conditional/LessThanExpressionUnitTest.java |  5 +-
 .../NotEqualsExpressionUnitTest.java            |  5 +-
 .../conditional/NotInExpressionUnitTest.java    |  5 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |  2 +-
 hadoop/pom.xml                                  |  4 +
 integration/presto/pom.xml                      |  5 ++
 .../presto/CarbonColumnVectorWrapper.java       | 64 ++++++++++++++++
 .../spark/util/SparkDataTypeConverterImpl.java  | 42 ++++++++--
 .../spark/rdd/CarbonScanPartitionRDD.scala      |  3 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  2 +
 .../vectorreader/ColumnarVectorWrapper.java     | 11 +--
 .../spark/rdd/CarbonDataRDDFactory.scala        |  1 -
 .../spark/sql/SparkUnknownExpression.scala      |  4 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala | 57 ++++++++++++--
 .../datasources/CarbonFileFormat.scala          |  2 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |  3 +-
 processing/pom.xml                              |  6 +-
 .../loading/TableProcessingOperations.java      |  2 -
 .../sort/unsafe/sort/SortDataFormat.java        | 80 ++++++++++++++++++++
 .../loading/sort/unsafe/sort/TimSort.java       |  2 -
 .../unsafe/sort/UnsafeIntSortDataFormat.java    |  2 -
 .../merger/CompactionResultSortProcessor.java   |  5 +-
 .../partition/spliter/CarbonSplitExecutor.java  |  7 +-
 .../carbondata/processing/store/TablePage.java  |  4 +-
 48 files changed, 392 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 824de0d..d9c756e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,11 +49,6 @@
       <artifactId>gson</artifactId>
       <version>2.3.1</version>
     </dependency>
-    <!--<dependency>-->
-      <!--<groupId>io.netty</groupId>-->
-      <!--<artifactId>netty-all</artifactId>-->
-      <!--<version>4.1.8.Final</version>-->
-    <!--</dependency>-->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -63,6 +58,11 @@
       <artifactId>hadoop-hdfs</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
       <version>${snappy.version}</version>
@@ -78,10 +78,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.4.7</version>
@@ -96,6 +92,31 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+      <version>0.5.11</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.0.42.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+      <version>1.3.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index f498c6e..09230dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -153,8 +153,8 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
       } else if (dt == DataTypes.INT) {
         vector.putInt(vectorRow, ByteUtil.toInt(data, currentDataOffset, length));
       } else if (dt == DataTypes.LONG) {
-        vector.putLong(vectorRow, DataTypeUtil
-            .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(),
+        vector.putLong(vectorRow,
+            DataTypeUtil.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(),
                 currentDataOffset, length));
       } else if (dt  == DataTypes.TIMESTAMP) {
         vector.putLong(vectorRow, ByteUtil.toLong(data, currentDataOffset, length) * 1000L);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index e1eb378..0321ee7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -177,8 +177,8 @@ public class UnsafeVariableLengthDimesionDataChunkStore
       } else if (dt == DataTypes.INT) {
         vector.putInt(vectorRow, ByteUtil.toInt(value, 0, value.length));
       } else if (dt == DataTypes.LONG) {
-        vector.putLong(vectorRow, DataTypeUtil
-            .getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
+        vector.putLong(vectorRow,
+            DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
                 value.length));
       } else if (dt == DataTypes.TIMESTAMP) {
         vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length) * 1000L);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
index d6cb184..fd4f06c 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.core.memory;
 
 import javax.annotation.Nullable;
 
-import org.apache.spark.unsafe.Platform;
-
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
  * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
@@ -55,10 +53,4 @@ public class MemoryBlock extends MemoryLocation {
     this.isFreed = freedStatus;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8);
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 694271e..9ac5a06 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -75,7 +75,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
         Object defaultValue = measureInfo.getDefaultValues()[i];
         if (null != defaultValue && DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) {
           // convert data type as per the computing engine
-          defaultValue = DataTypeUtil.getDataTypeConverter().convertToDecimal(defaultValue);
+          defaultValue =
+              DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(defaultValue);
         }
         msrValues[i + offset] = defaultValue;
       }
@@ -100,7 +101,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
               bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
         }
         // convert data type as per the computing engine
-        return DataTypeUtil.getDataTypeConverter().convertToDecimal(bigDecimalMsrValue);
+        return DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(
+            bigDecimalMsrValue);
       } else {
         return dataChunk.getDouble(index);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 8b42a4a..f0d6898 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * class for handling restructure scenarios for filling result
@@ -86,6 +88,9 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
             if (dictionaryEncodingArray[i] || directDictionaryEncodingArray[i]) {
               row[order[i]] = dimensionInfo.getDefaultValues()[i];
               dictionaryColumnIndex++;
+            } else if (queryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+              row[order[i]] = DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8String(
+                  (byte[])dimensionInfo.getDefaultValues()[i]);
             } else {
               row[order[i]] = dimensionInfo.getDefaultValues()[i];
             }
@@ -119,8 +124,11 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
             scannedResult.getMeasureChunk(measureInfo.getMeasureOrdinals()[measureExistIndex]),
             scannedResult.getCurrentRowId(), queryMeasure.getMeasure());
         measureExistIndex++;
-      } else {
+      } else if (DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) {
         // if not then get the default value
+        msrValues[i + offset] = DataTypeUtil.getDataTypeConverter()
+            .convertFromBigDecimalToDecimal(measureDefaultValues[i]);
+      } else {
         msrValues[i + offset] = measureDefaultValues[i];
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 6544a75..d776b5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -34,9 +34,9 @@ import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * It is not a collector it is just a scanned result holder.
@@ -239,10 +239,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
           byte[] newColumnDefaultValue = null;
           Object defaultValue = dimensionInfo.getDefaultValues()[i];
           if (null != defaultValue) {
-            newColumnDefaultValue = ((UTF8String) defaultValue).getBytes();
+            newColumnDefaultValue = (byte[]) defaultValue;
           } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
             newColumnDefaultValue =
-                UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes();
+                DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
+                    CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
           } else {
             newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index 61a2992..b95bffe 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.scan.collector.impl;
 
+import java.math.BigDecimal;
 import java.util.List;
 
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -31,9 +32,6 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.UTF8String;
-
 /**
  * It is not a collector it is just a scanned result holder.
  */
@@ -206,8 +204,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
         vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue);
       } else {
-        vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size,
-            ((UTF8String) defaultValue).getBytes());
+        vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, (byte[])defaultValue);
       }
     } else {
       vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
@@ -240,7 +237,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
                 (long) defaultValue);
           } else if (DataTypes.isDecimal(dataType)) {
             vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size,
-                ((Decimal) defaultValue).toJavaBigDecimal(), measure.getPrecision());
+                (BigDecimal) defaultValue, measure.getPrecision());
           } else if (dataType == DataTypes.BOOLEAN) {
             vector.putBoolean(columnVectorInfo.vectorOffset, (Boolean) defaultValue);
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 30dd1dd..24c1c9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -24,10 +24,7 @@ import java.nio.ByteBuffer;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
-
-import org.apache.spark.sql.catalyst.util.GenericArrayData;
-import org.apache.spark.sql.types.ArrayType;
-import org.apache.spark.sql.types.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class ArrayQueryType extends ComplexQueryType implements GenericQueryType {
 
@@ -82,10 +79,6 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
     return children.getColsCount() + 1;
   }
 
-  @Override public DataType getSchemaType() {
-    return new ArrayType(null, true);
-  }
-
   @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder)
       throws IOException {
     readBlockDataChunk(blockChunkHolder);
@@ -101,7 +94,7 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
     for (int i = 0; i < dataLength; i++) {
       data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
     }
-    return new GenericArrayData(data);
+    return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index b8aa912..8c75caf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -30,14 +30,6 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
-import org.apache.spark.sql.types.BooleanType$;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DateType$;
-import org.apache.spark.sql.types.DoubleType$;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.TimestampType$;
-
 public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType {
 
   private String name;
@@ -95,24 +87,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     dataOutputStream.write(currentVal);
   }
 
-  @Override public DataType getSchemaType() {
-    if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
-      return IntegerType$.MODULE$;
-    } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
-      return DoubleType$.MODULE$;
-    } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
-      return LongType$.MODULE$;
-    } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
-      return BooleanType$.MODULE$;
-    } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
-      return TimestampType$.MODULE$;
-    } else if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
-      return DateType$.MODULE$;
-    } else {
-      return IntegerType$.MODULE$;
-    }
-  }
-
   @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder)
       throws IOException {
     readBlockDataChunk(blockChunkHolder);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 1d4f141..1064694 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -26,12 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
-
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class StructQueryType extends ComplexQueryType implements GenericQueryType {
 
@@ -97,15 +92,6 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
     }
   }
 
-  @Override public DataType getSchemaType() {
-    StructField[] fields = new StructField[children.size()];
-    for (int i = 0; i < children.size(); i++) {
-      fields[i] = new StructField(children.get(i).getName(), null, true,
-          Metadata.empty());
-    }
-    return new StructType(fields);
-  }
-
   @Override public void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder)
       throws IOException {
     readBlockDataChunk(blockChunkHolder);
@@ -122,6 +108,6 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
       fields[i] =  children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
     }
 
-    return new GenericInternalRow(fields);
+    return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index e67d822..d7247b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -41,8 +41,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * Utility class for restructuring
@@ -231,7 +229,8 @@ public class RestructureUtil {
         long timestampValue = ByteUtil.toLong(defaultValue, 0, defaultValue.length);
         noDictionaryDefaultValue = timestampValue * 1000L;
       } else {
-        noDictionaryDefaultValue = UTF8String.fromBytes(defaultValue);
+        noDictionaryDefaultValue =
+            DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(defaultValue);
       }
     }
     return noDictionaryDefaultValue;
@@ -318,7 +317,7 @@ public class RestructureUtil {
         if (columnSchema.getScale() > decimal.scale()) {
           decimal = decimal.setScale(columnSchema.getScale(), RoundingMode.HALF_UP);
         }
-        measureDefaultValue = Decimal.apply(decimal);
+        measureDefaultValue = decimal;
       } else {
         value = new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         Double parsedValue = Double.valueOf(value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index 214bd9d..b5d8d82 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -24,8 +24,6 @@ import java.nio.ByteBuffer;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 
-import org.apache.spark.sql.types.DataType;
-
 public interface GenericQueryType {
 
   String getName();
@@ -43,8 +41,6 @@ public interface GenericQueryType {
   void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
       int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException;
 
-  DataType getSchemaType();
-
   void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException;
 
   Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 06511f8..5bb1c8e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -95,6 +95,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
+import scala.StringContext;
 
 public final class CarbonUtil {
 
@@ -656,7 +657,7 @@ public final class CarbonUtil {
    * @return
    */
   public static String unescapeChar(String parseStr) {
-    return scala.StringContext.treatEscapes(parseStr);
+    return StringContext.treatEscapes(parseStr);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
index 8c9e058..7c63860 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
@@ -19,9 +19,16 @@ package org.apache.carbondata.core.util;
 
 public interface DataTypeConverter {
 
-  Object convertToDecimal(Object data);
-  Object convertFromByteToUTF8String(Object data);
+  Object convertFromStringToDecimal(Object data);
+  Object convertFromBigDecimalToDecimal(Object data);
+  Object convertFromDecimalToBigDecimal(Object data);
+
+  Object convertFromByteToUTF8String(byte[] data);
+  byte[] convertFromByteToUTF8Bytes(byte[] data);
   byte[] convertFromStringToByte(Object data);
   Object convertFromStringToUTF8String(Object Data);
 
+  Object wrapWithGenericArrayData(Object data);
+  Object wrapWithGenericRow(Object[] fields);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index f1603dc..ea5740d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -26,7 +26,8 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
 
   private static final long serialVersionUID = -1718154403432354200L;
 
-  public Object convertToDecimal(Object data) {
+  @Override
+  public Object convertFromStringToDecimal(Object data) {
     if (null == data) {
       return null;
     }
@@ -36,13 +37,35 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
     return new BigDecimal(data.toString());
   }
 
-  public Object convertFromByteToUTF8String(Object data) {
+  @Override
+  public Object convertFromBigDecimalToDecimal(Object data) {
     if (null == data) {
       return null;
     }
-    return new String((byte[]) data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+    if (data instanceof BigDecimal) {
+      return data;
+    }
+    return new BigDecimal(data.toString());
+  }
+
+  @Override public Object convertFromDecimalToBigDecimal(Object data) {
+    return convertFromBigDecimalToDecimal(data);
+  }
+
+  @Override
+  public Object convertFromByteToUTF8String(byte[] data) {
+    if (null == data) {
+      return null;
+    }
+    return new String(data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
   }
 
+  @Override
+  public byte[] convertFromByteToUTF8Bytes(byte[] data) {
+    return data;
+  }
+
+  @Override
   public byte[] convertFromStringToByte(Object data) {
     if (null == data) {
       return null;
@@ -50,10 +73,22 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
     return data.toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
   }
 
+  @Override
   public Object convertFromStringToUTF8String(Object data) {
     if (null == data) {
       return null;
     }
     return data.toString();
   }
+
+  @Override
+  public Object wrapWithGenericArrayData(Object data) {
+    return data;
+  }
+
+  @Override
+  public Object wrapWithGenericRow(Object[] fields) {
+    return fields;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 4602cc4..a4d6094 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -103,7 +103,7 @@ public final class DataTypeUtil {
           new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
       if (useConverter) {
-        return converter.convertToDecimal(decimal);
+        return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -309,7 +309,7 @@ public final class DataTypeUtil {
         if (data.isEmpty()) {
           return null;
         }
-        return converter.convertToDecimal(data);
+        return converter.convertFromStringToDecimal(data);
       } else {
         return converter.convertFromStringToUTF8String(data);
       }
@@ -544,7 +544,7 @@ public final class DataTypeUtil {
         if (dimension.getColumnSchema().getScale() > javaDecVal.scale()) {
           javaDecVal = javaDecVal.setScale(dimension.getColumnSchema().getScale());
         }
-        return getDataTypeConverter().convertToDecimal(javaDecVal);
+        return getDataTypeConverter().convertFromBigDecimalToDecimal(javaDecVal);
       } else {
         return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
index 5149136..3236f16 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
@@ -29,11 +29,6 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.BooleanType$;
-import org.apache.spark.sql.types.DoubleType$;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.TimestampType$;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -78,30 +73,6 @@ public class PrimitiveQueryTypeTest {
 
   }
 
-  @Test public void testGetDataTypeForDefault() {
-    assertEquals(IntegerType$.MODULE$, primitiveQueryType.getSchemaType());
-  }
-
-  @Test public void testGetDataTypeForInt() {
-    assertEquals(IntegerType$.MODULE$, primitiveQueryTypeForInt.getSchemaType());
-  }
-
-  @Test public void testGetDataTypeForDouble() {
-    assertEquals(DoubleType$.MODULE$, primitiveQueryTypeForDouble.getSchemaType());
-  }
-
-  @Test public void testGetDataTypeForBoolean() {
-    assertEquals(BooleanType$.MODULE$, primitiveQueryTypeForBoolean.getSchemaType());
-  }
-
-  @Test public void testGetDataTypeForTimeStamp() {
-    assertEquals(TimestampType$.MODULE$, primitiveQueryTypeForTimeStamp.getSchemaType());
-  }
-
-  @Test public void testGetDataTypeForLong() {
-    assertEquals(LongType$.MODULE$, primitiveQueryTypeForLong.getSchemaType());
-  }
-
   @Test public void testGetDataBasedOnDataTypeFromSurrogates() {
     ByteBuffer surrogateData = ByteBuffer.allocate(10);
     surrogateData.put(3, (byte) 1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
index 3215ef0..b09d9dd 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
@@ -64,9 +64,4 @@ public class StructQueryTypeTest {
     assertEquals(expectedValue, actualValue);
   }
 
-  @Test public void testGetSchemaType() {
-    List children = new ArrayList();
-    children.add(null);
-    assertNotNull(structQueryType.getSchemaType());
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
index b2843bc..f349fc9 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -297,7 +296,7 @@ public class EqualToExpressionUnitTest {
     right.setColIndex(0);
     equalToExpression = new EqualToExpression(right, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(12345.0) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) };
     Object objectRow[] = { row };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
index adff4e4..95e5935 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -210,7 +209,7 @@ public class GreaterThanEqualToExpressionUnitTest {
     left.setColIndex(1);
     greaterThanEqualToExpression = new GreaterThanEqualToExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(12345.0) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) };
     Object objectRow[] = { row, row };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
index b5decc2..38a7222 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -256,8 +255,8 @@ public class GreaterThanExpressionUnitTest {
     left.setColIndex(1);
     greaterThanExpression = new GreaterThanExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(12345.0) };
-    Decimal[] row1 = new Decimal[] { Decimal.apply(123451245.0) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) };
+    BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(123451245.0) };
     Object objectRow[] = { row1, row };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
index 4f4203d..3b48e44 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -209,8 +208,8 @@ public class InExpressionUnitTest {
     right.setColIndex(1);
     inExpression = new InExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal row = Decimal.apply(123452154.0);
-    Decimal row1 = Decimal.apply(123452154.0);
+    BigDecimal row = new BigDecimal(123452154.0);
+    BigDecimal row1 = new BigDecimal(123452154.0);
     Object objectRow[] = { row, row1 };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
index f1001ae..d3c8cf4 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -259,8 +258,8 @@ public class LessThanEqualToExpressionUnitTest {
     left.setColIndex(1);
     lessThanEqualToExpression = new LessThanEqualToExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(46851.2) };
-    Decimal[] row1 = new Decimal[] { Decimal.apply(45821.02) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(46851.2) };
+    BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(45821.02) };
     Object objectRow[] = { row1, row };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
index 4dfeaa3..e038a56 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -256,8 +255,8 @@ public class LessThanExpressionUnitTest {
     left.setColIndex(1);
     lessThanExpression = new LessThanExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(256324.0) };
-    Decimal[] row1 = new Decimal[] { Decimal.apply(123451245.0) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(256324.0) };
+    BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(123451245.0) };
     Object objectRow[] = { row1, row };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
index c9707e9..34448f4 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -266,8 +265,8 @@ public class NotEqualsExpressionUnitTest {
     left.setColIndex(0);
     notEqualsExpression = new NotEqualsExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal[] row = new Decimal[] { Decimal.apply(12345.0) };
-    Decimal[] row1 = new Decimal[] { Decimal.apply(1235445.0) };
+    BigDecimal[] row = new BigDecimal[] { new BigDecimal(12345.0) };
+    BigDecimal[] row1 = new BigDecimal[] { new BigDecimal(1235445.0) };
     Object objectRow[] = { row, row1 };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
index 207ce5d..d31361b 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -208,8 +207,8 @@ public class NotInExpressionUnitTest {
     right.setColIndex(1);
     notInExpression = new NotInExpression(left, right);
     RowImpl value = new RowImpl();
-    Decimal row = Decimal.apply(123452154.0);
-    Decimal row1 = Decimal.apply(1234521215454.0);
+    BigDecimal row = new BigDecimal(123452154.0);
+    BigDecimal row1 = new BigDecimal(1234521215454.0);
     Object objectRow[] = { row, row1 };
     value.setValues(objectRow);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index 0c8f926..c67da7d 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -71,7 +71,7 @@ public class DataTypeUtilTest {
     java.math.BigDecimal javaDecVal = new java.math.BigDecimal(1);
     scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
     assertEquals(getDataBasedOnDataType("1", DataTypes.createDefaultDecimalType()),
-        DataTypeUtil.getDataTypeConverter().convertToDecimal(scalaDecVal));
+        DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(scalaDecVal));
     assertEquals(getDataBasedOnDataType("default", DataTypes.NULL),
         DataTypeUtil.getDataTypeConverter().convertFromStringToUTF8String("default"));
     assertEquals(getDataBasedOnDataType((String) null, DataTypes.NULL), null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index c3964c5..916b9db 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -40,6 +40,10 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index c3c7c64..de69df0 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -488,6 +488,11 @@
       <artifactId>commons-io</artifactId>
       <version>2.4</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index 78a1ea8..4560241 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -18,11 +18,30 @@
 package org.apache.carbondata.presto;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.NullType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+
 public class CarbonColumnVectorWrapper implements CarbonColumnVector {
 
   private CarbonColumnVectorImpl columnVector;
@@ -218,4 +237,49 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
   @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
     this.filteredRowsExist = filteredRowsExist;
   }
+
+  // TODO: this is copied from carbondata-spark-common module, use presto type instead of this
+  private org.apache.carbondata.core.metadata.datatype.DataType
+  convertSparkToCarbonDataType(org.apache.spark.sql.types.DataType dataType) {
+    if (dataType instanceof StringType) {
+      return DataTypes.STRING;
+    } else if (dataType instanceof ShortType) {
+      return DataTypes.SHORT;
+    } else if (dataType instanceof IntegerType) {
+      return DataTypes.INT;
+    } else if (dataType instanceof LongType) {
+      return DataTypes.LONG;
+    } else if (dataType instanceof DoubleType) {
+      return DataTypes.DOUBLE;
+    } else if (dataType instanceof FloatType) {
+      return DataTypes.FLOAT;
+    } else if (dataType instanceof DateType) {
+      return DataTypes.DATE;
+    } else if (dataType instanceof BooleanType) {
+      return DataTypes.BOOLEAN;
+    } else if (dataType instanceof TimestampType) {
+      return DataTypes.TIMESTAMP;
+    } else if (dataType instanceof NullType) {
+      return DataTypes.NULL;
+    } else if (dataType instanceof DecimalType) {
+      DecimalType decimal = (DecimalType) dataType;
+      return DataTypes.createDecimalType(decimal.precision(), decimal.scale());
+    } else if (dataType instanceof ArrayType) {
+      org.apache.spark.sql.types.DataType elementType = ((ArrayType) dataType).elementType();
+      return DataTypes.createArrayType(convertSparkToCarbonDataType(elementType));
+    } else if (dataType instanceof StructType) {
+      StructType structType = (StructType) dataType;
+      org.apache.spark.sql.types.StructField[] fields = structType.fields();
+      List<StructField> carbonFields = new ArrayList<>();
+      for (org.apache.spark.sql.types.StructField field : fields) {
+        carbonFields.add(
+            new StructField(
+                field.name(),
+                convertSparkToCarbonDataType(field.dataType())));
+      }
+      return DataTypes.createStructType(carbonFields);
+    } else {
+      throw new UnsupportedOperationException("getting " + dataType + " from presto");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
index 3670e11..6e9e0a6 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -18,9 +18,12 @@
 package org.apache.carbondata.spark.util;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 
 import org.apache.carbondata.core.util.DataTypeConverter;
 
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -30,14 +33,26 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
 
   private static final long serialVersionUID = -4379212832935070583L;
 
-  public Object convertToDecimal(Object data) {
+  @Override
+  public Object convertFromStringToDecimal(Object data) {
+    java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
+    return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+  }
+
+  @Override
+  public Object convertFromBigDecimalToDecimal(Object data) {
     if (null == data) {
       return null;
     }
-    java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
-    return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+    return org.apache.spark.sql.types.Decimal.apply((BigDecimal)data);
+  }
+
+  @Override
+  public Object convertFromDecimalToBigDecimal(Object data) {
+    return ((org.apache.spark.sql.types.Decimal) data).toJavaBigDecimal();
   }
 
+  @Override
   public byte[] convertFromStringToByte(Object data) {
     if (null == data) {
       return null;
@@ -45,17 +60,34 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
     return UTF8String.fromString((String) data).getBytes();
   }
 
-  public Object convertFromByteToUTF8String(Object data) {
+  @Override
+  public Object convertFromByteToUTF8String(byte[] data) {
     if (null == data) {
       return null;
     }
-    return UTF8String.fromBytes((byte[]) data);
+    return UTF8String.fromBytes(data);
+  }
+
+  @Override
+  public byte[] convertFromByteToUTF8Bytes(byte[] data) {
+    return UTF8String.fromBytes(data).getBytes();
   }
 
+  @Override
   public Object convertFromStringToUTF8String(Object data) {
     if (null == data) {
       return null;
     }
     return UTF8String.fromString((String) data);
   }
+
+  @Override
+  public Object wrapWithGenericArrayData(Object data) {
+    return new GenericArrayData(data);
+  }
+
+  @Override
+  public Object wrapWithGenericRow(Object[] fields) {
+    return new GenericInternalRow(fields);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 5647427..452db56 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -50,6 +50,7 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonCompactionUtil
 import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 
 /**
@@ -141,7 +142,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
         var result : java.util.List[PartitionSpliterRawResultIterator] = null
         try {
           exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
-          result = exec.processDataBlocks(segmentId)
+          result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
         } catch {
           case e: Throwable =>
             LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 59ed8ba..3250a53 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -49,6 +49,8 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 
 object CarbonScalaUtil {
+
+  // TODO: move this to spark module
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
     dataType match {
       case StringType => CarbonDataTypes.STRING

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 432d50a..9e0c102 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -36,14 +36,11 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private boolean filteredRowsExist;
 
-  private DataType dataType;
-
   private DataType blockDataType;
 
   ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
     this.columnVector = columnVector;
     this.filteredRows = filteredRows;
-    this.dataType = CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
   }
 
   @Override public void putBoolean(int rowId, boolean value) {
@@ -117,16 +114,16 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
     if (!filteredRows[rowId]) {
-      Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value);
+      Decimal toDecimal = Decimal.apply(value);
       columnVector.putDecimal(counter++, toDecimal, precision);
     }
   }
 
   @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    Decimal decimal = Decimal.apply(value);
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value);
-        columnVector.putDecimal(counter++, toDecimal, precision);
+        columnVector.putDecimal(counter++, decimal, precision);
       }
       rowId++;
     }
@@ -210,7 +207,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public DataType getType() {
-    return dataType;
+    return CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 25ddd0b..0d00023 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1079,5 +1079,4 @@ object CarbonDataRDDFactory {
       hadoopConf
     ).collect()
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 1de66c1..b7df9b4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -45,8 +45,8 @@ class SparkUnknownExpression(
 
     val values = carbonRowInstance.getValues.toSeq.map {
       case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
-      case d: java.math.BigDecimal =>
-        org.apache.spark.sql.types.Decimal.apply(d)
+      case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
+      case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b)
       case value => value
     }
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index c8f7ac7..ff17cfd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -20,16 +20,17 @@ package org.apache.spark.sql.execution.command.schema
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 private[sql] case class CarbonAlterTableDataTypeChangeCommand(
     alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
@@ -65,8 +66,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       }
       val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
       if (carbonColumn.size == 1) {
-        CarbonScalaUtil
-          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
+        validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
       } else {
         LOGGER.audit(s"Alter table change data type request has failed. " +
                      s"Column $columnName is invalid")
@@ -119,4 +119,51 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
     }
     Seq.empty
   }
+
+  /**
+   * This method will validate a column for its data type and check whether the column data type
+   * can be modified and update if conditions are met.
+   */
+  private def validateColumnDataType(
+      dataTypeInfo: DataTypeInfo,
+      carbonColumn: CarbonColumn): Unit = {
+    carbonColumn.getDataType.getName match {
+      case "INT" =>
+        if (!dataTypeInfo.dataType.equals("bigint") && !dataTypeInfo.dataType.equals("long")) {
+          sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
+                    s"${carbonColumn.getDataType.getName} cannot be modified. " +
+                    s"Int can only be changed to bigInt or long")
+        }
+      case "DECIMAL" =>
+        if (!dataTypeInfo.dataType.equals("decimal")) {
+          sys.error(s"Given column ${ carbonColumn.getColName } with data type" +
+                    s" ${ carbonColumn.getDataType.getName} cannot be modified." +
+                    s" Decimal can be only be changed to Decimal of higher precision")
+        }
+        if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
+          sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
+                    s"Specified precision value ${dataTypeInfo.precision} should be " +
+                    s"greater than current precision value " +
+                    s"${carbonColumn.getColumnSchema.getPrecision}")
+        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) {
+          sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
+                    s"Specified scale value ${dataTypeInfo.scale} should be greater or " +
+                    s"equal to current scale value ${carbonColumn.getColumnSchema.getScale}")
+        } else {
+          // difference of precision and scale specified by user should not be less than the
+          // difference of already existing precision and scale else it will result in data loss
+          val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
+                                               carbonColumn.getColumnSchema.getScale
+          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
+          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
+            sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
+                      s"Specified precision and scale values will lead to data loss")
+          }
+        }
+      case _ =>
+        sys.error(s"Given column ${carbonColumn.getColName} with data type " +
+                  s"${carbonColumn.getDataType.getName} cannot be modified. " +
+                  s"Only Int and Decimal data types are allowed for modification")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 2eed988..3cb46f7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWrit
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, SparkDataTypeConverterImpl, Util}
 
 class CarbonFileFormat
   extends FileFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 7b4bc0d..d2ffac7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -1141,7 +1141,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     if(null == columnSchema) {
       null
     } else {
-        new QueryColumn(columnSchema.getColumnSchema,
+        new QueryColumn(
+          columnSchema.getColumnSchema,
           isFilterColumn,
           timeseriesFunction.toLowerCase)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/pom.xml
----------------------------------------------------------------------
diff --git a/processing/pom.xml b/processing/pom.xml
index dfabaa2..648810d 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -39,11 +39,7 @@
       <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-    </dependency>
-	<dependency>
+	  <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
       <version>2.2.1</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index bbc3697..bc28ace 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -40,9 +40,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.annotation.DeveloperApi;
 
-@DeveloperApi
 public class TableProcessingOperations {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java
new file mode 100644
index 0000000..bcf283e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/SortDataFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.sort;
+
+/**
+ * Ported from Apache Spark.
+ *
+ * Abstraction for sorting an arbitrary input buffer of data. This interface requires determining
+ * the sort key for a given element index, as well as swapping elements and moving data from one
+ * buffer to another.
+ * Example format: an array of numbers, where each element is also the key.
+ * See [[KVArraySortDataFormat]] for a more exciting format.
+ * Note: Declaring and instantiating multiple subclasses of this class would prevent JIT inlining
+ * overridden methods and hence decrease the shuffle performance.
+ *
+ * @param <K> Type of the sort key of each element
+ * @param <Buffer> Internal data structure used by a particular format (e.g., int[]).
+ */
+// TODO: Making Buffer a real interface would be a better abstraction, but adds some complexity.
+abstract class SortDataFormat<K, Buffer> {
+
+  /**
+   * Creates a new mutable key for reuse. This should be implemented if you want to override
+   * [[getKey(Buffer, Int, K)]].
+   */
+  K newKey() {
+    return null;
+  }
+
+  /**
+   * Return the sort key for the element at the given index.
+   */
+  protected abstract K getKey(Buffer data, int pos);
+
+  /**
+   * Returns the sort key for the element at the given index and reuse the input key if possible.
+   * The default implementation ignores the reuse parameter and invokes [[getKey(Buffer, Int)]].
+   * If you want to override this method, you must implement [[newKey()]].
+   */
+  K getKey(Buffer data, int pos, K reuse) {
+    return getKey(data, pos);
+  }
+
+  /**
+   * Swap two elements.
+   */
+  abstract void swap(Buffer data, int pos0, int pos1);
+
+  /**
+   * Copy a single element from src(srcPos) to dst(dstPos).
+   */
+  abstract void copyElement(Buffer src, int srcPos, Buffer dst, int dstPos);
+
+  /**
+   * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos.
+   * Overlapping ranges are allowed.
+   */
+  abstract void copyRange(Buffer src, int srcPos, Buffer dst, int dstPos, int length);
+
+  /**
+   * Allocates a Buffer that can hold up to 'length' elements.
+   * All elements of the buffer should be considered invalid until data is explicitly copied in.
+   */
+  abstract Buffer allocate(int length);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java
index dac3b47..377edfc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/TimSort.java
@@ -18,8 +18,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe.sort;
 
 import java.util.Comparator;
 
-import org.apache.spark.util.collection.SortDataFormat;
-
 /**
  * A port of the Apache Spark's TimSort and they originally ported from Android TimSort class,
  * which utilizes a "stable, adaptive, iterative mergesort."

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java
index 92962d9..0c205d7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/sort/UnsafeIntSortDataFormat.java
@@ -20,8 +20,6 @@ import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 
-import org.apache.spark.util.collection.SortDataFormat;
-
 /**
  * Interface implementation for utilities to sort the data.
  */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index e02f3ab..850ceca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
@@ -47,8 +48,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-import org.apache.spark.sql.types.Decimal;
-
 /**
  * This class will process the query result and convert the data
  * into a format compatible for data load
@@ -295,7 +294,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private Object getConvertedMeasureValue(Object value, DataType type) {
     if (DataTypes.isDecimal(type)) {
       if (value != null) {
-        value = ((Decimal) value).toJavaBigDecimal();
+        value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
       }
       return value;
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index b18207d..173a5c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 /**
  * Used to read carbon blocks when add/split partition
@@ -46,10 +46,11 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
     this.carbonTable = carbonTable;
   }
 
-  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
+  public List<PartitionSpliterRawResultIterator> processDataBlocks(
+      String segmentId, DataTypeConverter converter)
       throws QueryExecutionException, IOException {
     List<TableBlockInfo> list = null;
-    queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl());
+    queryModel = carbonTable.createQueryModelWithProjectAllColumns(converter);
     queryModel.setForcedDetailRawQuery(true);
     List<PartitionSpliterRawResultIterator> resultList
         = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/982d03fe/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index ff33823..f22d1c1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -47,9 +47,9 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 
-import org.apache.spark.sql.types.Decimal;
 
 /**
  * Represent a page data for all columns, we store its data in columnar layout, so that
@@ -184,7 +184,7 @@ public class TablePage {
       if (DataTypes.isDecimal(measurePages[i].getDataType()) &&
           model.isCompactionFlow() &&
           value != null) {
-        value = ((Decimal) value).toJavaBigDecimal();
+        value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
       }
       measurePages[i].putData(rowId, value);
     }


Mime
View raw message