carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [33/54] [abbrv] carbondata git commit: [CARBONDATA-1114][Tests] Fix bugs in tests in windows env
Date Thu, 08 Mar 2018 16:55:34 GMT
[CARBONDATA-1114][Tests] Fix bugs in tests in windows env

Fix bugs in tests that would cause failures under the Windows environment

This closes #1994


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/859d71c1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/859d71c1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/859d71c1

Branch: refs/heads/master
Commit: 859d71c1737b6287c4f1a06f7cd9055d32ff8a99
Parents: d5396b1
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Sat Feb 24 21:18:17 2018 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Mar 8 22:21:11 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/dev/DataMap.java    |   6 -
 .../core/datamap/dev/DataMapFactory.java        |   2 +-
 .../exception/ConcurrentOperationException.java |  16 +-
 .../carbondata/core/locks/LocalFileLock.java    |  30 ++--
 .../core/metadata/PartitionMapFileStore.java    |   0
 .../statusmanager/SegmentStatusManager.java     |  10 +-
 .../SegmentUpdateStatusManager.java             |   1 -
 .../store/impl/DFSFileReaderImplUnitTest.java   |  11 +-
 .../store/impl/FileFactoryImplUnitTest.java     |  28 +++-
 .../filesystem/HDFSCarbonFileTest.java          |   3 +-
 .../filesystem/LocalCarbonFileTest.java         |  20 ++-
 datamap/examples/pom.xml                        | 145 +++++++----------
 .../datamap/examples/MinMaxDataWriter.java      |   1 -
 examples/flink/pom.xml                          |   4 +-
 .../carbondata/examples/FlinkExample.scala      |  10 +-
 .../CarbonStreamSparkStreamingExample.scala     |   1 -
 .../hadoop/api/CarbonTableInputFormat.java      |   5 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   2 +-
 .../StandardPartitionGlobalSortTestCase.scala   |   2 +-
 .../exception/ProcessMetaDataException.java     |   2 +
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../carbondata/spark/load/CsvRDDHelper.scala    | 157 +++++++++++++++++++
 .../load/DataLoadProcessBuilderOnSpark.scala    |   3 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |   2 -
 .../command/carbonTableSchemaCommon.scala       |   6 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 .../management/CarbonCleanFilesCommand.scala    |   2 +-
 .../CarbonDeleteLoadByIdCommand.scala           |   2 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   2 +-
 .../management/CarbonLoadDataCommand.scala      |  28 ++--
 .../CarbonProjectForDeleteCommand.scala         |   2 +-
 .../CarbonProjectForUpdateCommand.scala         |   2 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          |   3 -
 .../BooleanDataTypesInsertTest.scala            |   5 +-
 .../vectorreader/AddColumnTestCases.scala       |   1 +
 .../datamap/DataMapWriterListener.java          |   3 +-
 .../loading/model/CarbonLoadModelBuilder.java   |  34 +++-
 .../processing/loading/model/LoadOption.java    |  15 +-
 .../processing/merger/CarbonDataMergerUtil.java |   3 +-
 .../util/CarbonDataProcessorUtil.java           |   3 +-
 .../processing/util/CarbonLoaderUtil.java       |   8 +
 .../carbondata/lcm/locks/LocalFileLockTest.java |   2 +-
 .../loading/csvinput/CSVInputFormatTest.java    |   1 +
 store/sdk/pom.xml                               |   2 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java    |   8 +-
 48 files changed, 400 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 02db8af..dd5507c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -38,9 +38,6 @@ public interface DataMap<T extends Blocklet> {
   /**
    * Prune the datamap with filter expression and partition information. It returns the list of
    * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
    */
   List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions);
