carbondata-commits mailing list archives

From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with geo spatial index column and added InPolygon UDF
Date Fri, 31 Jan 2020 14:16:49 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b2eb53d  [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with geo spatial index column and added InPolygon UDF
b2eb53d is described below

commit b2eb53d0948cb591f0d6dcaab50416045ce27805
Author: Venu Reddy <venugopalreddyk@huawei.com>
AuthorDate: Wed Nov 20 12:25:42 2019 +0530

    [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with geo spatial index column and added InPolygon UDF
    
    The following changes are made in this PR:
    
    Added a table property index_handler that implicitly creates an index column
    from the existing table schema columns during table creation and appends it
    to the sort columns. For example:
    CREATE TABLE carbontable(
    …
    longitude INT,
    latitude INT,
    …)
    STORED AS carbondata
    TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
    'INDEX_HANDLER.mygeohash.type'='geohash',
    'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude')
    
    Custom index handler: the handler lets the user create a new column from a
    set of schema columns. The newly created column has the same name as the
    handler. The type and sourcecolumns properties of the handler are mandatory.
    At present, the only supported value for the type property is 'geohash'.
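    
    For reference, a handler plugs in by extending the new CustomIndex abstract
    class added in this commit (core/util/CustomIndex.java). A minimal,
    hypothetical skeleton (class and package names below are illustrative only,
    not part of this patch) could look like:
    
    package org.apache.carbondata.example;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.carbondata.core.util.CustomIndex;
    
    public class MyIndexHandler extends CustomIndex<List<Long[]>> {
      @Override
      public void init(String handlerName, Map<String, String> properties) throws Exception {
        // Validate and cache the handler sub-properties, e.g.
        // index_handler.<name>.type and index_handler.<name>.sourcecolumns.
      }
    
      @Override
      public String generate(List<?> sourceColumnValues) throws Exception {
        // Derive the index column value for one row from its source column values.
        return String.valueOf(0);
      }
    
      @Override
      public List<Long[]> query(String filterInput) throws Exception {
        // Translate a filter string (e.g. a polygon) into ranges of index values to scan.
        return new ArrayList<>();
      }
    }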
    
    Modified the load flow to generate the custom index value for the newly
    created index column during the row parsing and converter steps.
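    
    As a rough illustration of what the converter step does per row (this is a
    standalone sketch, not the actual load code; only the CustomIndex and
    GeoHashImpl APIs come from this patch, and generate() is still a TODO stub
    here, so it returns "0"):
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.carbondata.core.util.CustomIndex;
    import org.apache.carbondata.geo.GeoHashImpl;
    
    public class IndexValueGenerationSketch {
      public static void main(String[] args) throws Exception {
        // Sub-properties as GeoHashImpl.init reads them (normally derived from TBLPROPERTIES).
        Map<String, String> props = new HashMap<>();
        props.put("index_handler", "mygeohash");
        props.put("index_handler.mygeohash.type", "geohash");
        props.put("index_handler.mygeohash.sourcecolumns", "longitude,latitude");
        props.put("index_handler.mygeohash.sourcecolumntypes", "bigint,bigint");
        props.put("index_handler.mygeohash.originlatitude", "1");
        props.put("index_handler.mygeohash.minlongitude", "1");
        props.put("index_handler.mygeohash.maxlongitude", "4");
        props.put("index_handler.mygeohash.minlatitude", "1");
        props.put("index_handler.mygeohash.maxlatitude", "4");
        props.put("index_handler.mygeohash.gridsize", "2");
        props.put("index_handler.mygeohash.conversionratio", "1");
    
        CustomIndex<List<Long[]>> handler = new GeoHashImpl();
        handler.init("mygeohash", props);
        // One row's source column values (longitude, latitude) -> generated index column value.
        String hashId = handler.generate(Arrays.asList(116285807L, 40084087L));
        System.out.println("generated index column value: " + hashId);
      }
    }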
    
    Modified the ALTER TABLE add column / drop column / rename / change datatype
    and DESCRIBE FORMATTED DDLs to consider the index column during their
    processing.
    
    Added the InPolygon UDF and query processing for the polygon expression.
    
    A user query can contain the InPolygon UDF with a series of points (i.e.,
    longitude and latitude values) as the filter condition, with the first and
    last points being the same. When plotted and connected, the points form a
    closed geometry, and the query fetches all the rows whose points lie within
    that closed geometry.
    
    SELECT column1, column2 FROM carbonTable IN_POLYGON('115928764 40034712,
    116098022 40035764, 116091156 39961333, 115926704 39964228, 115928764
    40034712')
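    
    On the query side, the UDF resolves to the new PolygonExpression, which asks
    the handler for the matching ID ranges and expands them into an In filter on
    the index column. A standalone sketch (the wiring shown is illustrative; only
    the PolygonExpression and GeoHashImpl classes come from this patch, and
    GeoHashImpl.query() is still a TODO stub that returns no ranges):
    
    import java.util.List;
    
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.geo.GeoHashImpl;
    import org.apache.carbondata.geo.scan.expression.PolygonExpression;
    
    public class PolygonFilterSketch {
      public static void main(String[] args) throws Exception {
        GeoHashImpl handler = new GeoHashImpl();
        // handler.init(...) would normally already have run during table create.
        String polygon = "116285807 40084087, 116372142 40129503, 116285807 40084087";
        PolygonExpression expr = new PolygonExpression(polygon, "mygeohash", handler);
        // getChildren() lazily calls handler.query(polygon) and builds the In expression.
        List<Expression> children = expr.getChildren();
        System.out.println("filter children: " + children.size());
      }
    }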
    
    This closes #3436
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../ThriftWrapperSchemaConverterImpl.java          |   2 +
 .../core/metadata/schema/table/CarbonTable.java    |   6 +-
 .../metadata/schema/table/column/CarbonColumn.java |   8 +
 .../metadata/schema/table/column/ColumnSchema.java |  12 ++
 .../scan/filter/FilterExpressionProcessor.java     |   2 +
 .../core/scan/filter/intf/ExpressionType.java      |   3 +-
 .../core/util/AbstractDataFileFooterConverter.java |   1 +
 .../apache/carbondata/core/util/CarbonUtil.java    |   1 +
 .../apache/carbondata/core/util/CustomIndex.java   |  80 +++++++++
 .../ThriftWrapperSchemaConverterImplTest.java      |   4 +
 .../datamap/bloom/BloomCoarseGrainDataMap.java     |   2 +-
 format/src/main/thrift/schema.thrift               |   5 +
 geo/pom.xml                                        | 175 ++++++++++++++++++++
 .../org/apache/carbondata/geo/GeoHashImpl.java     | 181 +++++++++++++++++++++
 .../geo/scan/expression/PolygonExpression.java     | 130 +++++++++++++++
 .../src/test/resources/geodata.csv                 |  17 ++
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 116 +++++++++++++
 integration/spark-common/pom.xml                   |   5 +
 .../spark/load/DataLoadProcessorStepOnSpark.scala  |  10 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala    |  41 +++++
 .../apache/carbondata/spark/util/CommonUtil.scala  |   2 +-
 .../spark/sql/catalyst/CarbonParserUtil.scala      | 130 +++++++++++++--
 .../command/carbonTableSchemaCommon.scala          |   3 +-
 .../scala/org/apache/carbondata/geo/GeoUtils.scala |  49 ++++++
 .../org/apache/carbondata/geo/InPolygonUDF.scala   |  43 ++---
 .../carbondata/spark/util/CarbonSparkUtil.scala    |   4 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |   5 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   2 +
 .../command/management/CarbonLoadDataCommand.scala |   2 +-
 .../schema/CarbonAlterTableAddColumnCommand.scala  |   3 +
 ...nAlterTableColRenameDataTypeChangeCommand.scala |   3 +
 .../schema/CarbonAlterTableDropColumnCommand.scala |   2 +
 .../table/CarbonDescribeFormattedCommand.scala     |  23 +++
 .../datasources/SparkCarbonTableFormat.scala       |  13 ++
 .../strategy/CarbonLateDecodeStrategy.scala        |  17 ++
 .../org/apache/spark/sql/hive/CarbonRelation.scala |   3 +-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |   9 +-
 .../org/apache/spark/util/AlterTableUtil.scala     |  53 +++++-
 pom.xml                                            |   5 +
 .../loading/CarbonDataLoadConfiguration.java       |  15 ++
 .../processing/loading/DataLoadProcessBuilder.java |   1 +
 .../loading/converter/FieldConverter.java          |   7 +
 .../converter/impl/BinaryFieldConverterImpl.java   |   5 +
 .../converter/impl/ComplexFieldConverterImpl.java  |  11 +-
 .../impl/DirectDictionaryFieldConverterImpl.java   |   7 +
 .../converter/impl/FieldEncoderFactory.java        |  11 +-
 .../converter/impl/IndexFieldConverterImpl.java    | 101 ++++++++++++
 .../converter/impl/MeasureFieldConverterImpl.java  |   4 +
 .../impl/NonDictionaryFieldConverterImpl.java      |   5 +
 .../loading/converter/impl/RowConverterImpl.java   |  28 +++-
 .../processing/loading/model/CarbonLoadModel.java  |  15 ++
 .../loading/parser/impl/RowParserImpl.java         |  12 ++
 .../InputProcessorStepWithNoConverterImpl.java     |  29 +++-
 .../processing/util/CarbonDataProcessorUtil.java   |   5 +-
 55 files changed, 1366 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 22ec7cf..34aa91e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -476,6 +476,16 @@ public final class CarbonCommonConstants {
    */
   public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
 
+  /**
+   * Index handler table property. It allows the user to create a new sort column from the set
+   * of existing schema columns and to generate the value for the new column after parsing each
+   * row through a custom handler.
+   */
+  public static final String INDEX_HANDLER = "index_handler";
+
+  // GeoHash index handler type
+  public static final String GEOHASH = "geohash";
+
   public static final String SORT_COLUMNS = "sort_columns";
   public static final String SORT_SCOPE = "sort_scope";
   public static final String RANGE_COLUMN = "range_column";
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 6718975..f9ba679 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -209,6 +209,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     thriftColumnSchema.setInvisible(wrapperColumnSchema.isInvisible());
     thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
+    thriftColumnSchema.setIndexColumn(wrapperColumnSchema.isIndexColumn());
     if (wrapperColumnSchema.isSortColumn()) {
       Map<String, String> properties = wrapperColumnSchema.getColumnProperties();
       if (null == properties) {
@@ -545,6 +546,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperColumnSchema.setInvisible(externalColumnSchema.isInvisible());
     wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
     wrapperColumnSchema.setSortColumn(false);
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 88271c4..b2f15d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -319,7 +319,11 @@ public class CarbonTable implements Serializable, Writable {
    */
   private void fillCreateOrderColumn() {
     List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
-    columns.addAll(visibleDimensions);
+    for (CarbonDimension dimension : visibleDimensions) {
+      if (!dimension.getColumnSchema().isIndexColumn()) {
+        columns.add(dimension);
+      }
+    }
     columns.addAll(visibleMeasures);
     Collections.sort(columns, new Comparator<CarbonColumn>() {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 262db8f..862148f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -167,6 +167,14 @@ public class CarbonColumn implements Serializable {
   }
 
   /**
+   * Checks if it is an index column.
+   * @return true if the column is an index column, false otherwise.
+   */
+  public boolean isIndexColumn() {
+    return columnSchema.isIndexColumn();
+  }
+
+  /**
    * @return columnproperty
    */
   public Map<String, String> getColumnProperties() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index ac4e7a9..933898c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -117,6 +117,8 @@ public class ColumnSchema implements Serializable, Writable {
 
   private boolean isSortColumn = false;
 
+  private boolean indexColumn = false;
+
   /**
    * aggregate function used in pre aggregate table
    */
@@ -529,6 +531,7 @@ public class ColumnSchema implements Serializable, Writable {
       }
     }
     out.writeBoolean(isLocalDictColumn);
+    out.writeBoolean(indexColumn);
   }
 
   @Override
@@ -578,6 +581,7 @@ public class ColumnSchema implements Serializable, Writable {
       }
     }
     this.isLocalDictColumn = in.readBoolean();
+    this.indexColumn = in.readBoolean();
   }
 
   /**
@@ -588,4 +592,12 @@ public class ColumnSchema implements Serializable, Writable {
     return this.getColumnName()
         .contains(".val") || this.getColumnName().contains(".");
   }
+
+  public boolean isIndexColumn() {
+    return indexColumn;
+  }
+
+  public void setIndexColumn(boolean indexColumn) {
+    this.indexColumn = indexColumn;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 7b6b5a8..19704bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -171,6 +171,8 @@ public class FilterExpressionProcessor implements FilterProcessor {
       case TRUE:
         return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false,
             expressionTree, tableIdentifier, expressionTree);
+      case POLYGON:
+        return createFilterResolverTree(expressionTree.getChildren().get(0), tableIdentifier);
       default:
         return getFilterResolverBasedOnExpressionType(ExpressionType.UNKNOWN, false, expressionTree,
             tableIdentifier, expressionTree);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index a89a84f..5614dda 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -44,5 +44,6 @@ public enum ExpressionType {
   ENDSWITH,
   CONTAINSWITH,
   TEXT_MATCH,
-  IMPLICIT
+  IMPLICIT,
+  POLYGON
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index f58d53b..e1a4ee2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -360,6 +360,7 @@ public abstract class AbstractDataFileFooterConverter {
         wrapperColumnSchema.setSortColumn(true);
       }
     }
+    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
     wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
     List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
         externalColumnSchema.getParentColumnTableRelations();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5008a66..a1824a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2037,6 +2037,7 @@ public final class CarbonUtil {
     wrapperColumnSchema.setScale(externalColumnSchema.getScale());
     wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
       if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
new file mode 100644
index 0000000..d570e1a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract class for a custom index handler. When the index_handler property is configured on a
+ * table, a new column is created within the carbon layer from the set of schema columns in the
+ * table. A custom implementation must be provided to extract the sub-properties of the index
+ * handler (such as type and source columns), to generate the value for the new column from the
+ * source column values, and to process the custom UDF filter queries based on the source columns.
+ * This class is the abstract base for such custom implementations.
+ * @param <ReturnType>
+ */
+public abstract class CustomIndex<ReturnType> implements Serializable {
+  public static final String CUSTOM_INDEX_DEFAULT_IMPL = "org.apache.carbondata.geo.GeoHashImpl";
+  /**
+   * Initialize the custom index handler instance.
+   * @param handlerName
+   * @param properties
+   * @throws Exception
+   */
+  public abstract void init(String handlerName, Map<String, String> properties) throws Exception;
+
+  /**
+   * Generates the custom index column value from the given source columns.
+   * @param columns
+   * @return Returns generated column value
+   * @throws Exception
+   */
+  public abstract String generate(List<?> columns) throws Exception;
+
+  /**
+   * Query processor for custom index handler.
+   * @param query
+   * @return Returns list of ranges to be fetched
+   * @throws Exception
+   */
+  public abstract ReturnType query(String query) throws Exception;
+
+  /**
+   * Deserializes and returns the custom handler instance
+   * @param serializedInstance
+   * @return
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public static CustomIndex getCustomInstance(String serializedInstance) throws IOException {
+    return (CustomIndex) ObjectSerializationUtil.convertStringToObject(serializedInstance);
+  }
+
+  /**
+   * Serializes the custom handler instance
+   * @param instance
+   * @return
+   * @throws IOException
+   */
+  public static String getCustomInstance(CustomIndex instance) throws IOException {
+    return ObjectSerializationUtil.convertObjectToString(instance);
+  }
+}
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 5911df3..d39dec9 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -159,6 +159,10 @@ public class ThriftWrapperSchemaConverterImplTest {
         return thriftColumnSchema;
       }
 
+      @Mock public org.apache.carbondata.format.ColumnSchema setIndexColumn(boolean indexColumn) {
+        return thriftColumnSchema;
+      }
+
       @Mock public String getColumn_id() {
         return "1";
       }
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index cf9e079..58e3852 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -125,7 +125,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       dataField.setTimestampFormat(tsFormat);
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(dataField, i, nullFormat, false,
-              false, carbonTable.getTablePath());
+              false, carbonTable.getTablePath(), null);
       this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
     }
     this.badRecordLogHolder = new BadRecordLogHolder();
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index f4aa9b7..ecdab27 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -133,6 +133,11 @@ struct ColumnSchema{
   **/
   /** Deprecated */
 	17: optional list<ParentColumnTableRelation> parentColumnTableRelations;
+
+  /**
+   * To specify if it is an index column. Its default value is false.
+	 */
+	18: optional bool indexColumn;
 }
 
 /**
diff --git a/geo/pom.xml b/geo/pom.xml
new file mode 100644
index 0000000..4fdcea6
--- /dev/null
+++ b/geo/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-geo</artifactId>
+  <name>Apache CarbonData :: Geo</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <dev.path>${basedir}/../dev</dev.path>
+    <jacoco.append>true</jacoco.append>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <skip>false</skip>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <!-- testFailureIgnore>false</testFailureIgnore -->
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.ning.maven.plugins</groupId>
+        <artifactId>maven-duplicate-finder-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <testFailureIgnore>false</testFailureIgnore>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
+</project>
\ No newline at end of file
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java
new file mode 100644
index 0000000..7e9803e
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CustomIndex;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * GeoHash custom implementation.
+ * This class extends {@link CustomIndex}. It provides methods to
+ * 1. Extract the sub-properties of the geohash index handler such as type, source columns,
+ * grid size, origin, and min and max longitude and latitude of the data, then validate and
+ * store them in the instance.
+ * 2. Generate the column value from the longitude and latitude column values.
+ * 3. Process the custom UDF filter queries based on the longitude and latitude
+ * columns.
+ */
+public class GeoHashImpl extends CustomIndex<List<Long[]>> {
+  /**
+   * Initialize the geohash index handler instance.
+   * @param handlerName
+   * @param properties
+   * @throws Exception
+   */
+  @Override
+  public void init(String handlerName, Map<String, String> properties) throws Exception {
+    String options = properties.get(CarbonCommonConstants.INDEX_HANDLER);
+    if (StringUtils.isEmpty(options)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid.", CarbonCommonConstants.INDEX_HANDLER));
+    }
+    options = options.toLowerCase();
+    if (!options.contains(handlerName.toLowerCase())) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. %s is not present.",
+                      CarbonCommonConstants.INDEX_HANDLER, handlerName));
+    }
+    String commonKey = CarbonCommonConstants.INDEX_HANDLER + CarbonCommonConstants.POINT +
+            handlerName + CarbonCommonConstants.POINT;
+    String TYPE = commonKey + "type";
+    String type = properties.get(TYPE);
+    if (!CarbonCommonConstants.GEOHASH.equalsIgnoreCase(type)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. %s property must be %s for this class.",
+                      CarbonCommonConstants.INDEX_HANDLER, TYPE, CarbonCommonConstants.GEOHASH));
+    }
+    String SOURCE_COLUMNS = commonKey + "sourcecolumns";
+    String sourceColumnsOption = properties.get(SOURCE_COLUMNS);
+    if (StringUtils.isEmpty(sourceColumnsOption)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. Must specify %s property.",
+                      CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+    }
+    if (sourceColumnsOption.split(",").length != 2) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. %s property must have 2 columns.",
+                      CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+    }
+    String SOURCE_COLUMN_TYPES = commonKey + "sourcecolumntypes";
+    String sourceDataTypes = properties.get(SOURCE_COLUMN_TYPES);
+    String[] srcTypes = sourceDataTypes.split(",");
+    for (String srcdataType : srcTypes) {
+      if (!"bigint".equalsIgnoreCase(srcdataType)) {
+        throw new MalformedCarbonCommandException(
+                String.format("%s property is invalid. %s datatypes must be long.",
+                        CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+      }
+    }
+    // Set the generated column data type as long
+    String TARGET_DATA_TYPE = commonKey + "datatype";
+    properties.put(TARGET_DATA_TYPE, "long");
+    String ORIGIN_LATITUDE = commonKey + "originlatitude";
+    String originLatitude = properties.get(ORIGIN_LATITUDE);
+    if (StringUtils.isEmpty(originLatitude)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. Must specify %s property.",
+                      CarbonCommonConstants.INDEX_HANDLER, ORIGIN_LATITUDE));
+    }
+    String MIN_LONGITUDE = commonKey + "minlongitude";
+    String MAX_LONGITUDE = commonKey + "maxlongitude";
+    String MIN_LATITUDE = commonKey + "minlatitude";
+    String MAX_LATITUDE = commonKey + "maxlatitude";
+    String minLongitude = properties.get(MIN_LONGITUDE);
+    String maxLongitude = properties.get(MAX_LONGITUDE);
+    String minLatitude = properties.get(MIN_LATITUDE);
+    String maxLatitude = properties.get(MAX_LATITUDE);
+    if (StringUtils.isEmpty(minLongitude)) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. Must specify %s property.",
+              CarbonCommonConstants.INDEX_HANDLER, MIN_LONGITUDE));
+    }
+    if (StringUtils.isEmpty(minLatitude)) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. Must specify %s property.",
+              CarbonCommonConstants.INDEX_HANDLER, MIN_LATITUDE));
+    }
+    if (StringUtils.isEmpty(maxLongitude)) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. Must specify %s property.",
+              CarbonCommonConstants.INDEX_HANDLER, MAX_LONGITUDE));
+    }
+    if (StringUtils.isEmpty(maxLatitude)) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. Must specify %s property.",
+              CarbonCommonConstants.INDEX_HANDLER, MAX_LATITUDE));
+    }
+    String GRID_SIZE = commonKey + "gridsize";
+    String gridSize = properties.get(GRID_SIZE);
+    if (StringUtils.isEmpty(gridSize)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. %s property must be specified.",
+                      CarbonCommonConstants.INDEX_HANDLER, GRID_SIZE));
+    }
+    String CONVERSION_RATIO = commonKey + "conversionratio";
+    String conversionRatio = properties.get(CONVERSION_RATIO);
+    if (StringUtils.isEmpty(conversionRatio)) {
+      throw new MalformedCarbonCommandException(
+              String.format("%s property is invalid. %s property must be specified.",
+                      CarbonCommonConstants.INDEX_HANDLER, CONVERSION_RATIO));
+    }
+
+    // TODO: Fill the values to the instance fields
+  }
+
+  /**
+   * Generates the GeoHash ID column value from the given source columns.
+   * @param sources Longitude and Latitude
+   * @return Returns the generated hash id
+   * @throws Exception
+   */
+  @Override
+  public String generate(List<?> sources) throws Exception {
+    if (sources.size() != 2) {
+      throw new RuntimeException("Source columns list must be of size 2.");
+    }
+    if (sources.get(0) == null || sources.get(1) == null) {
+      // Bad record. Just return null
+      return null;
+    }
+    if (!(sources.get(0) instanceof Long) || !(sources.get(1) instanceof Long)) {
+      throw new RuntimeException("Source columns must be of Long type.");
+    }
+    //TODO: generate geohashId
+    return String.valueOf(0);
+  }
+
+  /**
+   * Query processor for GeoHash.
+   * @param polygon
+   * @return Returns list of ranges of GeoHash IDs
+   * @throws Exception
+   */
+  @Override
+  public List<Long[]> query(String polygon) throws Exception {
+    List<Long[]> rangeList = new ArrayList<Long[]>();
+    // TODO: process the polygon coordinates and generate the list of ranges of GeoHash IDs
+    return rangeList;
+  }
+}
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
new file mode 100644
index 0000000..433866f
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.expression;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.util.CustomIndex;
+
+/**
+ * InPolygon expression processor. It passes the InPolygon string to the Geo implementation's
+ * query method, which returns the list of ID ranges to filter on. It then builds an
+ * InExpression with all the IDs present in those ranges.
+ */
+@InterfaceAudience.Internal
+public class PolygonExpression extends Expression {
+  private String polygon;
+  private String columnName;
+  private CustomIndex<List<Long[]>> handler;
+  private List<Expression> children = new ArrayList<Expression>();
+
+  public PolygonExpression(String polygon, String columnName, CustomIndex handler) {
+    this.polygon = polygon;
+    this.handler = handler;
+    this.columnName = columnName;
+  }
+
+  private void buildExpression(List<Long[]> ranges) {
+    // Build InExpression with list of all the values present in the ranges
+    List<Expression> inList = new ArrayList<Expression>();
+    for (Long[] range : ranges) {
+      if (range.length != 2) {
+        throw new RuntimeException("Handler query must return list of ranges with each range "
+            + "containing minimum and maximum values");
+      }
+      for (long i = range[0]; i <= range[1]; i++) {
+        inList.add(new LiteralExpression(i, DataTypes.LONG));
+      }
+    }
+    children.add(new InExpression(new ColumnExpression(columnName, DataTypes.LONG),
+        new ListExpression(inList)));
+  }
+
+  /**
+   * This method builds InExpression with list of all the values present in the list of ranges of
+   * IDs.
+   */
+  private void processExpression() {
+    List<Long[]> ranges;
+    try {
+      ranges = handler.query(polygon);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    buildExpression(ranges);
+  }
+
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
+    throw new UnsupportedOperationException("Operation not supported for Polygon expression");
+  }
+
+  @Override
+  public ExpressionType getFilterExpressionType() {
+    return ExpressionType.POLYGON;
+  }
+
+  @Override
+  public List<Expression> getChildren() {
+    if (children.isEmpty()) {
+      processExpression();
+    }
+    return children;
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+  }
+
+  @Override
+  public String getString() {
+    return polygon;
+  }
+
+  @Override
+  public String getStatement() {
+    return "IN_POLYGON('" + polygon + "')";
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.writeObject(polygon);
+    out.writeObject(columnName);
+    out.writeObject(handler);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    polygon = (String) in.readObject();
+    columnName = (String) in.readObject();
+    handler = (CustomIndex<List<Long[]>>) in.readObject();
+    children = new ArrayList<Expression>();
+  }
+}
diff --git a/integration/spark-common-test/src/test/resources/geodata.csv b/integration/spark-common-test/src/test/resources/geodata.csv
new file mode 100644
index 0000000..42dbdf4
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/geodata.csv
@@ -0,0 +1,17 @@
+timevalue,longitude,latitude
+1575428400000,116285807,40084087
+1575428400000,116372142,40129503
+1575428400000,116187332,39979316
+1575428400000,116337069,39951887
+1575428400000,116359102,40154684
+1575428400000,116736367,39970323
+1575428400000,116720179,40009893
+1575428400000,116346961,40133550
+1575428400000,116302895,39930753
+1575428400000,116288955,39999101
+1575428400000,116176090,40129953
+1575428400000,116725575,39981115
+1575428400000,116266922,40179415
+1575428400000,116353706,40156483
+1575428400000,116362699,39942444
+1575428400000,116325378,39963129
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
new file mode 100644
index 0000000..7f05cc8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -0,0 +1,116 @@
+package org.apache.carbondata.geo
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class GeoTest extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll(): Unit = {
+    drop()
+  }
+
+  test("Invalid geo index handler property") {
+    // Handler name must not match with table column name.  Fails to create table.
+    var exception = intercept[MalformedCarbonCommandException](sql(
+      s"""
+         | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
+         | COMMENT "This is a malformed table"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('INDEX_HANDLER'='longitude')
+      """.stripMargin))
+
+    assert(exception.getMessage.contains(
+      "handler: longitude must not match with any other column name in the table"))
+
+    // Type property is not configured. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](sql(
+      s"""
+         | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
+         | COMMENT "This is a malformed table"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash')
+      """.stripMargin))
+
+    assert(exception.getMessage.contains(
+      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.type property must be specified"))
+
+    // Source columns are not configured. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](sql(
+      s"""
+         | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
+         | COMMENT "This is a malformed table"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash')
+      """.stripMargin))
+
+    assert(exception.getMessage.contains(
+      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns property must be " +
+      s"specified."))
+
+    // Source columns must be present in the table. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](sql(
+      s"""
+         | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
+         | COMMENT "This is a malformed table"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash',
+         | 'INDEX_HANDLER.mygeohash.sourcecolumns'='unknown1, unknown2')
+      """.stripMargin))
+
+    assert(exception.getMessage.contains(
+      s"Source column: unknown1 in property " +
+      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns must be a column in the " +
+      s"table."))
+  }
+
+  test("test geo table create and load and check describe formatted") {
+    createTable()
+    loadData()
+    // Test if index handler column is added as a sort column
+    val descTable = sql("describe formatted geotable").collect
+    descTable.find(_.get(0).toString.contains("Sort Scope")) match {
+      case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
+      case None => assert(false)
+    }
+    descTable.find(_.get(0).toString.contains("Sort Columns")) match {
+      case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
+      case None => assert(false)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    drop()
+  }
+
+  def drop(): Unit = {
+    sql("drop table if exists geotable")
+  }
+
+  def createTable(): Unit = {
+    sql(s"""
+           | CREATE TABLE geotable(
+           | timevalue BIGINT,
+           | longitude LONG,
+           | latitude LONG) COMMENT "This is a GeoTable"
+           | STORED AS carbondata
+           | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
+           | 'INDEX_HANDLER.mygeohash.type'='geohash',
+           | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'INDEX_HANDLER.mygeohash.originLatitude'='1',
+           | 'INDEX_HANDLER.mygeohash.gridSize'='2',
+           | 'INDEX_HANDLER.mygeohash.minLongitude'='1',
+           | 'INDEX_HANDLER.mygeohash.maxLongitude'='4',
+           | 'INDEX_HANDLER.mygeohash.minLatitude'='1',
+           | 'INDEX_HANDLER.mygeohash.maxLatitude'='4',
+           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1')
+       """.stripMargin)
+  }
+
+  def loadData(): Unit = {
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE geotable OPTIONS
+           |('DELIMITER'= ',')""".stripMargin)
+  }
+}
+
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 0790763..5fddef0 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -56,6 +56,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-geo</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 17db6d9..c21d984 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -209,6 +209,16 @@ object DataLoadProcessorStepOnSpark {
           row = new CarbonRow(rowParser.parseRow(rows.next()))
         }
         row = rowConverter.convert(row)
+        if (row != null) {
+          // In the partition case, after the input processor and converter steps, all the rows
+          // are given to Hive to create partition folders. As Hive is unaware of non-schema index
+          // columns, discard those columns from the rows before returning.
+          val schemaColumnValues = row.getData.zipWithIndex.collect {
+            case (data, index) if !conf.getDataFields()(index).getColumn.isIndexColumn =>
+              data
+          }
+          row.setData(schemaColumnValues)
+        }
         rowCounter.add(1)
         row
       }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index afdbc9c..c8fa80d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -624,6 +624,47 @@ object CarbonScalaUtil {
     }
   }
 
+  /**
+   * This method adds the index handler to the sort columns if it is not already present as a sort column
+   * @param handler Index handler name
+   * @param sourceColumns Source columns of index handler
+   * @param tableProperties Table properties
+   */
+  def addIndexHandlerToSortColumns(handler: String, sourceColumns: Array[String],
+      tableProperties: mutable.Map[String, String]): Unit = {
+    // Add handler into sort columns
+    val sortKey = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortColumnsString = handler
+    // If sort columns are not configured, simply use handler as a sort column.
+    if (sortKey.isDefined && !sortKey.get.isEmpty) {
+      sortColumnsString = sortKey.get
+      val sortColumns = sortColumnsString.split(",").map(_.trim)
+      // If the sort columns already contain the handler, use the sort columns as is.
+      if (!sortColumns.contains(handler)) {
+        // If the sort columns do not contain the handler, check whether any of the handler's
+        // source columns are present as sort columns. If so, insert the handler into the sort
+        // columns just before its first source column, so that data is sorted w.r.t. the
+        // handler before any of its source columns.
+        val sourceIndex = new Array[Int](sourceColumns.length)
+        sourceColumns.zipWithIndex.foreach {
+          case (source, index) => sourceIndex(index) = sortColumns.indexWhere(_.equals(source))
+        }
+        val posIdx = sourceIndex.filter(_ >= 0)
+        if (posIdx.nonEmpty) {
+          // Found index of first source column in the sort columns. Insert handler just
+          // before it.
+          sortColumnsString = (sortColumns.slice(0, posIdx.min) ++ Array(handler) ++
+                               sortColumns.slice(posIdx.min, sortColumns.length)).mkString(",")
+        } else {
+          // None of the handler's source columns are present as sort columns. Just append the
+          // handler to the existing sort columns.
+          sortColumnsString = sortColumnsString + s",$handler"
+        }
+      }
+    }
+    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortColumnsString)
+  }
+
   def isStringDataType(dataType: DataType): Boolean = {
     dataType == StringType
   }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 865d586..5940086 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -779,7 +779,7 @@ object CommonUtil {
   }
 
   def validateSortColumns(carbonTable: CarbonTable, newProperties: Map[String, String]): Unit = {
-    val fields = carbonTable.getCreateOrderColumn().asScala
+    val fields = carbonTable.getVisibleDimensions.asScala ++ carbonTable.getVisibleMeasures.asScala
     val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
     var sortKeyOption = newProperties.get(CarbonCommonConstants.SORT_COLUMNS)
     val varcharColsString = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 236036f..a8ddd69 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map}
 import scala.language.implicitConversions
 
 import org.apache.hadoop.hive.ql.lib.Node
@@ -28,8 +29,8 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
@@ -38,7 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CustomIndex}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataTypeConverterUtil}
 
@@ -89,6 +90,105 @@ object CarbonParserUtil {
   }
 
   /**
+   * The method parses, validates and processes the index_handler property.
+   * @param tableProperties Table properties
+   * @param tableFields Sequence of table fields
+   * @return <Seq[Field]> Sequence of index fields to add to table fields
+   */
+  private def processIndexProperty(tableProperties: mutable.Map[String, String],
+      tableFields: Seq[Field]): Seq[Field] = {
+    val option = tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val fields = ListBuffer[Field]()
+    if (option.isDefined) {
+      if (option.get.trim.isEmpty) {
+        throw new MalformedCarbonCommandException(
+          s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+          s"Option value is empty.")
+      }
+      option.get.split(",").map(_.trim).foreach { handler =>
+        // Validate target column name
+        if (tableFields.exists(_.column.equalsIgnoreCase(handler))) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"handler: $handler must not match with any other column name in the table")
+        }
+        val TYPE = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.type"
+        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumns"
+        val SOURCE_COLUMN_TYPES
+        = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumntypes"
+        val HANDLER_CLASS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.class"
+        val HANDLER_INSTANCE = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.instance"
+
+        val handlerType = tableProperties.get(TYPE)
+        if (handlerType.isEmpty || handlerType.get.trim.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"$TYPE property must be specified.")
+        }
+        val sourceColumnsOption = tableProperties.get(SOURCE_COLUMNS)
+        if (sourceColumnsOption.isEmpty || sourceColumnsOption.get.trim.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"$SOURCE_COLUMNS property must be specified.")
+        }
+        val sourcesWithoutSpaces = sourceColumnsOption.get.replaceAll("\\s", "")
+        // Validate source columns
+        val sources = sourcesWithoutSpaces.split(",")
+        if (sources.distinct.length != sources.size) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"$SOURCE_COLUMNS property cannot have duplicate columns.")
+        }
+        val sourceTypes = StringBuilder.newBuilder
+        sources.foreach { column =>
+          tableFields.find(_.column.equalsIgnoreCase(column)) match {
+            case Some(field) => sourceTypes.append(field.dataType.get).append(",")
+            case None =>
+              throw new MalformedCarbonCommandException(
+                s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+                s"Source column: $column in property " +
+                s"$SOURCE_COLUMNS must be a column in the table.")
+          }
+        }
+        tableProperties.put(SOURCE_COLUMNS, sourcesWithoutSpaces)
+        tableProperties.put(SOURCE_COLUMN_TYPES, sourceTypes.dropRight(1).toString())
+        val handlerClass = tableProperties.get(HANDLER_CLASS)
+        val handlerClassName: String = handlerClass match {
+          case Some(className) => className.trim
+          case None =>
+            // use handler type to find the default implementation
+            if (handlerType.get.trim.equalsIgnoreCase(CarbonCommonConstants.GEOHASH)) {
+              // Use GeoHash default implementation
+              val className = CustomIndex.CUSTOM_INDEX_DEFAULT_IMPL
+              tableProperties.put(HANDLER_CLASS, className)
+              className
+            } else {
+              throw new MalformedCarbonCommandException(
+                s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+                s"Unsupported value: ${ handlerType.get } specified for property $TYPE.")
+            }
+        }
+        try {
+          val handlerClass: Class[_] = java.lang.Class.forName(handlerClassName)
+          val instance = handlerClass.newInstance().asInstanceOf[CustomIndex[_]]
+          instance.init(handler, tableProperties.asJava)
+          tableProperties.put(HANDLER_INSTANCE, CustomIndex.getCustomInstance(instance))
+        } catch {
+          case ex@(_: ClassNotFoundException | _: InstantiationError | _: IllegalAccessException |
+                   _: ClassCastException) =>
+            val err = s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property process failed. "
+            LOGGER.error(err, ex)
+            throw new MalformedCarbonCommandException(err, ex)
+        }
+        // Add the index handler column to the sort columns if it is not already present.
+        CarbonScalaUtil.addIndexHandlerToSortColumns(handler, sources, tableProperties)
+        fields += Field(handler, Some("BigInt"), Some(handler), Some(null), index = true)
+      }
+    }
+    fields
+  }
+
+  /**
    * This will prepare the Model from the Tree details.
    *
    * @param ifNotExistPresent
@@ -110,8 +210,12 @@ object CarbonParserUtil {
       isAlterFlow: Boolean = false,
       tableComment: Option[String] = None): TableModel = {
 
+    // Process index handler property
+    val indexFields = processIndexProperty(tableProperties, fields)
+    val allFields = fields ++ indexFields
+
     // do not allow below key words as column name
-    validateColumnNames(fields)
+    validateColumnNames(allFields)
 
     fields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
@@ -129,7 +233,7 @@ object CarbonParserUtil {
         } ")
     }
     val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
-      fields, tableProperties)
+      fields, indexFields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -232,7 +336,8 @@ object CarbonParserUtil {
 
     if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       // validate the column_meta_cache option
-      val tableColumns = dims.map(x => x.name.get) ++ msrs.map(x => x.name.get)
+      val tableColumns = dims.view.filterNot(_.index).map(x => x.name.get) ++
+                         msrs.map(x => x.name.get)
       CommonUtil.validateColumnMetaCacheFields(
         dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
         tableName,
@@ -513,10 +618,11 @@ object CarbonParserUtil {
    * By default all string cols are dimensions.
    *
    * @param fields
+   * @param indexFields
    * @param tableProperties
    * @return
    */
-  protected def extractDimAndMsrFields(fields: Seq[Field],
+  protected def extractDimAndMsrFields(fields: Seq[Field], indexFields: Seq[Field],
       tableProperties: Map[String, String]):
   (Seq[Field], Seq[Field], Seq[String], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
@@ -524,6 +630,8 @@ object CarbonParserUtil {
     var noDictionaryDims: Seq[String] = Seq[String]()
     var varcharCols: Seq[String] = Seq[String]()
 
+    val allFields = fields ++ indexFields
+
     // All long_string cols should be there in create table cols and should be of string data type
     if (tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS).isDefined) {
       varcharCols = tableProperties(CarbonCommonConstants.LONG_STRING_COLUMNS)
@@ -544,12 +652,12 @@ object CarbonParserUtil {
       val sortKey = sortKeyString.split(',').map(_.trim)
       CommonUtil.validateSortColumns(
         sortKey,
-        fields.map { field => (field.column, field.dataType.get) },
+        allFields.map { field => (field.column, field.dataType.get) },
         varcharCols
       )
       sortKey.foreach { dimension =>
         if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase)) {
-          fields.foreach { field =>
+          allFields.foreach { field =>
             if (field.column.equalsIgnoreCase(dimension)) {
               sortKeyDimsTmp :+= field.column
             }
@@ -592,7 +700,7 @@ object CarbonParserUtil {
 
     // by default consider all String cols as dims and if any dictionary include isn't present then
     // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach { field =>
+    allFields.foreach { field =>
       if (field.dataType.get.toUpperCase.equals("TIMESTAMP")) {
         noDictionaryDims :+= field.column
         dimFields += field
@@ -980,7 +1088,7 @@ object CarbonParserUtil {
           field.precision, field.scale, field.rawSchema, field.columnComment)
       case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
         field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
-        field.columnComment)
+        field.columnComment, field.index)
       case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
         field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
         field.columnComment)
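
For reference, a rough sketch of the table properties that processIndexProperty leaves behind for a geohash-type handler (handler and column names here are hypothetical; the sourcecolumntypes, class and instance entries are derived by the code above rather than supplied by the user):

    import scala.collection.mutable

    // Hypothetical values for illustration only; keys follow the TYPE, SOURCE_COLUMNS,
    // SOURCE_COLUMN_TYPES, HANDLER_CLASS and HANDLER_INSTANCE constants built above.
    val derivedProps: mutable.Map[String, String] = mutable.Map(
      "index_handler" -> "myhash",
      "index_handler.myhash.type" -> "geohash",                   // user supplied, mandatory
      "index_handler.myhash.sourcecolumns" -> "lon,lat",          // user supplied, spaces stripped
      "index_handler.myhash.sourcecolumntypes" -> "int,int",      // derived from the table fields
      "index_handler.myhash.class" -> "<geohash implementation>", // defaulted when not specified
      "index_handler.myhash.instance" -> "<serialized handler>"   // written after init()
    )
    // "myhash" is also appended to the sort columns and added as a BigInt field with index = true.
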
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index d2daf8e..83a5000 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -71,7 +71,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
     var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "",
-    var columnComment: String = "") {
+    var columnComment: String = "", var index: Boolean = false) {
   override def equals(o: Any) : Boolean = o match {
     case that: Field =>
       that.column.equalsIgnoreCase(this.column)
@@ -664,6 +664,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
+    columnSchema.setIndexColumn(field.index)
     if (isVarcharColumn(colName)) {
       columnSchema.setDataType(DataTypes.VARCHAR)
     }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala
new file mode 100644
index 0000000..7c9edd0
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CustomIndex
+
+object GeoUtils {
+  def getGeoHashHandler(tableProperties: mutable.Map[String, String])
+                        : (String, CustomIndex[_]) = {
+    val indexProperty = tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (indexProperty.isEmpty || indexProperty.get.trim.isEmpty) {
+      CarbonException.analysisException(
+        s"Table do not have ${CarbonCommonConstants.INDEX_HANDLER} property " +
+        s"with ${CarbonCommonConstants.GEOHASH} type handler")
+    }
+    val handler = indexProperty.get.split(",").map(_.trim).filter(handler =>
+      CarbonCommonConstants.GEOHASH.equalsIgnoreCase(
+        tableProperties.getOrElse(s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.type", "")))
+      .map(handler => (handler,
+        tableProperties.get(s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.instance")))
+    if (handler.isEmpty || handler.length != 1 || handler(0)._1.isEmpty
+      || handler(0)._2.isEmpty) {
+      CarbonException.analysisException(
+        s"Table do not have ${CarbonCommonConstants.INDEX_HANDLER} property " +
+        s"with ${CarbonCommonConstants.GEOHASH} type handler")
+    }
+    (handler(0)._1, CustomIndex.getCustomInstance(handler(0)._2.get))
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/integration/spark2/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
similarity index 61%
copy from core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
copy to integration/spark2/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index a89a84f..1f419ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -15,34 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.scan.filter.intf;
+package org.apache.carbondata.geo
 
-public enum ExpressionType {
+import org.apache.spark.sql.sources.Filter
 
-  AND,
-  OR,
-  NOT,
-  EQUALS,
-  NOT_EQUALS,
-  LESSTHAN,
-  LESSTHAN_EQUALTO,
-  GREATERTHAN,
-  GREATERTHAN_EQUALTO,
-  ADD,
-  SUBSTRACT,
-  DIVIDE,
-  MULTIPLY,
-  IN,
-  LIST,
-  NOT_IN,
-  UNKNOWN,
-  LITERAL,
-  RANGE,
-  FALSE,
-  TRUE,
-  STARTSWITH,
-  ENDSWITH,
-  CONTAINSWITH,
-  TEXT_MATCH,
-  IMPLICIT
+import org.apache.carbondata.common.annotations.InterfaceAudience
+
+@InterfaceAudience.Internal
+class InPolygonUDF extends (String => Boolean) with Serializable {
+  override def apply(v1: String): Boolean = {
+    true // Carbon applies the filter, so Spark does not have to apply it again.
+  }
+}
+
+@InterfaceAudience.Internal
+case class InPolygon(queryString: String) extends Filter {
+  override def references: Array[String] = null
 }
+
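
The UDF deliberately returns true for every row: the real filtering happens inside Carbon through the polygon expression built from the geohash index column (see the filter translation further below). Purely to illustrate what the predicate means, not how this patch implements it, a plain ray-casting point-in-polygon test looks roughly like this:

    // Illustration only: true when (x, y) lies inside the closed polygon given as a sequence of
    // vertices where the first and last points are the same.
    def insidePolygon(x: Double, y: Double, polygon: Seq[(Double, Double)]): Boolean = {
      var inside = false
      var j = polygon.length - 1
      for (i <- polygon.indices) {
        val (xi, yi) = polygon(i)
        val (xj, yj) = polygon(j)
        val crosses = ((yi > y) != (yj > y)) &&
          x < (xj - xi) * (y - yi) / (yj - yi) + xi
        if (crosses) inside = !inside
        j = i
      }
      inside
    }
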
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 9f27193..700cf91 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -72,8 +72,8 @@ object CarbonSparkUtil {
     val carbonTable = carbonRelation.carbonTable
     val columnSchemas: mutable.Buffer[ColumnSchema] = carbonTable.getTableInfo.getFactTable.
       getListOfColumns.asScala
-      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1).
-      sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
+      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1 &&
+                         !cSchema.isIndexColumn).sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
     val columnList = columnSchemas.toList.asJava
     carbonRelation.dimensionsAttr.foreach(attr => {
       val carbonColumn = carbonTable.getColumnByName(attr.name)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 032e3c5..5a8ec69 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -70,7 +72,8 @@ case class CarbonDatasourceHadoopRelation(
       filters: Array[Filter],
       partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
-      CarbonFilters.createCarbonFilter(schema, filter)
+      CarbonFilters.createCarbonFilter(schema, filter,
+        carbonTable.getTableInfo.getFactTable.getTableProperties.asScala)
     }.reduceOption(new AndExpression(_, _))
 
     val projection = new CarbonProjection
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index e193dd8..0f10330 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
 import org.apache.carbondata.events._
+import org.apache.carbondata.geo.InPolygonUDF
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -84,6 +85,7 @@ class CarbonEnv {
     // TODO: move it to proper place, it should be registered by datamap implementation
     sparkSession.udf.register("text_match", new TextMatchUDF)
     sparkSession.udf.register("text_match_with_limit", new TextMatchMaxDocUDF)
+    sparkSession.udf.register("in_polygon", new InPolygonUDF)
 
     // added for handling timeseries function like hour, minute, day , month , year
     sparkSession.udf.register("timeseries", new TimeSeriesFunction)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 39addfa..f31a8cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -428,7 +428,7 @@ case class CarbonLoadDataCommand(
     // input data from csv files. Convert to logical plan
     val allCols = new ArrayBuffer[String]()
     // get only the visible dimensions from table
-    allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
+    allCols ++= table.getVisibleDimensions().asScala.filterNot(_.isIndexColumn).map(_.getColName)
     allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
     var attributes =
       StructType(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index c526387..48bcab1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -62,6 +62,9 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         throw new MalformedCarbonCommandException(
           "alter table add column is not supported for index datamap")
       }
+      val alterColumns =
+        (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols).map(_.column)
+      AlterTableUtil.validateForIndexHandlerName(carbonTable, alterColumns)
       val operationContext = new OperationContext
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index a26fac4..190b776 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -113,6 +113,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         throw new MalformedCarbonCommandException(
           "alter table column rename is not supported for index datamap")
       }
+      // Do not allow index handler's source columns to be changed.
+      AlterTableUtil.validateForIndexHandlerSources(carbonTable,
+        List(alterTableColRenameAndDataTypeChangeModel.columnName))
       val operationContext = new OperationContext
       operationContext.setProperty("childTableColumnRename", childTableColumnRename)
       val alterTableColRenameAndDataTypeChangePreEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 9370396..2cd4603 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -61,6 +61,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         throw new MalformedCarbonCommandException(
           "alter table drop column is not supported for index datamap")
       }
+      // Do not allow index handler's source columns to be dropped.
+      AlterTableUtil.validateForIndexHandlerSources(carbonTable, alterTableDropColumnModel.columns)
       val partitionInfo = carbonTable.getPartitionInfo()
       val tableColumns = carbonTable.getCreateOrderColumn().asScala
       if (partitionInfo != null) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index cf4dbe9..5f54b32 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -56,6 +56,29 @@ private[sql] case class CarbonDescribeFormattedCommand(
 
     val carbonTable = relation.carbonTable
     val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+    // Append index handler columns
+    val indexes = tblProps.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (indexes.isDefined) {
+      results ++= Seq(
+        ("", "", ""),
+        ("## Custom Index Information", "", "")
+      )
+      val indexList = indexes.get.split(",").map(_.trim)
+      indexList.zip(Stream from 1).foreach {
+        case(index, count) =>
+          results ++= Seq(
+            ("Type", tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.type"), ""),
+            ("Class", tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.class"), ""),
+            ("Column Name", index, ""),
+            ("Column Data Type",
+              tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.datatype"), ""),
+            ("Sources Columns",
+              tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.sourcecolumns"), ""))
+          if (indexList.length != count) {
+            results ++= Seq(("", "", ""))
+          }
+      }
+    }
     // If Sort Columns are given and Sort Scope is not given in either table properties
     // or carbon properties then pass LOCAL_SORT as the sort scope,
     // else pass NO_SORT
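
With a single geohash handler configured, the new section of the DESCRIBE FORMATTED output would contain rows along the following lines (handler name and values are hypothetical; the values are read back from the index handler table properties):

    ## Custom Index Information
    Type               geohash
    Class              <geohash implementation class>
    Column Name        myhash
    Column Data Type   bigint
    Sources Columns    lon,lat
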
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index bde74fd..18a7de1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -105,6 +105,19 @@ with Serializable {
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
         carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    // If the index handler property is configured, set a flag to indicate index columns are
+    // present, so that InputProcessorStepWithNoConverterImpl can generate the values for those
+    // columns, convert them and then apply the sort/write steps.
+    val handler =
+    table.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (handler != null) {
+      val sortScope = optionsFinal.get("sort_scope")
+      if (sortScope.equalsIgnoreCase(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) {
+        // Index handler non-schema column must be sorted
+        optionsFinal.put("sort_scope", "LOCAL_SORT")
+      }
+      model.setIndexColumnsPresent(true)
+    }
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
     val partitionStr =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index b7d1511..7c77431 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit, TextMatchMaxDocUDF, TextMatchUDF}
+import org.apache.carbondata.geo.{InPolygon, InPolygonUDF}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 /**
@@ -286,6 +287,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
 
     var vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
+
+    // Spark cannot filter the rows from the pages in case of a polygon query. So, we do the
+    // row-level filtering in Carbon and return the rows directly.
+    if (candidatePredicates
+      .exists(exp => exp.isInstanceOf[ScalaUDF] &&
+                     exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonUDF])) {
+      vectorPushRowFilters = true
+    }
+
     // In case of mixed format, make the vectorPushRowFilters always false as other formats
     // filtering happens in spark layer.
     if (vectorPushRowFilters && extraRdd.isDefined) {
@@ -551,6 +561,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           predicate match {
             case u: ScalaUDF if u.function.isInstanceOf[TextMatchUDF] ||
                                 u.function.isInstanceOf[TextMatchMaxDocUDF] => count = count + 1
+            case _ =>
           }
         }
         if (count > 1) {
@@ -618,6 +629,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         }
         Some(TextMatchLimit(u.children.head.toString(), u.children.last.toString()))
 
+      case u: ScalaUDF if u.function.isInstanceOf[InPolygonUDF] =>
+        if (u.children.size > 1) {
+          throw new MalformedCarbonCommandException("Expect one string in polygon")
+        }
+        Some(InPolygon(u.children.head.toString()))
+
       case or@Or(left, right) =>
 
         val leftFilter = translateFilter(left, true)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index fb88ce4..6db8ce3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -48,7 +48,8 @@ case class CarbonRelation(
   }
 
   val dimensionsAttr: Seq[AttributeReference] = {
-    val sett = new LinkedHashSet(carbonTable.getVisibleDimensions.asScala.asJava)
+    val sett = new LinkedHashSet(carbonTable.getVisibleDimensions.asScala.filterNot(_.isIndexColumn)
+      .asJava)
     sett.asScala.toSeq.map(dim => {
       val dimval = carbonTable.getDimensionByName(dim.getColName)
       val output: DataType = dimval.getDataType.getName.toLowerCase match {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 7aeda33..75405ec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.optimizer
 import java.util.ArrayList
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.spark.sql._
@@ -43,6 +44,8 @@ import org.apache.carbondata.core.scan.expression.MatchExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit}
+import org.apache.carbondata.geo.{GeoUtils, InPolygon}
+import org.apache.carbondata.geo.scan.expression.{PolygonExpression => CarbonPolygonExpression}
 
 /**
  * All filter conversions are done here.
@@ -54,7 +57,8 @@ object CarbonFilters {
    * Converts data sources filters to carbon filter predicates.
    */
   def createCarbonFilter(schema: StructType,
-      predicate: sources.Filter): Option[CarbonExpression] = {
+      predicate: sources.Filter,
+      tableProperties: mutable.Map[String, String]): Option[CarbonExpression] = {
     val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
 
     def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
@@ -140,6 +144,9 @@ object CarbonFilters {
           Some(new MatchExpression(queryString))
         case TextMatchLimit(queryString, maxDoc) =>
           Some(new MatchExpression(queryString, Try(maxDoc.toInt).getOrElse(Integer.MAX_VALUE)))
+        case InPolygon(queryString) =>
+          val (columnName, handler) = GeoUtils.getGeoHashHandler(tableProperties)
+          Some(new CarbonPolygonExpression(queryString, columnName, handler))
         case _ => None
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index f314e74..854afdd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -450,10 +450,9 @@ object AlterTableUtil {
       validateGlobalSortPartitions(carbonTable, lowerCasePropertiesMap)
 
       // validate the Sort Scope and Sort Columns
-      validateSortScopeAndSortColumnsProperties(carbonTable, lowerCasePropertiesMap)
-
-      // validate the Sort Scope and Sort Columns
-      validateSortScopeAndSortColumnsProperties(carbonTable, lowerCasePropertiesMap)
+      validateSortScopeAndSortColumnsProperties(carbonTable,
+        lowerCasePropertiesMap,
+        tblPropertiesMap)
 
       // validate the Compaction Level Threshold
       validateCompactionLevelThresholdProperties(carbonTable, lowerCasePropertiesMap)
@@ -615,7 +614,8 @@ object AlterTableUtil {
     // validate column meta cache property
     if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       val schemaList: util.List[ColumnSchema] = CarbonUtil
-        .getColumnSchemaList(carbonTable.getVisibleDimensions, carbonTable.getVisibleMeasures)
+        .getColumnSchemaList(carbonTable.getVisibleDimensions.asScala
+          .filterNot(_.getColumnSchema.isIndexColumn).asJava, carbonTable.getVisibleMeasures)
       val tableColumns: Seq[String] = schemaList.asScala
         .map(columnSchema => columnSchema.getColumnName)
       CommonUtil
@@ -676,9 +676,20 @@ object AlterTableUtil {
   }
 
   def validateSortScopeAndSortColumnsProperties(carbonTable: CarbonTable,
-      propertiesMap: mutable.Map[String, String]): Unit = {
+                                                propertiesMap: mutable.Map[String, String],
+                                                tblPropertiesMap: mutable.Map[String, String]
+                                               ): Unit = {
     CommonUtil.validateSortScope(propertiesMap)
     CommonUtil.validateSortColumns(carbonTable, propertiesMap)
+    val indexProp = tblPropertiesMap.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (indexProp.isDefined) {
+      indexProp.get.split(",").map(_.trim).foreach { handler =>
+        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumns"
+        val sourceColumns = tblPropertiesMap(SOURCE_COLUMNS).split(",").map(_.trim)
+        // Add the index handler column to the sort columns if it is not already present.
+        CarbonScalaUtil.addIndexHandlerToSortColumns(handler, sourceColumns, propertiesMap)
+      }
+    }
     // match SORT_SCOPE and SORT_COLUMNS
     val newSortScope = propertiesMap.get(CarbonCommonConstants.SORT_SCOPE)
     val newSortColumns = propertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
@@ -1038,4 +1049,34 @@ object AlterTableUtil {
       CommonUtil.validateLoadMinSize(propertiesMap, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
     }
   }
+
+  def validateForIndexHandlerName(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
+    // Do not allow columns to be added with index handler name
+    val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+    val indexProperty = properties.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (indexProperty.isDefined) {
+      indexProperty.get.split(",").map(_.trim).foreach(element =>
+        if (alterColumns.contains(element)) {
+          throw new MalformedCarbonCommandException(s"Column: $element is not allowed. " +
+            s"This column is present in ${CarbonCommonConstants.INDEX_HANDLER} table property.")
+        })
+      }
+  }
+
+  def validateForIndexHandlerSources(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
+    // Do not allow index handler source columns to be altered
+    val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+    val indexProperty = properties.get(CarbonCommonConstants.INDEX_HANDLER)
+    if (indexProperty.isDefined) {
+      indexProperty.get.split(",").map(_.trim).foreach { element =>
+        val srcColumns
+        = properties.get(CarbonCommonConstants.INDEX_HANDLER + s".$element.sourcecolumns")
+        val common = alterColumns.intersect(srcColumns.get.split(",").map(_.trim))
+        if (common.nonEmpty) {
+          throw new MalformedCarbonCommandException(s"Columns present in " +
+            s"${CarbonCommonConstants.INDEX_HANDLER} table property cannot be altered.")
+        }
+      }
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index a1854b4..ba00dce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
     <module>integration/presto</module>
     <module>examples/flink</module>
     <module>streaming</module>
+    <module>geo</module>
   </modules>
 
   <properties>
@@ -525,6 +526,8 @@
               <sourceDirectories>
                 <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/geo/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/geo/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
@@ -573,6 +576,8 @@
               <sourceDirectories>
                 <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/geo/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/geo/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 54e7bf7..f8aaebd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -101,6 +101,13 @@ public class CarbonDataLoadConfiguration {
 
   private OutputFilesInfoHolder outputFilesInfoHolder;
 
+  /**
+   * Whether index columns are present. This flag should be set only when all the schema
+   * columns are already converted and only the index columns present in the data fields still
+   * need to be generated and converted.
+   */
+  private boolean isIndexColumnsPresent;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -384,4 +391,12 @@ public class CarbonDataLoadConfiguration {
   public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
     this.outputFilesInfoHolder = outputFilesInfoHolder;
   }
+
+  public boolean isIndexColumnsPresent() {
+    return isIndexColumnsPresent;
+  }
+
+  public void setIndexColumnsPresent(boolean indexColumnsPresent) {
+    isIndexColumnsPresent = indexColumnsPresent;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 20fe32e..550afc8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -190,6 +190,7 @@ public final class DataLoadProcessBuilder {
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
+    configuration.setIndexColumnsPresent(loadModel.isIndexColumnsPresent());
     List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
     if (loadMetadataDetails != null) {
       for (LoadMetadataDetails detail : loadMetadataDetails) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
index 06f2ffc..6a855b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.loading.converter;
 
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 /**
@@ -40,6 +41,12 @@ public interface FieldConverter {
   Object convert(Object value, BadRecordLogHolder logHolder) throws RuntimeException;
 
   /**
+   * Returns the data field associated with this converter.
+   * @return the data field
+   */
+  DataField getDataField();
+
+  /**
    * This method clears all the dictionary caches being acquired.
    */
   void clear();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
index d006704..28e8afb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
@@ -74,6 +74,11 @@ public class BinaryFieldConverterImpl implements FieldConverter {
   }
 
   @Override
+  public DataField getDataField() {
+    return dataField;
+  }
+
+  @Override
   public void clear() {
   }
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index 55a0101..fa1e957 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
@@ -31,10 +32,13 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI
   private GenericDataType genericDataType;
 
   private int index;
+  private DataField dataField;
 
-  public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
+  public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType,
+      int index) {
     this.genericDataType = genericDataType;
     this.index = index;
+    this.dataField = dataField;
   }
 
   @Override
@@ -57,6 +61,11 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI
     }
   }
 
+  @Override
+  public DataField getDataField() {
+    return dataField;
+  }
+
   /**
    * Method to clear out the dictionary caches. In this instance nothing to clear.
    */
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
index 2306e99..cfdd337 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -34,6 +34,7 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
   private DirectDictionaryGenerator directDictionaryGenerator;
 
   private int index;
+  private DataField dataField;
 
   private String nullFormat;
 
@@ -60,6 +61,7 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
           .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
     }
     this.index = index;
+    this.dataField = dataField;
     this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
@@ -96,6 +98,11 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
     }
   }
 
+  @Override
+  public DataField getDataField() {
+    return dataField;
+  }
+
   /**
    * Method to clean the dictionary cache. In this instance nothing to clear.
    */
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 2f089bf..68c505a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
 import org.apache.carbondata.processing.loading.converter.impl.binary.Base64BinaryDecoder;
@@ -64,19 +65,23 @@ public class FieldEncoderFactory {
    * @param isEmptyBadRecord        whether is Empty BadRecord
    * @param isConvertToBinary       whether the no dictionary field to be converted to binary or not
    * @param binaryDecoder           carbon binary decoder for loading data
+   * @param configuration           Data load configuration
    * @return
    */
   public FieldConverter createFieldEncoder(
       DataField dataField, int index, String nullFormat, boolean isEmptyBadRecord,
-      boolean isConvertToBinary, String binaryDecoder) {
+      boolean isConvertToBinary, String binaryDecoder, CarbonDataLoadConfiguration configuration) {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+      if (dataField.getColumn().isIndexColumn()) {
+        return new IndexFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord,
+            configuration);
+      } else if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
             isEmptyBadRecord);
       } else if (dataField.getColumn().isComplex()) {
-        return new ComplexFieldConverterImpl(
+        return new ComplexFieldConverterImpl(dataField,
             createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
       } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
         BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java
new file mode 100644
index 0000000..062c251
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Converter for Index handler columns
+ */
+public class IndexFieldConverterImpl extends MeasureFieldConverterImpl {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
+  private int index;
+  private int[] sourceIndexes;
+  CustomIndex instance;
+
+  public IndexFieldConverterImpl(DataField dataField, String nullFormat, int index,
+      boolean isEmptyBadRecord, CarbonDataLoadConfiguration configuration) {
+    super(dataField, nullFormat, index, isEmptyBadRecord);
+    this.index = index;
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    try {
+      instance = CustomIndex.getCustomInstance(properties.get(
+          CarbonCommonConstants.INDEX_HANDLER + "." + dataField.getColumn().getColName()
+              + ".instance"));
+    } catch (IOException e) {
+      LOGGER.error("Failed to get the custom instance", e);
+      throw new RuntimeException(e);
+    }
+    String sourceColumns = properties.get(
+        CarbonCommonConstants.INDEX_HANDLER + "." + dataField.getColumn().getColName()
+            + ".sourcecolumns");
+    String[] sources = sourceColumns.split(",");
+    sourceIndexes = new int[sources.length];
+    int idx = 0;
+    for (String source : sources) {
+      sourceIndexes[idx++] = getDataFieldIndexByName(configuration.getDataFields(), source);
+    }
+  }
+
+  private int getDataFieldIndexByName(DataField[] fields, String column) {
+    for (int i = 0; i < fields.length; i++) {
+      if (fields[i].getColumn().getColName().equalsIgnoreCase(column)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  private String generateIndexValue(CarbonRow row) {
+    List<Object> sourceValues = new ArrayList<Object>();
+    for (int sourceIndex : sourceIndexes) {
+      sourceValues.add(row.getData()[sourceIndex]);
+    }
+    String value = null;
+    try {
+      value = instance.generate(sourceValues);
+    } catch (Exception e) {
+      LOGGER.error("Failed to generate index column value", e);
+    }
+    return value;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    row.update(generateIndexValue(row), index);
+    super.convert(row, logHolder);
+  }
+}
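
The value that generate() returns is entirely up to the handler; the default geohash implementation is provided in the new geo module and is not part of this converter. As a rough, self-contained sketch of the underlying idea only (not the actual GeoHashImpl), interleaving the bits of two integer source values yields a Z-order style index value in which nearby points tend to get nearby values:

    // Illustration only: interleave the bits of two non-negative Int values (for example scaled
    // longitude and latitude) into a single Long index value.
    def interleaveBits(a: Int, b: Int): Long = {
      var result = 0L
      for (i <- 0 until 32) {
        val bitA = (a.toLong >> i) & 1L
        val bitB = (b.toLong >> i) & 1L
        result |= (bitA << (2 * i)) | (bitB << (2 * i + 1))
      }
      result
    }
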
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 62167b9..f4fa2b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -138,6 +138,10 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     }
   }
 
+  @Override
+  public DataField getDataField() {
+    return dataField;
+  }
 
   /**
    * Method to clean the dictionary cache. As in this MeasureFieldConverterImpl convert no
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index dfc5ac9..72a2220 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -118,6 +118,11 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
   }
 
   @Override
+  public DataField getDataField() {
+    return dataField;
+  }
+
+  @Override
   public void clear() {
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index eb6b5fa..9105d27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -82,15 +82,22 @@ public class RowConverterImpl implements RowConverter {
         configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
             .toString());
     List<FieldConverter> fieldConverterList = new ArrayList<>();
+    List<FieldConverter> nonSchemaFieldConverterList = new ArrayList<>();
     long lruCacheStartTime = System.currentTimeMillis();
 
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
-                  CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER));
-      fieldConverterList.add(fieldConverter);
+                  CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
+              configuration);
+      if (fields[i].getColumn().isIndexColumn()) {
+        nonSchemaFieldConverterList.add(fieldConverter);
+      } else {
+        fieldConverterList.add(fieldConverter);
+      }
     }
+    fieldConverterList.addAll(nonSchemaFieldConverterList);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
     fieldConverters = fieldConverterList.toArray(new FieldConverter[0]);
@@ -102,6 +109,12 @@ public class RowConverterImpl implements RowConverter {
     logHolder.setLogged(false);
     logHolder.clear();
     for (int i = 0; i < fieldConverters.length; i++) {
+      if (configuration.isIndexColumnsPresent() && !fieldConverters[i].getDataField().getColumn()
+          .isIndexColumn()) {
+        // Skip the conversion for schema columns if the conversion is required only for index
+        // columns
+        continue;
+      }
       fieldConverters[i].convert(row, logHolder);
       if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
         badRecordLogger.addBadRecordsToBuilder(row.getRawData(), logHolder.getReason());
@@ -137,6 +150,7 @@ public class RowConverterImpl implements RowConverter {
         new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger,
             this.isConvertToBinary);
     List<FieldConverter> fieldConverterList = new ArrayList<>();
+    List<FieldConverter> nonSchemaFieldConverterList = new ArrayList<>();
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
@@ -147,9 +161,15 @@ public class RowConverterImpl implements RowConverter {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
-                  CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER));
-      fieldConverterList.add(fieldConverter);
+                  CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
+              configuration);
+      if (fields[i].getColumn().isIndexColumn()) {
+        nonSchemaFieldConverterList.add(fieldConverter);
+      } else {
+        fieldConverterList.add(fieldConverter);
+      }
     }
+    fieldConverterList.addAll(nonSchemaFieldConverterList);
     converter.fieldConverters = fieldConverterList.toArray(new FieldConverter[0]);
     converter.logHolder = new BadRecordLogHolder();
     return converter;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 81b393c..759aa83 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -167,6 +167,13 @@ public class CarbonLoadModel implements Serializable {
   private boolean isLoadWithoutConverterStep;
 
   /**
+   * Whether index columns are present. This flag should be set only when all the schema
+   * columns are already converted and only the index columns present in the data fields still
+   * need to be generated and converted.
+   */
+  private boolean isIndexColumnsPresent;
+
+  /**
    * To identify the suitable input processor step for json file loading.
    */
   private boolean isJsonFileLoad;
@@ -857,4 +864,12 @@ public class CarbonLoadModel implements Serializable {
   public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
     this.outputFilesInfoHolder = outputFilesInfoHolder;
   }
+
+  public boolean isIndexColumnsPresent() {
+    return isIndexColumnsPresent;
+  }
+
+  public void setIndexColumnsPresent(boolean indexColumnsPresent) {
+    isIndexColumnsPresent = indexColumnsPresent;
+  }
 }
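
A hedged usage sketch of the new CarbonLoadModel flag (the helper names below are illustrative, not part of the patch): the flag is meant to be set only for a pass in which the schema columns of each row are already converted, so that the converters generate and convert just the index handler columns. The same flag is checked through the data load configuration in the converter and input processor steps above.

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    public final class IndexColumnFlagSketch {
      // Mark a load model so that downstream converters generate and convert
      // only the index handler columns; assumes the schema columns of each row
      // were already converted in an earlier step.
      static void markIndexOnlyConversion(CarbonLoadModel loadModel) {
        loadModel.setIndexColumnsPresent(true);
      }

      // Downstream steps can check the flag before running an extra conversion
      // pass over the generated index columns.
      static boolean needsIndexOnlyConversion(CarbonLoadModel loadModel) {
        return loadModel.isIndexColumnsPresent();
      }
    }
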
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 94d9802..62f95ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -71,6 +71,14 @@ public class RowParserImpl implements RowParser {
     inputMapping = new int[input.length];
     int k = 0;
     for (int i = 0; i < fields.length; i++) {
+      if (fields[i].getColumn().isIndexColumn()) {
+        // Index handler columns are non-schema fields and are not present in the header,
+        // so set the input mapping to -1 for this field and continue
+        input[k] = fields[i];
+        inputMapping[k] = -1;
+        k++;
+        continue;
+      }
       for (int j = 0; j < numberOfColumns; j++) {
         if (header[j].equalsIgnoreCase(fields[i].getColumn().getColName())) {
           input[k] = fields[i];
@@ -96,6 +104,10 @@ public class RowParserImpl implements RowParser {
     }
     Object[] out = new Object[genericParsers.length];
     for (int i = 0; i < genericParsers.length; i++) {
+      if (inputMapping[i] == -1) {
+        // Skip non-schema columns. They are not present in the input row
+        continue;
+      }
       Object obj = row[inputMapping[i]];
       out[outputMapping[i]] = genericParsers[i].parse(obj);
     }
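
The parse-side handling above can be summarized with a small sketch (plain arrays and illustrative names only): entries whose input mapping is -1 correspond to index handler columns that have no value in the incoming row, so the parser leaves their output slot empty for the converter step to fill in later.

    public class IndexColumnParseSketch {
      // Simplified version of the parse loop; the real implementation also runs
      // genericParsers[i] on each value. Assumes outputMapping is a permutation
      // of the same size as inputMapping.
      static Object[] parse(Object[] row, int[] inputMapping, int[] outputMapping) {
        Object[] out = new Object[inputMapping.length];
        for (int i = 0; i < inputMapping.length; i++) {
          if (inputMapping[i] == -1) {
            // non-schema index column, not present in the input row
            continue;
          }
          out[outputMapping[i]] = row[inputMapping[i]];
        }
        return out;
      }
    }
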
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 3a71845..6d9ef39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
 import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
 import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
@@ -63,6 +64,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
   private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
 
+  private RowConverterImpl rowConverter;
+
   // cores used in SDK writer, set by the user
   private short sdkWriterCores;
 
@@ -82,7 +85,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
   public void initialize() throws IOException {
     super.initialize();
     // if logger is enabled then raw data will be required.
-    RowConverterImpl rowConverter =
+    rowConverter =
         new RowConverterImpl(configuration.getDataFields(), configuration, null);
     rowConverter.initialize();
     configuration.setCardinalityFinder(rowConverter);
@@ -140,7 +143,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
               rowCounter, orderOfData, noDictionaryMapping, dataTypes, configuration,
-              dataFieldsWithComplexDataType);
+              dataFieldsWithComplexDataType, rowConverter);
     }
     return outIterators;
   }
@@ -198,10 +201,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private boolean isHivePartitionTable = false;
 
+    RowConverter converter;
+    CarbonDataLoadConfiguration configuration;
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
         boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
         DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
-        Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+        Map<Integer, GenericDataType> dataFieldsWithComplexDataType, RowConverter converter) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.counter = 0;
@@ -217,6 +223,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
       this.isHivePartitionTable =
           configuration.getTableSpec().getCarbonTable().isHivePartitionTable();
+      this.configuration = configuration;
+      this.converter = converter;
     }
 
     @Override
@@ -256,8 +264,12 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       int count = 0;
 
       while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(
-            new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
+        CarbonRow carbonRow =
+            new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields));
+        if (configuration.isIndexColumnsPresent()) {
+          carbonRow = converter.convert(carbonRow);
+        }
+        carbonRowBatch.addRow(carbonRow);
         count++;
       }
       rowCounter.getAndAdd(carbonRowBatch.getSize());
@@ -268,8 +280,11 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
     }
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
-      Object[] newData = new Object[data.length];
-      for (int i = 0; i < data.length; i++) {
+      Object[] newData = new Object[dataFields.length];
+      for (int i = 0; i < dataFields.length; i++) {
+        if (dataFields[i].getColumn().isIndexColumn()) {
+          continue;
+        }
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
           if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
             // keep the no dictionary measure column as original data
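
For the no-converter input step, a minimal sketch of the batch-filling idea (stand-in types instead of CarbonRow/RowConverter, and for simplicity assuming the schema columns occupy the leading slots): each raw row is widened to the full data-field count, leaving the index column slots empty, and the row converter is applied only when index values still need to be generated.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    public class NoConverterStepSketch {
      static List<Object[]> fillBatch(List<Object[]> rawRows, int dataFieldCount,
          boolean indexColumnsPresent, UnaryOperator<Object[]> rowConverter) {
        List<Object[]> batch = new ArrayList<>();
        for (Object[] raw : rawRows) {
          // widen to the full field count; index column slots stay null here
          Object[] widened = new Object[dataFieldCount];
          System.arraycopy(raw, 0, widened, 0, raw.length);
          // run the converter only when index values still need to be generated
          batch.add(indexColumnsPresent ? rowConverter.apply(widened) : widened);
        }
        return batch;
      }
    }
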
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index a930471..5383607 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -337,7 +337,10 @@ public final class CarbonDataProcessorUtil {
     List<CarbonDimension> dimensions =
         schema.getCarbonTable().getVisibleDimensions();
     for (CarbonDimension dimension : dimensions) {
-      columnNames.add(dimension.getColName());
+      if (!dimension.isIndexColumn()) {
+        // skip the non-schema index column
+        columnNames.add(dimension.getColName());
+      }
     }
     List<CarbonMeasure> measures = schema.getCarbonTable().getVisibleMeasures();
     for (CarbonMeasure msr : measures) {
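
The same pattern applies where CarbonDataProcessorUtil collects schema column names: dimensions flagged as index columns are filtered out because they are generated rather than supplied by the user. A small generic sketch (illustrative interface, not the real CarbonDimension):

    import java.util.ArrayList;
    import java.util.List;

    public class SchemaDimensionNameSketch {
      // Illustrative stand-in for CarbonDimension
      interface Dimension {
        String getColName();
        boolean isIndexColumn();
      }

      // Collect only user-supplied schema dimension names, skipping generated
      // index handler columns.
      static List<String> schemaDimensionNames(List<Dimension> dimensions) {
        List<String> names = new ArrayList<>();
        for (Dimension d : dimensions) {
          if (!d.isIndexColumn()) {
            names.add(d.getColName());
          }
        }
        return names;
      }
    }
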

