carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-1856][PARTITION] Support insert/load data for partition table
Date Mon, 18 Dec 2017 16:30:49 GMT
[CARBONDATA-1856][PARTITION] Support insert/load data for partition table

Changed CarbonRelation to HadoopFsRelation during load in the case of a partition table.
Implemented Spark's FileFormat interface for Carbon and used CarbonOutputFormat inside it.
Created a partition map file inside each segment to map between partitions and index files.
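
For illustration, the PartitionMapper class added below is serialized with Gson and keyed by index file name, so a merged .partitionmap file might look like the following sketch (the index file names and partition values here are made up):

{
  "partitionMap": {
    "1001_batchno0-0-1513614649000.carbonindex": ["empno=11"],
    "1002_batchno0-0-1513614649000.carbonindex": ["empno=12", "empno=13"]
  }
}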

This closes #1654


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4430178c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4430178c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4430178c

Branch: refs/heads/master
Commit: 4430178c0f66f735f5d35ec733b0479d63b15fcb
Parents: 6e224dc
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sun Dec 17 11:44:36 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Dec 19 00:30:28 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    | 198 ++++++++++++++
 .../ThriftWrapperSchemaConverterImpl.java       |   8 +-
 .../schema/partition/PartitionType.java         |   3 +-
 .../core/metadata/schema/table/CarbonTable.java |   6 +
 .../core/mutate/CarbonUpdateUtil.java           |  80 +-----
 .../core/util/path/CarbonTablePath.java         |  13 +-
 .../CarbonFormatDirectoryStructureTest.java     |   2 +-
 format/src/main/thrift/schema.thrift            |   1 +
 .../carbondata/hadoop/CarbonInputFormat.java    |   4 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  10 +
 .../hadoop/api/CarbonTableInputFormat.java      |  15 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |  57 ++--
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala | 220 +++++++++++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   2 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |   8 +-
 .../spark/util/GlobalDictionaryUtil.scala       |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +
 .../command/carbonTableSchemaCommon.scala       |  11 +-
 .../spark/util/CarbonReflectionUtils.scala      |  58 +++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 .../management/CarbonInsertIntoCommand.scala    |   9 +-
 .../management/CarbonLoadDataCommand.scala      | 267 +++++++++++++++++--
 .../table/CarbonCreateTableCommand.scala        |   8 +
 .../datasources/CarbonFileFormat.scala          | 246 +++++++++++++++++
 .../strategy/CarbonLateDecodeStrategy.scala     |   1 -
 .../sql/execution/strategy/DDLStrategy.scala    |   4 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |  69 ++++-
 .../sql/hive/CarbonPreAggregateRules.scala      |  56 ----
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   4 +-
 .../iterator/CarbonOutputIteratorWrapper.java   |   2 +-
 .../merger/AbstractResultProcessor.java         |   6 +-
 .../partition/spliter/RowResultProcessor.java   |   2 +-
 .../store/CarbonDataFileAttributes.java         |   6 +-
 .../store/CarbonFactDataHandlerModel.java       |   2 +-
 37 files changed, 1165 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
new file mode 100644
index 0000000..4b58e75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+
+/**
+ * Provides read and write support for the partition mapping file in each segment
+ */
+public class PartitionMapFileStore {
+
+  /**
+   * Writes the partition map file to the segment folder with the index file name and corresponding partitions.
+   *
+   * @param segmentPath
+   * @param taskNo
+   * @param partionNames
+   * @throws IOException
+   */
+  public void writePartitionMapFile(String segmentPath, final String taskNo,
+      List<String> partionNames) throws IOException {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    // write partition info to new file.
+    if (carbonFile.exists() && partionNames.size() > 0) {
+      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().startsWith(taskNo) && file.getName()
+              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+        }
+      });
+      if (carbonFiles != null && carbonFiles.length > 0) {
+        PartitionMapper partitionMapper = new PartitionMapper();
+        Map<String, List<String>> partitionMap = new HashMap<>();
+        partitionMap.put(carbonFiles[0].getName(), partionNames);
+        partitionMapper.setPartitionMap(partitionMap);
+        String path = segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT;
+        writePartitionFile(partitionMapper, path);
+      }
+    }
+  }
+
+  private void writePartitionFile(PartitionMapper partitionMapper, String path) throws IOException {
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(partitionMapper);
+      brWriter.write(metadataInstance);
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+  }
+
+  /**
+   * Merges all partition map files in a segment into a single file.
+   *
+   * @param segmentPath
+   * @throws IOException
+   */
+  public void mergePartitionMapFiles(String segmentPath) throws IOException {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    if (carbonFile.exists()) {
+      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
+        }
+      });
+      if (carbonFiles != null && carbonFiles.length > 1) {
+        PartitionMapper partitionMapper = null;
+        for (CarbonFile file : carbonFiles) {
+          PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
+          if (partitionMapper == null && localMapper != null) {
+            partitionMapper = localMapper;
+          }
+          if (localMapper != null) {
+            partitionMapper = partitionMapper.merge(localMapper);
+          }
+        }
+        if (partitionMapper != null) {
+          String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT;
+          writePartitionFile(partitionMapper, path);
+          for (CarbonFile file : carbonFiles) {
+            FileFactory.deleteAllCarbonFilesOfDir(file);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This method reads the partition file
+   *
+   * @param partitionMapPath
+   * @return
+   */
+  public PartitionMapper readPartitionMap(String partitionMapPath) {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    PartitionMapper partitionMapper;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(partitionMapPath, FileFactory.getFileType(partitionMapPath));
+
+    try {
+      if (!FileFactory.isFileExist(partitionMapPath, FileFactory.getFileType(partitionMapPath))) {
+        return null;
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      partitionMapper = gsonObjectToRead.fromJson(buffReader, PartitionMapper.class);
+    } catch (IOException e) {
+      return null;
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    return partitionMapper;
+  }
+
+  public static class PartitionMapper implements Serializable {
+
+    private static final long serialVersionUID = 3582245668420401089L;
+
+    private Map<String, List<String>> partitionMap;
+
+    public PartitionMapper merge(PartitionMapper mapper) {
+      if (this == mapper) {
+        return this;
+      }
+      if (partitionMap != null && mapper.partitionMap != null) {
+        partitionMap.putAll(mapper.partitionMap);
+      }
+      if (partitionMap == null) {
+        partitionMap = mapper.partitionMap;
+      }
+      return this;
+    }
+
+    public Map<String, List<String>> getPartitionMap() {
+      return partitionMap;
+    }
+
+    public void setPartitionMap(Map<String, List<String>> partitionMap) {
+      this.partitionMap = partitionMap;
+    }
+  }
+
+}
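
As an editorial aside, the following is a minimal usage sketch of the new PartitionMapFileStore API, using only the method signatures visible in the diff above; the segment path, task number, and partition value are made-up examples, not values from this commit.

import java.util.Arrays;

import org.apache.carbondata.core.metadata.PartitionMapFileStore;

public class PartitionMapFileStoreSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical segment directory; real paths are built via CarbonTablePath.
    String segmentPath = "/tmp/store/default/t1/Fact/Part0/Segment_0";
    PartitionMapFileStore store = new PartitionMapFileStore();
    // Each task records which partitions its index file covers
    // (a file is written only if a matching .carbonindex exists in the segment).
    store.writePartitionMapFile(segmentPath, "1001", Arrays.asList("empno=11"));
    // On job commit, all per-task .partitionmap files are merged into one.
    store.mergePartitionMapFiles(segmentPath);
    // Read the merged mapping back: index file name -> list of partition values.
    PartitionMapFileStore.PartitionMapper mapper =
        store.readPartitionMap(segmentPath + "/mergedpartitions.partitionmap");
    if (mapper != null) {
      System.out.println(mapper.getPartitionMap());
    }
  }
}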

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 1316a25..5d15bf8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -229,8 +229,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return org.apache.carbondata.format.PartitionType.RANGE;
       case RANGE_INTERVAL:
         return org.apache.carbondata.format.PartitionType.RANGE_INTERVAL;
+      case NATIVE_HIVE:
+        return org.apache.carbondata.format.PartitionType.NATIVE_HIVE;
       default:
-        return org.apache.carbondata.format.PartitionType.HASH;
+        return org.apache.carbondata.format.PartitionType.NATIVE_HIVE;
     }
   }
 
@@ -559,8 +561,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return PartitionType.RANGE;
       case RANGE_INTERVAL:
         return PartitionType.RANGE_INTERVAL;
+      case NATIVE_HIVE:
+        return PartitionType.NATIVE_HIVE;
       default:
-        return PartitionType.HASH;
+        return PartitionType.NATIVE_HIVE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java
index 9a4cf4a..4013715 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/PartitionType.java
@@ -23,5 +23,6 @@ public enum PartitionType {
   RANGE,
   RANGE_INTERVAL,
   LIST,
-  HASH
+  HASH,
+  NATIVE_HIVE
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4ebc02d..7732a50 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
@@ -573,6 +574,11 @@ public class CarbonTable implements Serializable {
     return null != tablePartitionMap.get(getTableName());
   }
 
+  public boolean isHivePartitionTable() {
+    PartitionInfo partitionInfo = tablePartitionMap.get(getTableName());
+    return null != partitionInfo && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE;
+  }
+
   /**
    * @return absolute table identifier
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 0c9c47a..f4566ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -18,11 +18,8 @@
 package org.apache.carbondata.core.mutate;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -382,7 +379,7 @@ public class CarbonUpdateUtil {
     return segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
   }
 
-  public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
+  public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
     String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
 
     // scan all the carbondata files and get the latest task ID.
@@ -397,11 +394,11 @@ public class CarbonUpdateUtil {
         return false;
       }
     });
-    int max = 0;
+    long max = 0;
     if (null != dataFiles) {
       for (CarbonFile file : dataFiles) {
-        int taskNumber =
-            Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
+        long taskNumber =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
         if (taskNumber > max) {
           max = taskNumber;
         }
@@ -412,75 +409,6 @@ public class CarbonUpdateUtil {
 
   }
 
-  public static String getLatestBlockNameForSegment(String segmentId, CarbonTablePath tablePath) {
-    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
-
-    // scan all the carbondata files and get the latest task ID.
-    CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
-
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        int max = 0;
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
-          if (taskNumber >= max) {
-            return true;
-          }
-        }
-        return false;
-      }
-    });
-
-    // get the latest among the data files. highest task number will be at the last.
-    return dataFiles[dataFiles.length - 1].getName();
-  }
-
-  /**
-   * This method will convert a given timestamp to long value and then to string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  public static String convertTimeStampToString(String factTimeStamp) {
-    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return Long.toString(dateToStr.getTime());
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * This method will convert a given timestamp to long value and then to string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  public static long convertTimeStampToLong(String factTimeStamp) {
-    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return dateToStr.getTime();
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
-      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      try {
-        dateToStr = parser.parse(factTimeStamp);
-        return dateToStr.getTime();
-      } catch (ParseException e1) {
-        LOGGER
-            .error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage());
-        return 0;
-      }
-    }
-  }
-
-
   /**
    * Handling of the clean up of old carbondata files, index files , delte delta,
    * update status files.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 0f85b64..6be1f4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -48,6 +48,7 @@ public class CarbonTablePath extends Path {
 
   public static final String INDEX_FILE_EXT = ".carbonindex";
   public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
+  public static final String PARTITION_MAP_EXT = ".partitionmap";
 
   private static final String STREAMING_DIR = ".streaming";
   private static final String STREAMING_LOG_DIR = "log";
@@ -250,7 +251,7 @@ public class CarbonTablePath extends Path {
    * @return absolute path of data file stored in carbon data format
    */
   public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Integer taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
+      Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
         filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
   }
@@ -324,7 +325,7 @@ public class CarbonTablePath extends Path {
         return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
       default:
         String segmentDir = getSegmentDir(partitionId, segmentId);
-        return segmentDir + File.separator + getCarbonIndexFileName(Integer.parseInt(taskId),
+        return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
             Integer.parseInt(bucketNumber), batchNo, timeStamp);
     }
   }
@@ -353,7 +354,7 @@ public class CarbonTablePath extends Path {
    * @param factUpdateTimeStamp unique identifier to identify an update
    * @return gets data file name only with out path
    */
-  public static String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
+  public static String getCarbonDataFileName(Integer filePartNo, Long taskNo, int bucketNumber,
       int batchNo, String factUpdateTimeStamp) {
     return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-"
         + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
@@ -366,7 +367,7 @@ public class CarbonTablePath extends Path {
    * @param factUpdatedTimeStamp time stamp
    * @return filename
    */
-  public static String getCarbonIndexFileName(int taskNo, int bucketNumber, int batchNo,
+  public static String getCarbonIndexFileName(long taskNo, int bucketNumber, int batchNo,
       String factUpdatedTimeStamp) {
     return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp
         + INDEX_FILE_EXT;
@@ -505,8 +506,8 @@ public class CarbonTablePath extends Path {
      * @param taskNo
      * @return
      */
-    public static int getTaskIdFromTaskNo(String taskNo) {
-      return Integer.parseInt(taskNo.split(BATCH_PREFIX)[0]);
+    public static long getTaskIdFromTaskNo(String taskNo) {
+      return Long.parseLong(taskNo.split(BATCH_PREFIX)[0]);
     }
 
     public static int getBatchNoFromTaskNo(String taskNo) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 936f87f..5549806 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -53,7 +53,7 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4,  0, 0, "999").replace("\\", "/")
+    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L,  0, 0, "999").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata"));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index fc73cfb..a924009 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -63,6 +63,7 @@ enum PartitionType{
   RANGE_INTERVAL = 1;
   LIST = 2;
   HASH = 3;
+  NATIVE_HIVE = 4; // Uses the standard partition features of spark/hive
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 88d8341..1803a12 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -523,9 +523,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> entry :
             segmentIndexMap.entrySet()) {
           SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey();
-          int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo);
+          long taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo);
           if (partitionInfo != null) {
-            partitionIndex = partitionIdList.indexOf(taskId);
+            partitionIndex = partitionIdList.indexOf((int)taskId);
           }
           // matchedPartitions variable will be null in two cases as follows
           // 1. the table is not a partition table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 9bcb2be..ec42adf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
@@ -59,6 +61,10 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
   }
 
+  @Override public void setupTask(TaskAttemptContext context) throws IOException {
+    super.setupTask(context);
+  }
+
   /**
    * Update the tablestatus as success after job is success
    *
@@ -75,6 +81,10 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
         loadModel.getCarbonDataLoadSchema().getCarbonTable());
     CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
+    // Merge all partition files into a single file.
+    new PartitionMapFileStore().mergePartitionMapFiles(segmentPath);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index c16b0aa..9e8e22d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -387,7 +388,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
-    if (partitionInfo != null) {
+    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
       matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, null);
       if (matchedPartitions != null) {
         if (matchedPartitions.cardinality() == 0) {
@@ -722,21 +723,21 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
     int partitionIndex = 0;
     List<Integer> partitionIdList = new ArrayList<>();
-    if (partitionInfo != null) {
+    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
       partitionIdList = partitionInfo.getPartitionIds();
     }
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
-          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+      long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
 
       // OldPartitionIdList is only used in alter table partition command because it change
       // partition info first and then read data.
       // For other normal query should use newest partitionIdList
-      if (partitionInfo != null) {
+      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
         if (oldPartitionIdList != null) {
-          partitionIndex = oldPartitionIdList.indexOf(partitionId);
+          partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
         } else {
-          partitionIndex = partitionIdList.indexOf(partitionId);
+          partitionIndex = partitionIdList.indexOf((int)partitionId);
         }
       }
       if (partitionIndex != -1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 9504502..f11cb35 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -19,6 +19,10 @@ package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -28,6 +32,7 @@ import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
@@ -203,26 +208,27 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
   public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
       TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
-    loadModel.setTaskNo(taskAttemptContext.getTaskAttemptID().getTaskID().getId() + "");
+    loadModel.setTaskNo(System.nanoTime() + "");
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
-    CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor);
-    new Thread() {
+    ExecutorService executorService = Executors.newFixedThreadPool(1,
+        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+    // It should be started in a new thread as the underlying iterator uses a blocking queue.
+    Future future = executorService.submit(new Thread() {
       @Override public void run() {
         try {
-          dataLoadExecutor.execute(
-              loadModel,
-              tempStoreLocations,
-              new CarbonIterator[] { iteratorWrapper });
+          dataLoadExecutor
+              .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
         } catch (Exception e) {
           dataLoadExecutor.close();
           throw new RuntimeException(e);
         }
       }
-    }.start();
+    });
 
-    return recordWriter;
+    return new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor, loadModel, future,
+        executorService);
   }
 
   public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
@@ -339,28 +345,47 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
     model.setCsvHeaderColumns(columns);
   }
 
-  private static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> {
+  public static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> {
 
     private CarbonOutputIteratorWrapper iteratorWrapper;
 
     private DataLoadExecutor dataLoadExecutor;
 
+    private CarbonLoadModel loadModel;
+
+    private ExecutorService executorService;
+
+    private Future future;
+
     public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
-        DataLoadExecutor dataLoadExecutor) {
+        DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
+        ExecutorService executorService) {
       this.iteratorWrapper = iteratorWrapper;
       this.dataLoadExecutor = dataLoadExecutor;
+      this.loadModel = loadModel;
+      this.executorService = executorService;
+      this.future = future;
     }
 
-    @Override
-    public void write(NullWritable aVoid, StringArrayWritable strings)
+    @Override public void write(NullWritable aVoid, StringArrayWritable strings)
         throws InterruptedException {
       iteratorWrapper.write(strings.get());
     }
 
-    @Override
-    public void close(TaskAttemptContext taskAttemptContext) {
+    @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
       iteratorWrapper.close();
-      dataLoadExecutor.close();
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        throw new InterruptedException(e.getMessage());
+      } finally {
+        executorService.shutdownNow();
+        dataLoadExecutor.close();
+      }
+    }
+
+    public CarbonLoadModel getLoadModel() {
+      return loadModel;
     }
   }
 }
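
The getRecordWriter change above feeds rows into a CarbonOutputIteratorWrapper that the DataLoadExecutor drains on a single-thread executor, and close() blocks on the Future. The following generic producer/consumer sketch (illustrative names only, not CarbonData classes) shows why the consumer must run on its own thread.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingWriterSketch implements AutoCloseable {
  private static final String[] END = new String[0];           // poison pill marking end of input
  private final BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(1024);
  private final ExecutorService executor = Executors.newFixedThreadPool(1);
  private final Future<?> consumer;

  public BlockingWriterSketch() {
    // The consumer must run on its own thread; otherwise write() would block forever
    // once the bounded queue fills -- the same reason the record writer above submits a task.
    consumer = executor.submit(() -> {
      try {
        String[] row;
        while ((row = queue.take()) != END) {
          // process the row here (stand-in for DataLoadExecutor.execute)
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }

  public void write(String[] row) throws InterruptedException {
    queue.put(row);                                             // blocks while the queue is full
  }

  @Override public void close() throws Exception {
    queue.put(END);                                             // signal end of input
    consumer.get();                                             // surface consumer failures, as close() above does
    executor.shutdownNow();
  }
}

Running the consumer on the caller's thread would deadlock as soon as the bounded queue fills, which is the scenario the Future/ExecutorService arrangement above avoids while still surfacing load failures when the writer is closed.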

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index cb601d7..bad2f44 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -119,7 +119,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
     carbonLoadModel.setSegmentId(segmentId);
     carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
-    int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+    long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
     carbonLoadModel.setTaskNo("" + taskNo);
     configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
     maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
new file mode 100644
index 0000000..11e95d8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE originMultiLoads (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".partitionmap")
+      }
+    })
+    assert(dataFiles.length == partitions)
+  }
+
+  test("data loading for partition table for one partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionone", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for partition table for two partition column") {
+    sql(
+      """
+        | CREATE TABLE partitiontwo (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp, empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitiontwo", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for partition table for three partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionthree (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionthree", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+  }
+
+
+  test("multiple data loading for partition table for three partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionmultiplethree (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionmultiplethree", "1", 1)
+    validateDataFiles("default_partitionmultiplethree", "2", 1)
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmultiplethree order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+  }
+
+  test("insert data for partition table for three partition column") {
+    sql(
+      """
+        | CREATE TABLE insertpartitionthree (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+
+    validateDataFiles("default_insertpartitionthree", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from insertpartitionthree order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+  }
+
+  test("data loading for partition table for one static partition column") {
+    sql(
+      """
+        | CREATE TABLE staticpartitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into staticpartitionone PARTITION(empno='1') select empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary from originTable""")
+
+    validateDataFiles("default_staticpartitionone", "0", 1)
+  }
+
+  test("single pass loading for partition table for one partition column") {
+    sql(
+      """
+        | CREATE TABLE singlepasspartitionone (empname String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""")
+
+    validateDataFiles("default_singlepasspartitionone", "0", 1)
+  }
+
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitiontwo")
+    sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionmultiplethree")
+    sql("drop table if exists insertpartitionthree")
+    sql("drop table if exists staticpartitionone")
+    sql("drop table if exists singlepasspartitionone")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index b27521a..b5a9315 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -118,7 +118,7 @@ class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSp
 }
 
 class SparkPartitionLoader(model: CarbonLoadModel,
-    splitIndex: Int,
+    splitIndex: Long,
     storePath: String,
     loadMetadataDetails: LoadMetadataDetails) {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 4934cbc..b6dd6b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -35,7 +35,7 @@ object UpdateDataLoad {
 
   def DataLoadForUpdate(
       segId: String,
-      index: Int,
+      index: Long,
       iter: Iterator[Row],
       carbonLoadModel: CarbonLoadModel,
       loadMetadataDetails: LoadMetadataDetails): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index ab47532..d8ce8f3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -186,7 +186,7 @@ object CommonUtil {
     val listInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO)
 
     if (partitionType.isEmpty) {
-      isValid = false
+      isValid = true
     } else {
       partitionType.get.toUpperCase() match {
         case "HASH" => if (!numPartitions.isDefined
@@ -207,10 +207,12 @@ object CommonUtil {
             isValid &= validateTypeConvert(partitionerFields(0), _))
         }
         case "RANGE_INTERVAL" => isValid = false
-        case _ => isValid = false
+        case _ => isValid = true
       }
       // only support one partition column for now
-      if (partitionerFields.length > 1) isValid = false
+      if (partitionerFields.length > 1 && !partitionType.get.toUpperCase.equals("NATIVE_HIVE")) {
+        isValid = false
+      }
     }
     isValid
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 330582f..5f44e43 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -732,7 +732,7 @@ object GlobalDictionaryUtil {
           headers
         }
 
-        if (headers.length > headerOfInputData.length) {
+        if (headers.length > headerOfInputData.length && !carbonTable.isHivePartitionTable) {
           val msg = "The number of columns in the file header do not match the " +
                     "number of columns in the data file; Either delimiter " +
                     "or fileheader provided is not correct"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 1c0e58b..a39465f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -419,6 +419,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
           partitionInfo.setListInfo(listInfo.map(_.asJava).asJava)
           partitionInfo.initialize(listInfo.size + 1)
+        case _ => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.NATIVE_HIVE)
+          partitionInfo.setListInfo(listInfo.map(_.asJava).asJava)
       }
       Some(partitionInfo)
     }
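
With the added default case, a CREATE TABLE whose declared partition type is not HASH, RANGE or LIST now produces a PartitionInfo of type NATIVE_HIVE instead of leaving the partition info unset. A self-contained sketch of that fall-through decision, using a stand-in for CarbonData's PartitionType enum (not the real class):

    object PartitionKinds {
      sealed trait PartitionKind
      case object HashPartition  extends PartitionKind
      case object RangePartition extends PartitionKind
      case object ListPartition  extends PartitionKind
      case object NativeHive     extends PartitionKind

      def resolvePartitionKind(declared: Option[String]): PartitionKind =
        declared.map(_.toUpperCase) match {
          case Some("HASH")  => HashPartition
          case Some("RANGE") => RangePartition
          case Some("LIST")  => ListPartition
          // anything else (or nothing) falls back to standard Hive partitioning
          case _             => NativeHive
        }
    }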

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 5b93907..089b60e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -642,11 +642,14 @@ class TableNewProcessor(cm: TableModel) {
     }
     if (cm.partitionInfo.isDefined) {
       val partitionInfo = cm.partitionInfo.get
-      val PartitionColumnSchema = partitionInfo.getColumnSchemaList.asScala
+      val partitionColumnSchema = partitionInfo.getColumnSchemaList.asScala
       val partitionCols = allColumns.filter { column =>
-        PartitionColumnSchema.exists(_.getColumnName.equalsIgnoreCase(column.getColumnName))
-      }.asJava
-      partitionInfo.setColumnSchemaList(partitionCols)
+        partitionColumnSchema.exists(_.getColumnName.equalsIgnoreCase(column.getColumnName))
+      }
+      val orderCols =
+        partitionColumnSchema.map(
+          f => partitionCols.find(_.getColumnName.equalsIgnoreCase(f.getColumnName)).get).asJava
+      partitionInfo.setColumnSchemaList(orderCols)
       tableSchema.setPartitionInfo(partitionInfo)
     }
     tableSchema.setTableName(cm.tableName)
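
The rewritten block still resolves the partition columns from allColumns, but then re-orders them to match the order in which the user declared them, so PartitionInfo stores the column schemas in declaration order rather than table-schema order. A small sketch of that reordering, with a hypothetical Column type standing in for ColumnSchema:

    object PartitionColumnOrdering {
      final case class Column(name: String)

      // Pick the table's resolved column objects in the order the user declared the partition columns.
      def orderPartitionColumns(declared: Seq[String], allColumns: Seq[Column]): Seq[Column] =
        declared.flatMap(name => allColumns.find(_.name.equalsIgnoreCase(name)))
    }

    // orderPartitionColumns(Seq("city", "country"),
    //   Seq(Column("country"), Column("city"), Column("amount")))
    // => Seq(Column("city"), Column("country"))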

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 6cc7ae1..ba51077 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -25,9 +25,12 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.sources.BaseRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -94,6 +97,57 @@ object CarbonReflectionUtils {
     }
   }
 
+  def getInsertIntoCommand(table: LogicalPlan,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean): InsertIntoTable = {
+    val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
+    if (SPARK_VERSION.startsWith("2.1")) {
+      val overwriteOptions = createObject(
+        "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
+        overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
+      createObject(
+        className,
+        table,
+        partition,
+        query,
+        overwriteOptions,
+        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      createObject(
+        className,
+        table,
+        partition,
+        query,
+        overwrite.asInstanceOf[Object],
+        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
+  }
+
+  def getLogicalRelation(relation: BaseRelation,
+      expectedOutputAttributes: Seq[Attribute],
+      catalogTable: Option[CatalogTable]): LogicalRelation = {
+    val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
+    if (SPARK_VERSION.startsWith("2.1")) {
+      createObject(
+        className,
+        relation,
+        Some(expectedOutputAttributes),
+        catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      createObject(
+        className,
+        relation,
+        expectedOutputAttributes,
+        catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
+  }
+
 
   def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
     var overwriteboolean: Boolean = false
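
Both new helpers build Spark catalyst objects by class name because the constructor shapes differ across Spark versions: as the code above shows, InsertIntoTable is given an OverwriteOptions object on Spark 2.1 but a plain Boolean on 2.2, and LogicalRelation is given Some(expectedOutputAttributes) on 2.1 but the bare Seq on 2.2. The underlying pattern is arity-based reflective construction; a simplified sketch (the real createObject utility is assumed to also inspect parameter types and return the chosen constructor):

    object ReflectiveConstruction {
      // Construct a class by name with positional arguments, matching on arity only.
      def createByReflection(className: String, args: AnyRef*): AnyRef = {
        val clazz = Class.forName(className)
        val ctor = clazz.getConstructors
          .find(_.getParameterCount == args.length)
          .getOrElse(throw new NoSuchMethodException(s"$className with ${args.length} args"))
        ctor.newInstance(args: _*).asInstanceOf[AnyRef]
      }
    }

    // Callers then branch on SPARK_VERSION ("2.1" vs "2.2") to decide which argument list to pass.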

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d6360a7..29712de 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -623,7 +623,7 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       updateModel: Option[UpdateTableModel],
       key: String,
-      taskNo: Int,
+      taskNo: Long,
       iter: Iterator[Row]
   ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val rddResult = new updateResultImpl()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index e5de052..19b96a8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -98,7 +98,7 @@ case class CarbonDatasourceHadoopRelation(
         CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
     }
     if (data.logicalPlan.output.size >= carbonRelation.output.size) {
-      CarbonInsertIntoCommand(this, data.logicalPlan, overwrite).run(sparkSession)
+      CarbonInsertIntoCommand(this, data.logicalPlan, overwrite, Map.empty).run(sparkSession)
     } else {
       CarbonException.analysisException(
         "Cannot insert into target table because number of columns mismatch")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index ef16955..5142ef2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -26,7 +26,8 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
 case class CarbonInsertIntoCommand(
     relation: CarbonDatasourceHadoopRelation,
     child: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: Boolean,
+    partition: Map[String, Option[String]])
   extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -40,7 +41,11 @@ case class CarbonInsertIntoCommand(
       scala.collection.immutable.Map("fileheader" -> header),
       overwrite,
       null,
-      Some(df)).run(sparkSession)
+      Some(df),
+      None,
+      None,
+      Map.empty,
+      partition).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
     relation.carbonRelation.metaData =
       CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
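
CarbonInsertIntoCommand now carries the partition specification from the parsed INSERT statement down into CarbonLoadDataCommand as a Map[String, Option[String]]. Presumably, as in Spark's own InsertIntoTable, a Some value denotes a static partition and None a dynamic one; an illustrative spec (values are hypothetical):

    // static partitions map to Some(value); dynamic partition columns map to None,
    // with the value taken from the query output.
    val partitionSpec: Map[String, Option[String]] = Map(
      "country" -> Some("IN"), // e.g. INSERT INTO t PARTITION (country='IN', city) SELECT ...
      "city"    -> None
    )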

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 2afd040..c94f698 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -17,17 +17,31 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import java.text.SimpleDateFormat
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.{CausedBy, FileUtils}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
@@ -45,6 +59,7 @@ import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreEx
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -65,7 +80,8 @@ case class CarbonLoadDataCommand(
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
     var tableInfoOp: Option[TableInfo] = None,
-    internalOptions: Map[String, String] = Map.empty) extends DataCommand {
+    internalOptions: Map[String, String] = Map.empty,
+    partition: Map[String, Option[String]] = Map.empty) extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -156,7 +172,7 @@ case class CarbonLoadDataCommand(
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         // add the start entry for the new load in the table status file
-        if (updateModel.isEmpty) {
+        if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
         }
         if (isOverwriteTable) {
@@ -217,9 +233,9 @@ case class CarbonLoadDataCommand(
           LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         case ex: Exception =>
+          LOGGER.error(ex)
           // update the load entry in table status file for changing the status to marked for delete
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
-          LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
       } finally {
@@ -344,16 +360,35 @@ case class CarbonLoadDataCommand(
     } else {
       dataFrame
     }
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
-      carbonLoadModel,
-      columnar,
-      partitionStatus,
-      server,
-      isOverwriteTable,
-      hadoopConf,
-      loadDataFrame,
-      updateModel,
-      operationContext)
+
+    if (carbonTable.isHivePartitionTable) {
+      try {
+        loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+      } finally {
+        server match {
+          case Some(dictServer) =>
+            try {
+              dictServer.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+                .getCarbonTableIdentifier.getTableId)
+            } catch {
+              case _: Exception =>
+                throw new Exception("Dataload failed due to error while writing dictionary file!")
+            }
+          case _ =>
+        }
+      }
+    } else {
+      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+        carbonLoadModel,
+        columnar,
+        partitionStatus,
+        server,
+        isOverwriteTable,
+        hadoopConf,
+        loadDataFrame,
+        updateModel,
+        operationContext)
+    }
   }
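
For a Hive-partitioned table the load is now routed through loadStandardPartition (Spark's datasource insert path) instead of CarbonDataRDDFactory, and when single-pass dictionary generation is on, the dictionary server is flushed in the finally block so dictionary files are written even if the insert fails. A sketch of that flush-on-exit pattern, with writeDictionary standing in for server.writeTableDictionary(tableId):

    object PartitionedLoadFlow {
      // doLoad is the partitioned insert; writeDictionary stands in for the single-pass
      // dictionary server flush.
      def loadWithDictionaryFlush(writeDictionary: () => Unit)(doLoad: => Unit): Unit =
        try {
          doLoad
        } finally {
          try writeDictionary() catch {
            case _: Exception =>
              throw new RuntimeException(
                "Dataload failed due to error while writing dictionary file!")
          }
        }
    }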
 
   private def loadData(
@@ -373,23 +408,203 @@ case class CarbonLoadDataCommand(
     } else {
       (dataFrame, dataFrame)
     }
-    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    if (!table.isChildDataMap) {
       GlobalDictionaryUtil.generateGlobalDictionary(
         sparkSession.sqlContext,
         carbonLoadModel,
         hadoopConf,
         dictionaryDataFrame)
     }
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
-      carbonLoadModel,
-      columnar,
-      partitionStatus,
-      None,
-      isOverwriteTable,
-      hadoopConf,
-      loadDataFrame,
-      updateModel,
-      operationContext)
+    if (table.isHivePartitionTable) {
+      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+    } else {
+      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+        carbonLoadModel,
+        columnar,
+        partitionStatus,
+        None,
+        isOverwriteTable,
+        hadoopConf,
+        loadDataFrame,
+        updateModel,
+        operationContext)
+    }
+  }
+
+  private def loadStandardPartition(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      dataFrame: Option[DataFrame]) = {
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val logicalPlan =
+      sparkSession.sessionState.catalog.lookupRelation(
+        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
+    val relation = logicalPlan.collect {
+      case l: LogicalRelation => l
+      case c: CatalogRelation => c
+    }.head.asInstanceOf[LogicalPlan]
+
+    val query: LogicalPlan = if (dataFrame.isDefined) {
+      val timeStampformatString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+      val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+      val attributes =
+        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+      val len = attributes.length
+      val rdd = dataFrame.get.rdd.map { f =>
+        val data = new Array[Any](len)
+        var i = 0
+        while (i < len) {
+          data(i) =
+            UTF8String.fromString(
+              CarbonScalaUtil.getString(f.get(i),
+                serializationNullFormat,
+                delimiterLevel1,
+                delimiterLevel2,
+                timeStampFormat,
+                dateFormat))
+          i = i + 1
+        }
+        InternalRow.fromSeq(data)
+      }
+      LogicalRDD(attributes, rdd)(sparkSession)
+
+    } else {
+      // input data from csv files. Convert to logical plan
+      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sparkSession.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        jobConf
+      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
+
+      val attributes =
+        StructType(carbonLoadModel.getCsvHeaderColumns.map(
+          StructField(_, StringType))).toAttributes
+      // Only select the required columns
+      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
+        LogicalRDD(attributes, rdd)(sparkSession))
+    }
+    val convertRelation = relation match {
+      case l: LogicalRelation =>
+        convertToLogicalRelation(l, isOverwriteTable, carbonLoadModel, sparkSession)
+      case c: CatalogRelation =>
+        convertToLogicalRelation(c, isOverwriteTable, carbonLoadModel, sparkSession)
+    }
+    Dataset.ofRows(
+      sparkSession,
+      CarbonReflectionUtils.getInsertIntoCommand(
+        convertRelation,
+        partition,
+        query,
+        isOverwriteTable,
+        false))
+  }
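
loadStandardPartition turns the input, either the given DataFrame or the CSV files read through CSVInputFormat, into rows of strings, wraps them in a LogicalRDD, and then executes a Spark InsertIntoTable over the converted relation. At the public DataFrame API level the string-conversion step is roughly equivalent to the sketch below (the date/timestamp/null-format handling done by CarbonScalaUtil.getString is omitted):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    object StringlyTypedInput {
      // Carry every incoming column as a string before handing the data to the insert plan.
      def toAllStringColumns(df: DataFrame): DataFrame =
        df.select(df.columns.map(c => col(c).cast("string").as(c)): _*)
    }
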
+
+  private def convertToLogicalRelation(
+      relation: LogicalRelation,
+      overWrite: Boolean,
+      loadModel: CarbonLoadModel,
+      sparkSession: SparkSession): LogicalRelation = {
+    val catalogTable = relation.catalogTable.get
+    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
+    val metastoreSchema = StructType(StructType.fromAttributes(
+      relation.output).fields.map(_.copy(dataType = StringType)))
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val catalog = new CatalogFileIndex(
+      sparkSession, catalogTable, relation.relation.sizeInBytes)
+    if (lazyPruningEnabled) {
+      catalog
+    } else {
+      catalog.filterPartitions(Nil) // materialize all the partitions in memory
+    }
+    val partitionSchema =
+      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
+      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
+
+
+    val dataSchema =
+      StructType(metastoreSchema
+        .filterNot(field => partitionSchema.contains(field.name)))
+    val options = new mutable.HashMap[String, String]()
+    options ++= catalogTable.storage.properties
+    options += (("overwrite", overWrite.toString))
+    options += (("onepass", loadModel.getUseOnePass.toString))
+    options += (("dicthost", loadModel.getDictionaryServerHost))
+    options += (("dictport", loadModel.getDictionaryServerPort.toString))
+    val hdfsRelation = HadoopFsRelation(
+      location = catalog,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      bucketSpec = catalogTable.bucketSpec,
+      fileFormat = new CarbonFileFormat,
+      options = options.toMap)(sparkSession = sparkSession)
+
+    CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
+      hdfsRelation.schema.toAttributes,
+      Some(catalogTable))
+  }
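
convertToLogicalRelation rebuilds the target as a HadoopFsRelation backed by CarbonFileFormat: the declared partition columns (as strings) become partitionSchema, the remaining columns become dataSchema, and load options such as overwrite and the one-pass dictionary host/port travel through the options map. A sketch of the schema split using only public StructType API (a simplification of the code above):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object PartitionSchemaSplit {
      // Split a table schema into (partitionSchema, dataSchema); partition columns are forced
      // to StringType as in the load path, and names are matched case-insensitively.
      def splitSchema(full: StructType, partitionCols: Seq[String]): (StructType, StructType) = {
        val partitionFields = partitionCols
          .flatMap(p => full.fields.find(_.name.equalsIgnoreCase(p)))
          .map(_.copy(dataType = StringType))
        val dataFields = full.fields
          .filterNot(f => partitionCols.exists(_.equalsIgnoreCase(f.name)))
        (StructType(partitionFields), StructType(dataFields))
      }
    }
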
+
+  private def convertToLogicalRelation(
+      relation: CatalogRelation,
+      overWrite: Boolean,
+      loadModel: CarbonLoadModel,
+      sparkSession: SparkSession): LogicalRelation = {
+    val catalogTable =
+      CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", relation).asInstanceOf[CatalogTable]
+    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
+    val metastoreSchema = StructType(StructType.fromAttributes(
+      relation.output).fields.map(_.copy(dataType = StringType)))
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    // TODO need to find a way to avoid double lookup
+    val sizeInBytes =
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+        catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+    val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+    if (lazyPruningEnabled) {
+      catalog
+    } else {
+      catalog.filterPartitions(Nil) // materialize all the partitions in memory
+    }
+    val partitionSchema =
+      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
+        metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
+
+
+    val dataSchema =
+      StructType(metastoreSchema
+        .filterNot(field => partitionSchema.contains(field.name)))
+    val options = new mutable.HashMap[String, String]()
+    options ++= catalogTable.storage.properties
+    options += (("overwrite", overWrite.toString))
+    options += (("onepass", loadModel.getUseOnePass.toString))
+    options += (("dicthost", loadModel.getDictionaryServerHost))
+    options += (("dictport", loadModel.getDictionaryServerPort.toString))
+    val hdfsRelation = HadoopFsRelation(
+      location = catalog,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      bucketSpec = catalogTable.bucketSpec,
+      fileFormat = new CarbonFileFormat,
+      options = options.toMap)(sparkSession = sparkSession)
+
+    CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
+      hdfsRelation.schema.toAttributes,
+      Some(catalogTable))
   }
 
   def getDataFrameWithTupleID(): DataFrame = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 314e551..a42d974 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -93,6 +93,13 @@ case class CarbonCreateTableCommand(
           val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
           val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
           sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+          val partitionInfo = tableInfo.getFactTable.getPartitionInfo
+          val partitionString = if (partitionInfo != null) {
+            s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
+              _.getColumnName).mkString(",")})"
+          } else {
+            ""
+          }
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tableName
                |(${ rawSchema })
@@ -103,6 +110,7 @@ case class CarbonCreateTableCommand(
                |  tablePath "$tablePath",
                |  path "$tablePath"
                |  $carbonSchemaString)
+               |  $partitionString
              """.stripMargin)
         } catch {
           case e: AnalysisException => throw e
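
When the Carbon schema carries partition info, the CREATE TABLE statement registered with Hive now gets a PARTITIONED BY clause built from the partition column names. A tiny sketch of assembling that fragment (partitionColumnNames stands in for the names taken from partitionInfo.getColumnSchemaList):

    object PartitionedByClause {
      def partitionedByClause(partitionColumnNames: Seq[String]): String =
        if (partitionColumnNames.isEmpty) ""
        else s" PARTITIONED BY (${partitionColumnNames.mkString(",")})"
    }

    // partitionedByClause(Seq("country", "city")) => " PARTITIONED BY (country,city)"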