@@ -48,9 +45,6 @@ public interface DataMap<T extends Blocklet> {
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data
-   *
-   * @param filterExp
-   * @return
    */
   boolean isScanRequired(FilterResolverIntf filterExp);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 50ac279..d8a467f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
index 7e717ba..918268c 100644
--- a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
+++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
@@ -17,21 +17,10 @@
 
 package org.apache.carbondata.core.exception;
 
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
-/**
- * This exception will be thrown when executing concurrent operations which
- * is not supported in carbon.
- *
- * For example, when INSERT OVERWRITE is executing, other operations are not
- * allowed, so this exception will be thrown
- */
-@InterfaceAudience.User
-@InterfaceStability.Stable
-public class ConcurrentOperationException extends Exception {
+public class ConcurrentOperationException extends MalformedCarbonCommandException {
 
   public ConcurrentOperationException(String dbName, String tableName, String command1,
       String command2) {
@@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception {
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 75ea074..cb80877 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -17,17 +17,20 @@
 
 package org.apache.carbondata.core.locks;
 
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * This class handles the file locking in the local file system.
@@ -40,11 +43,6 @@ public class LocalFileLock extends AbstractCarbonLock {
   private String location;
 
   /**
-   * fileOutputStream of the local lock file
-   */
-  private FileOutputStream fileOutputStream;
-
-  /**
    * channel is the FileChannel of the lock file.
    */
   private FileChannel channel;
@@ -104,8 +102,8 @@ public class LocalFileLock extends AbstractCarbonLock {
         FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
       }
 
-      fileOutputStream = new FileOutputStream(lockFilePath);
-      channel = fileOutputStream.getChannel();
+      channel = FileChannel.open(Paths.get(lockFilePath), StandardOpenOption.WRITE,
+          StandardOpenOption.APPEND);
       try {
         fileLock = channel.tryLock();
       } catch (OverlappingFileLockException e) {
@@ -137,11 +135,17 @@ public class LocalFileLock extends AbstractCarbonLock {
     } catch (IOException e) {
       status = false;
     } finally {
-      if (null != fileOutputStream) {
-        try {
-          fileOutputStream.close();
-        } catch (IOException e) {
-          LOGGER.error(e.getMessage());
+      CarbonUtil.closeStreams(channel);
+
+      // deleting the lock file after releasing the lock.
+      if (null != lockFilePath) {
+        CarbonFile lockFile = FileFactory.getCarbonFile(lockFilePath,
+            FileFactory.getFileType(lockFilePath));
+        if (!lockFile.exists() || lockFile.delete()) {
+          LOGGER.info("Successfully deleted the lock file " + lockFilePath);
+        } else {
+          LOGGER.error("Not able to delete the lock file " + lockFilePath);
+          status = false;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index b3b1240..d76158e 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -837,6 +838,13 @@ public class SegmentStatusManager {
   public static void deleteLoadsAndUpdateMetadata(
       CarbonTable carbonTable,
       boolean isForceDeletion) throws IOException {
+    deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+  }
+
+  public static void deleteLoadsAndUpdateMetadata(
+      CarbonTable carbonTable,
+      boolean isForceDeletion,
+      List<PartitionSpec> partitionSpecs) throws IOException {
     if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
       LoadMetadataDetails[] details =
           SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
@@ -881,7 +889,7 @@ public class SegmentStatusManager {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
           if (updationCompletionStatus) {
             DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(), isForceDeletion);
+                identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 39eb262..a21873d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.TupleIdEnum;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index da61a94..30144c1 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -38,7 +38,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 public class DFSFileReaderImplUnitTest {
-
   private static DFSFileReaderImpl dfsFileHolder;
   private static String fileName;
   private static String fileNameWithEmptyContent;
@@ -50,10 +49,8 @@ public class DFSFileReaderImplUnitTest {
     file = new File("Test.carbondata");
     fileWithEmptyContent = new File("TestEXception.carbondata");
 
-    if (!file.exists()) try {
-      file.createNewFile();
-    } catch (IOException e) {
-      e.printStackTrace();
+    if (file.exists()) {
+      file.delete();
     }
     if (!fileWithEmptyContent.exists()) try {
       fileWithEmptyContent.createNewFile();
@@ -61,10 +58,12 @@ public class DFSFileReaderImplUnitTest {
       e.printStackTrace();
     }
     try {
-      FileOutputStream of = new FileOutputStream(file, true);
+      FileOutputStream of = new FileOutputStream(file, false);
       BufferedWriter br = new BufferedWriter(new OutputStreamWriter(of, "UTF-8"));
       br.write("Hello World");
       br.close();
+      of.flush();
+      of.close();
     } catch (Exception e) {
       e.getMessage();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
index 65590d6..0e7d1c9 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.carbon.datastorage.filesystem.store.impl;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -43,6 +44,10 @@ public class FileFactoryImplUnitTest {
 
   @AfterClass
   public static void tearDown() {
+    cleanUp();
+  }
+
+  private static void cleanUp() {
     File file = new File(filePath);
     if (file.exists()) {
       file.delete();
@@ -83,17 +88,17 @@ public class FileFactoryImplUnitTest {
   }
 
   @Test public void testCreateNewFileWithDefaultFileType() throws IOException {
-    tearDown();
+    cleanUp();
     assertTrue(FileFactory.createNewFile(filePath, FileFactory.FileType.LOCAL));
   }
 
   @Test public void testCreateNewLockFileWithDefaultFileType() throws IOException {
-    tearDown();
+    cleanUp();
     assertTrue(FileFactory.createNewLockFile(filePath, FileFactory.FileType.LOCAL));
   }
 
   @Test public void testCreateNewLockFileWithViewFsFileType() throws IOException {
-    tearDown();
+    cleanUp();
     assertTrue(FileFactory.createNewLockFile(filePath, FileFactory.FileType.VIEWFS));
   }
 
@@ -129,20 +134,29 @@ public class FileFactoryImplUnitTest {
     assertTrue(FileFactory.mkdirs(filePath, FileFactory.FileType.VIEWFS));
   }
 
-  @Test public void testGetDataOutputStreamUsingAppendeForException() {
+  @Test public void testGetDataOutputStreamUsingAppendeForException() throws IOException {
+    DataOutputStream outputStream = null;
     try {
-      FileFactory.getDataOutputStreamUsingAppend(filePath, FileFactory.FileType.VIEWFS);
+      outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, FileFactory.FileType.VIEWFS);
     } catch (Exception exception) {
       assertEquals("Not supported", exception.getMessage());
+    } finally {
+      if (null != outputStream) {
+        outputStream.close();
+      }
     }
   }
 
   @Test public void getDataOutputStreamForVIEWFSType() throws IOException {
-    assertNotNull(FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS));
+    DataOutputStream outputStream = FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
+    assertNotNull(outputStream);
+    outputStream.close();
   }
 
   @Test public void getDataOutputStreamForLocalType() throws IOException {
-    assertNotNull(FileFactory.getDataOutputStream(filePath, FileFactory.FileType.LOCAL));
+    DataOutputStream outputStream = FileFactory.getDataOutputStream(filePath, FileFactory.FileType.LOCAL);
+    assertNotNull(outputStream);
+    outputStream.close();
   }
 
   @Test public void testGetCarbonFile() throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
index 4018123..42d4afa 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
@@ -58,7 +58,7 @@ public class HDFSCarbonFileTest {
     @BeforeClass
     static public void setUp() throws IOException {
         Configuration config = new Configuration();
-//adding local hadoop configuration
+        // adding local hadoop configuration
         config.addResource(new Path("core-site.xml"));
         config.addResource(new Path("hdfs-site.xml"));
         fileName = "Test.carbondata"; //this path is HDFS path
@@ -75,6 +75,7 @@ public class HDFSCarbonFileTest {
             BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
             br.write("Hello World");
             br.close();
+            os.close();
             fs.close();
 
             fileStatus = new FileStatus(12L, true, 60, 120l, 180L, new Path(fileName));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java
index 96ef106..14f9fe2 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java
@@ -29,6 +29,7 @@ import sun.nio.ch.FileChannelImpl;
 import java.io.*;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Objects;
+import java.util.UUID;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -44,8 +45,8 @@ public class LocalCarbonFileTest {
 
     @BeforeClass
     static public void setUp() {
-        file = new File("Test.carbondata");
-        dir = new File("Testdir.carbondata");
+        file = new File("TestLocalCarbonFile");
+        dir = new File("TestLocalCarbonDir");
         if (!file.exists())
             try {
                 file.createNewFile();
@@ -60,6 +61,7 @@ public class LocalCarbonFileTest {
             byte[] bytes = "core java api".getBytes();
 
             oFile.write(bytes);
+            oFile.close();
         } catch (FileNotFoundException e) {
             e.printStackTrace();
             localCarbonFile = new LocalCarbonFile(file);
@@ -121,8 +123,9 @@ public class LocalCarbonFileTest {
     @Test
     public void testRenameForce() {
         localCarbonFile = new LocalCarbonFile(file);
-        assertTrue(localCarbonFile.renameForce("Testdb.carbon"));
-        File file1 = new File("Testdb.carbon");
+        String destFile = "TestRename" + UUID.randomUUID().toString();
+        assertTrue(localCarbonFile.renameForce(destFile));
+        File file1 = new File(destFile);
         if (file1.exists()) {
             file1.delete();
         }
@@ -131,7 +134,12 @@ public class LocalCarbonFileTest {
     @Test
     public void testRenameTo() {
         localCarbonFile = new LocalCarbonFile(file);
-        assertTrue(!localCarbonFile.renameTo("Testdb.carbon"));
+        String destFile = "TestRename" + UUID.randomUUID().toString();
+        assertTrue(!localCarbonFile.renameTo(destFile));
+        File file1 = new File(destFile);
+        if (file1.exists()) {
+            file1.delete();
+        }
     }
 
     @Test
@@ -463,6 +471,6 @@ public class LocalCarbonFileTest {
 
         localCarbonFile = new LocalCarbonFile("demo.txt");
 
-        assertEquals(localCarbonFile.renameForce("Test.carbondata"), true);
+        assertEquals(localCarbonFile.renameForce("renameToFile"), true);
     }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/datamap/examples/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml
index 0049950..8539a86 100644
--- a/datamap/examples/pom.xml
+++ b/datamap/examples/pom.xml
@@ -15,97 +15,70 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>org.apache.carbondata</groupId>
-        <artifactId>carbondata-parent</artifactId>
-        <version>1.4.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
+  <parent>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-parent</artifactId>
+      <version>1.4.0-SNAPSHOT</version>
+      <relativePath>../../pom.xml</relativePath>
+  </parent>
 
-    <artifactId>carbondata-datamap-examples</artifactId>
-    <name>Apache CarbonData :: Datamap Examples</name>
+  <artifactId>carbondata-datamap-examples</artifactId>
+  <name>Apache CarbonData :: DataMap Examples</name>
 
-    <properties>
-        <dev.path>${basedir}/../../dev</dev.path>
-    </properties>
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.carbondata</groupId>
-            <artifactId>carbondata-spark2</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-hive-thriftserver_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-repl_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-sql_2.10</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.binary.version}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-repl_${scala.binary.version}</artifactId>
-        </dependency>
-    </dependencies>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
 
-    <build>
-        <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
-        <resources>
-            <resource>
-                <directory>.</directory>
-                <includes>
-                    <include>CARBON_EXAMPLESLogResource.properties</include>
-                </includes>
-            </resource>
-        </resources>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <version>2.15.2</version>
-                <executions>
-                    <execution>
-                        <id>compile</id>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                        <phase>compile</phase>
-                    </execution>
-                    <execution>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
+  <build>
+    <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>.</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 5046182..17c8332 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/flink/pom.xml
----------------------------------------------------------------------
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
index 75913cf..b783435 100644
--- a/examples/flink/pom.xml
+++ b/examples/flink/pom.xml
@@ -52,12 +52,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark</artifactId>
+      <artifactId>carbondata-spark2</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-examples-spark</artifactId>
+      <artifactId>carbondata-examples-spark2</artifactId>
       <version>${project.version}</version>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
----------------------------------------------------------------------
diff --git a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
index 9ce95ae..239a038 100644
--- a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
+++ b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 
 // Write carbondata file by spark and read it by flink
 // scalastyle:off println
@@ -30,7 +30,7 @@ object FlinkExample {
 
   def main(args: Array[String]): Unit = {
     // write carbondata file by spark
-    val cc = ExampleUtils.createCarbonContext("FlinkExample")
+    val cc = ExampleUtils.createCarbonSession("FlinkExample")
     val path = ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
 
     // read two columns by flink
@@ -38,11 +38,11 @@ object FlinkExample {
     projection.addColumn("c1")  // column c1
     projection.addColumn("c3")  // column c3
     val conf = new Configuration()
-    CarbonInputFormat.setColumnProjection(conf, projection)
+    CarbonTableInputFormat.setColumnProjection(conf, projection)
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.readHadoopFile(
-      new CarbonInputFormat[Array[Object]],
+      new CarbonTableInputFormat[Array[Object]],
       classOf[Void],
       classOf[Array[Object]],
       path,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index 63b1c5a..856084b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.examples
 import java.io.{File, PrintWriter}
 import java.net.ServerSocket
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.CarbonSparkStreamingFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 5cebc12..f629d40 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,8 +33,8 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -498,7 +498,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+        String segmentDir = CarbonTablePath.getSegmentPath(
+            identifier.getTablePath(), segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index b39c44c..3f0ca42 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
 // This testsuite test insert and insert overwrite with other commands concurrently

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index b511ee8..6e6be68 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
   var executorService: ExecutorService = _

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
index 3e06bde..471b645 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.exception;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
 // This exception will be thrown when processMetaData failed in
 // Carbon's RunnableCommand
 public class ProcessMetaDataException extends MalformedCarbonCommandException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b69ec37..bfb1616 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -139,9 +138,8 @@ object CarbonStore {
         carbonCleanFilesLock =
           CarbonLockUtil
             .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
new file mode 100644
index 0000000..36d8c51
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
+import org.apache.carbondata.spark.util.CommonUtil
+
+object CsvRDDHelper {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * creates an RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index e1bd84b..1062cd7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark {
     } else {
       // input data from files
       val columnCount = model.getCsvHeaderColumns.length
-      DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
+      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 773ea16..59ed8ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
 import org.apache.spark.sql.types._
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -46,7 +47,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9104a32..d3093fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -816,8 +816,6 @@ object CommonUtil {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                   SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-                  DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-                    isForceDeletion = true, carbonTable, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 71ce2c6..3c21af3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
-import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e47c500..2f4aa30 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
@@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
 import org.apache.carbondata.streaming.segment.StreamSegment
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..2092028 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 0861c63..81427a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByIdCommand(
     loadIds: Seq[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dcbc6ce..1d76bda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByLoadDateCommand(
     databaseNameOp: Option[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 70134a6..eb00ebf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
@@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
-import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
+import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -193,12 +189,18 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+
+      val javaPartition = mutable.Map[String, String]()
+      partition.foreach { case (k, v) =>
+        if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+      }
+
       new CarbonLoadModelBuilder(table).build(
         options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf,
-        partition,
+        javaPartition.asJava,
         dataFrame.isDefined)
       // Delete stale segment folders that are not in table status but are physically present in
       // the Fact folder
@@ -231,11 +233,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = false,
-          table,
-          currPartitions)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -679,7 +677,7 @@ case class CarbonLoadDataCommand(
           }
         }
         val columnCount = carbonLoadModel.getCsvHeaderColumns.length
-        val rdd = DataLoadingUtil.csvFileScanRDD(
+        val rdd = CsvRDDHelper.csvFileScanRDD(
           sparkSession,
           model = carbonLoadModel,
           hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f074285..230378b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 5165342..2a92478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index cf77e0f..2503fc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8001a93..0298eea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException}
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 61a31a5..2eed988 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
 
 class CarbonFileFormat
   extends FileFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
index f8cfa6b..45edd3d 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 
 class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -555,7 +556,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
 
     sql(
       s"""
-         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | LOAD DATA LOCAL INPATH '${FileFactory.getUpdatedFilePath(storeLocation)}'
          | INTO TABLE hive_table
            """.stripMargin)
 
@@ -923,7 +924,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
 
     sql(
       s"""
-         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | LOAD DATA LOCAL INPATH '${FileFactory.getUpdatedFilePath(storeLocation)}'
          | INTO TABLE hive_table
            """.stripMargin)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 995f041..d94570a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 1104229..66f8bc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,8 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<AbstractDataMapWriter> writers = registry.get(columns);
-    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+    AbstractDataMapWriter writer = factory.createWriter(
+        new Segment(segmentId, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 17e8dbe..29dfa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder {
       Map<String, String> optionsFinal,
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+    build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options that populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+   *                   user provided load options
+   * @param partitions partition name map to path
+   * @param isDataFrame true if build for load for dataframe
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      Map<String, String> partitions,
+      boolean isDataFrame) throws InvalidLoadOptionException, IOException {
     carbonLoadModel.setTableName(table.getTableName());
     carbonLoadModel.setDatabaseName(table.getDatabaseName());
     carbonLoadModel.setTablePath(table.getTablePath());
@@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
     carbonLoadModel.setCsvHeader(fileHeader);
     carbonLoadModel.setColDictFilePath(column_dict);
+
+    List<String> ignoreColumns = new ArrayList<>();
+    if (!isDataFrame) {
+      for (Map.Entry<String, String> partition : partitions.entrySet()) {
+        if (partition.getValue() != null) {
+          ignoreColumns.add(partition.getKey());
+        }
+      }
+    }
+
     carbonLoadModel.setCsvHeaderColumns(
-        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
 
     int validatedMaxColumns = validateMaxColumns(
         carbonLoadModel.getCsvHeaderColumns(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 5af4859..bac1a94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.Maps;
@@ -201,6 +203,16 @@ public class LoadOption {
   public static String[] getCsvHeaderColumns(
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws IOException {
+    return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>());
+  }
+
+  /**
+   * Return CSV header field names, with partition column
+   */
+  public static String[] getCsvHeaderColumns(
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      List<String> staticPartitionCols) throws IOException {
     String delimiter;
     if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
       delimiter = CarbonCommonConstants.COMMA;
@@ -231,7 +243,7 @@ public class LoadOption {
     }
 
     if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema())) {
+        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");
@@ -249,4 +261,5 @@ public class LoadOption {
     }
     return csvColumns;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1ab803b..438e7db 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1013,7 +1013,8 @@ public final class CarbonDataMergerUtil {
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg.getSegmentNo());
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        identifier.getTablePath(), seg.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index efd715c..ba2b0c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -392,7 +392,8 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
+  public static String createCarbonStoreLocation(String databaseName, String tableName,
+      String segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 38e5698..a948538 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,9 +16,13 @@
  */
 package org.apache.carbondata.processing.util;
 
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.charset.Charset;
 import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -36,6 +40,9 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -55,6 +62,7 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
+import com.google.gson.Gson;
 
 public final class CarbonLoaderUtil {
 


Mime
View raw message