carbondata-commits mailing list archives

From: jack...@apache.org
Subject: [41/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
Date: Tue, 10 Oct 2017 03:08:28 GMT
[CARBONDATA-1530] Clean up carbon-processing module

This closes #1391


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/349c59c7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/349c59c7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/349c59c7

Branch: refs/heads/streaming_ingest
Commit: 349c59c7b0e48d7d7551c7a993233b9f7960da94
Parents: b9f10da
Author: Jacky Li <jacky.likun@qq.com>
Authored: Thu Sep 28 00:26:50 2017 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Sun Oct 1 09:41:19 2017 +0800

----------------------------------------------------------------------
 .../examples/GenerateDictionaryExample.scala    |   2 +-
 .../hadoop/test/util/StoreCreator.java          |  23 +-
 .../carbondata/presto/CarbondataPageSource.java |   4 +-
 .../presto/util/CarbonDataStoreCreator.scala    |  46 +-
 .../dataload/TestLoadDataUseAllDictionary.scala |   3 +-
 .../TestLoadDataWithNotProperInputFile.scala    |   2 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java | 905 -----------------
 .../spark/load/DeleteLoadFolders.java           | 151 ---
 .../carbondata/spark/load/FailureCauses.java    |  28 -
 .../spark/partition/api/DataPartitioner.java    |  35 -
 .../spark/partition/api/Partition.java          |  35 -
 .../partition/api/impl/DefaultLoadBalancer.java |  63 --
 .../api/impl/PartitionMultiFileImpl.java        |  44 -
 .../api/impl/QueryPartitionHelper.java          |  74 --
 .../api/impl/SampleDataPartitionerImpl.java     |  40 -
 .../carbondata/spark/splits/TableSplit.java     | 124 ---
 .../carbondata/spark/util/CarbonQueryUtil.java  | 138 ---
 .../carbondata/spark/util/LoadMetadataUtil.java |  47 -
 .../org/apache/carbondata/spark/util/Util.java  |  30 +
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +-
 .../carbondata/spark/PartitionFactory.scala     |   2 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   8 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |  22 +-
 .../spark/load/GlobalSortHelper.scala           |   4 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |   8 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonDropTableRDD.scala          |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |   9 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  11 +-
 .../spark/rdd/CarbonScanPartitionRDD.scala      |   7 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../spark/rdd/DataManagementFunc.scala          |   6 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  18 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |   6 +-
 .../carbondata/spark/util/CommonUtil.scala      |   9 +-
 .../spark/util/GlobalDictionaryUtil.scala       |  13 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../command/carbonTableSchemaCommon.scala       |   7 +-
 .../spark/sql/hive/DistributionUtil.scala       |   2 +-
 .../scala/org/apache/spark/util/FileUtils.scala |   2 +-
 .../org/apache/spark/util/PartitionUtils.scala  |   2 +-
 .../scala/org/apache/spark/util/SparkUtil.scala |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  24 +-
 .../sql/execution/command/IUDCommands.scala     |   2 +-
 .../execution/command/carbonTableSchema.scala   |  14 +-
 .../spark/util/AllDictionaryTestCase.scala      |   4 +-
 .../spark/util/DictionaryTestCaseUtil.scala     |   2 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   7 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |   4 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  24 +-
 .../carbondata/spark/util/QueryPlanUtil.scala   |  55 --
 .../AlterTableCompactionCommand.scala           |   2 +-
 .../command/management/LoadTableCommand.scala   |  10 +-
 .../command/mutation/DeleteExecution.scala      |  30 +-
 .../mutation/ProjectForDeleteCommand.scala      |   2 +-
 .../mutation/ProjectForUpdateCommand.scala      |   2 +-
 .../AlterTableDropCarbonPartitionCommand.scala  |   2 +-
 .../AlterTableSplitCarbonPartitionCommand.scala |   2 +-
 .../spark/util/AllDictionaryTestCase.scala      |   4 +-
 .../spark/util/DictionaryTestCaseUtil.scala     |   2 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   6 +-
 .../api/dataloader/DataLoadModel.java           | 188 ----
 .../processing/api/dataloader/SchemaInfo.java   | 154 ---
 .../constants/DataProcessorConstants.java       |  62 --
 .../constants/TableOptionConstant.java          |  41 -
 .../processing/csvload/BlockDetails.java        |  84 --
 .../processing/csvload/BoundedInputStream.java  | 129 ---
 .../processing/csvload/CSVInputFormat.java      | 326 ------
 .../csvload/CSVRecordReaderIterator.java        | 105 --
 .../processing/csvload/StringArrayWritable.java |  70 --
 .../manager/CarbonDataProcessorManager.java     |  67 --
 .../processing/datatypes/ArrayDataType.java     |   2 +-
 .../processing/datatypes/PrimitiveDataType.java |   6 +-
 .../processing/datatypes/StructDataType.java    |   2 +-
 .../processing/etl/DataLoadingException.java    |  50 -
 .../exception/DataLoadingException.java         |  50 +
 .../exception/SliceMergerException.java         |  78 ++
 .../loading/AbstractDataLoadProcessorStep.java  | 167 ++++
 .../processing/loading/BadRecordsLogger.java    | 278 ++++++
 .../loading/CarbonDataLoadConfiguration.java    | 313 ++++++
 .../processing/loading/DataField.java           |  53 +
 .../processing/loading/DataLoadExecutor.java    | 108 ++
 .../loading/DataLoadProcessBuilder.java         | 226 +++++
 .../processing/loading/FailureCauses.java       |  28 +
 .../loading/complexobjects/ArrayObject.java     |  35 +
 .../loading/complexobjects/StructObject.java    |  36 +
 .../constants/DataLoadProcessorConstants.java   |  39 +
 .../loading/converter/BadRecordLogHolder.java   |  75 ++
 .../converter/DictionaryCardinalityFinder.java  |  26 +
 .../loading/converter/FieldConverter.java       |  36 +
 .../loading/converter/RowConverter.java         |  36 +
 .../AbstractDictionaryFieldConverterImpl.java   |  27 +
 .../impl/ComplexFieldConverterImpl.java         |  58 ++
 .../impl/DictionaryFieldConverterImpl.java      | 134 +++
 .../DirectDictionaryFieldConverterImpl.java     |  88 ++
 .../converter/impl/FieldEncoderFactory.java     | 142 +++
 .../impl/MeasureFieldConverterImpl.java         | 101 ++
 .../impl/NonDictionaryFieldConverterImpl.java   |  90 ++
 .../converter/impl/RowConverterImpl.java        | 241 +++++
 .../loading/csvinput/BlockDetails.java          |  84 ++
 .../loading/csvinput/BoundedInputStream.java    | 129 +++
 .../loading/csvinput/CSVInputFormat.java        | 326 ++++++
 .../csvinput/CSVRecordReaderIterator.java       | 105 ++
 .../loading/csvinput/StringArrayWritable.java   |  70 ++
 .../DictionaryServerClientDictionary.java       |  89 ++
 .../loading/dictionary/DirectDictionary.java    |  57 ++
 .../loading/dictionary/InMemBiDictionary.java   |  80 ++
 .../dictionary/PreCreatedDictionary.java        |  55 ++
 .../exception/BadRecordFoundException.java      |  65 ++
 .../exception/CarbonDataLoadingException.java   |  73 ++
 .../loading/exception/NoRetryException.java     |  68 ++
 .../loading/model/CarbonDataLoadSchema.java     |  57 ++
 .../loading/model/CarbonLoadModel.java          | 764 ++++++++++++++
 .../loading/parser/CarbonParserFactory.java     |  80 ++
 .../loading/parser/ComplexParser.java           |  29 +
 .../loading/parser/GenericParser.java           |  34 +
 .../processing/loading/parser/RowParser.java    |  31 +
 .../loading/parser/impl/ArrayParserImpl.java    |  68 ++
 .../parser/impl/PrimitiveParserImpl.java        |  28 +
 .../loading/parser/impl/RowParserImpl.java      |  95 ++
 .../loading/parser/impl/StructParserImpl.java   |  70 ++
 .../loading/partition/Partitioner.java          |  27 +
 .../partition/impl/HashPartitionerImpl.java     | 106 ++
 .../processing/loading/row/CarbonRowBatch.java  |  64 ++
 .../processing/loading/row/CarbonSortBatch.java |  45 +
 .../loading/sort/AbstractMergeSorter.java       |  43 +
 .../loading/sort/SortScopeOptions.java          |  54 +
 .../loading/sort/SortStepRowUtil.java           |  74 ++
 .../processing/loading/sort/Sorter.java         |  54 +
 .../processing/loading/sort/SorterFactory.java  |  72 ++
 .../sort/impl/ParallelReadMergeSorterImpl.java  | 231 +++++
 ...arallelReadMergeSorterWithBucketingImpl.java | 276 ++++++
 .../loading/sort/impl/ThreadStatusObserver.java |  55 ++
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 338 +++++++
 .../impl/UnsafeParallelReadMergeSorterImpl.java | 216 ++++
 ...arallelReadMergeSorterWithBucketingImpl.java | 266 +++++
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 405 ++++++++
 .../loading/sort/unsafe/UnsafeSortDataRows.java | 413 ++++++++
 .../unsafe/comparator/UnsafeRowComparator.java  | 131 +++
 .../UnsafeRowComparatorForNormalDIms.java       |  59 ++
 .../sort/unsafe/holder/SortTempChunkHolder.java |  36 +
 .../sort/unsafe/holder/UnsafeCarbonRow.java     |  24 +
 .../unsafe/holder/UnsafeCarbonRowForMerge.java  |  23 +
 .../holder/UnsafeFinalMergePageHolder.java      | 105 ++
 .../unsafe/holder/UnsafeInmemoryHolder.java     | 100 ++
 .../holder/UnsafeInmemoryMergeHolder.java       | 105 ++
 .../holder/UnsafeSortTempFileChunkHolder.java   | 472 +++++++++
 .../UnsafeInMemoryIntermediateDataMerger.java   | 213 ++++
 .../merger/UnsafeIntermediateFileMerger.java    | 367 +++++++
 .../unsafe/merger/UnsafeIntermediateMerger.java | 187 ++++
 .../UnsafeSingleThreadFinalSortFilesMerger.java | 259 +++++
 .../loading/sort/unsafe/sort/TimSort.java       | 986 +++++++++++++++++++
 .../unsafe/sort/UnsafeIntSortDataFormat.java    |  72 ++
 .../CarbonRowDataWriterProcessorStepImpl.java   | 299 ++++++
 .../steps/DataConverterProcessorStepImpl.java   | 227 +++++
 ...ConverterProcessorWithBucketingStepImpl.java | 231 +++++
 .../steps/DataWriterBatchProcessorStepImpl.java | 155 +++
 .../steps/DataWriterProcessorStepImpl.java      | 199 ++++
 .../loading/steps/InputProcessorStepImpl.java   | 244 +++++
 .../loading/steps/SortProcessorStepImpl.java    |  83 ++
 .../merger/AbstractResultProcessor.java         |   2 +-
 .../processing/merger/CarbonDataMergerUtil.java |   2 +-
 .../merger/CompactionResultSortProcessor.java   |  12 +-
 .../merger/RowResultMergerProcessor.java        |   4 +-
 .../merger/exeception/SliceMergerException.java |  78 --
 .../processing/model/CarbonDataLoadSchema.java  |  57 --
 .../processing/model/CarbonLoadModel.java       | 764 --------------
 .../newflow/AbstractDataLoadProcessorStep.java  | 167 ----
 .../newflow/CarbonDataLoadConfiguration.java    | 313 ------
 .../processing/newflow/DataField.java           |  53 -
 .../processing/newflow/DataLoadExecutor.java    | 109 --
 .../newflow/DataLoadProcessBuilder.java         | 231 -----
 .../newflow/complexobjects/ArrayObject.java     |  35 -
 .../newflow/complexobjects/StructObject.java    |  36 -
 .../constants/DataLoadProcessorConstants.java   |  39 -
 .../newflow/converter/BadRecordLogHolder.java   |  75 --
 .../converter/DictionaryCardinalityFinder.java  |  26 -
 .../newflow/converter/FieldConverter.java       |  36 -
 .../newflow/converter/RowConverter.java         |  36 -
 .../AbstractDictionaryFieldConverterImpl.java   |  27 -
 .../impl/ComplexFieldConverterImpl.java         |  58 --
 .../impl/DictionaryFieldConverterImpl.java      | 134 ---
 .../DirectDictionaryFieldConverterImpl.java     |  88 --
 .../converter/impl/FieldEncoderFactory.java     | 142 ---
 .../impl/MeasureFieldConverterImpl.java         | 101 --
 .../impl/NonDictionaryFieldConverterImpl.java   |  90 --
 .../converter/impl/RowConverterImpl.java        | 241 -----
 .../DictionaryServerClientDictionary.java       |  89 --
 .../newflow/dictionary/DirectDictionary.java    |  57 --
 .../newflow/dictionary/InMemBiDictionary.java   |  80 --
 .../dictionary/PreCreatedDictionary.java        |  55 --
 .../exception/BadRecordFoundException.java      |  65 --
 .../exception/CarbonDataLoadingException.java   |  73 --
 .../newflow/exception/NoRetryException.java     |  68 --
 .../newflow/parser/CarbonParserFactory.java     |  80 --
 .../newflow/parser/ComplexParser.java           |  29 -
 .../newflow/parser/GenericParser.java           |  34 -
 .../processing/newflow/parser/RowParser.java    |  31 -
 .../newflow/parser/impl/ArrayParserImpl.java    |  68 --
 .../parser/impl/PrimitiveParserImpl.java        |  28 -
 .../newflow/parser/impl/RowParserImpl.java      |  95 --
 .../newflow/parser/impl/StructParserImpl.java   |  70 --
 .../newflow/partition/Partitioner.java          |  27 -
 .../partition/impl/HashPartitionerImpl.java     | 106 --
 .../processing/newflow/row/CarbonRowBatch.java  |  64 --
 .../processing/newflow/row/CarbonSortBatch.java |  45 -
 .../newflow/sort/AbstractMergeSorter.java       |  43 -
 .../newflow/sort/SortScopeOptions.java          |  54 -
 .../newflow/sort/SortStepRowUtil.java           |  74 --
 .../processing/newflow/sort/Sorter.java         |  54 -
 .../processing/newflow/sort/SorterFactory.java  |  72 --
 .../sort/impl/ParallelReadMergeSorterImpl.java  | 231 -----
 ...arallelReadMergeSorterWithBucketingImpl.java | 276 ------
 .../newflow/sort/impl/ThreadStatusObserver.java |  55 --
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 338 -------
 .../impl/UnsafeParallelReadMergeSorterImpl.java | 216 ----
 ...arallelReadMergeSorterWithBucketingImpl.java | 266 -----
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 405 --------
 .../newflow/sort/unsafe/UnsafeSortDataRows.java | 413 --------
 .../unsafe/comparator/UnsafeRowComparator.java  | 131 ---
 .../UnsafeRowComparatorForNormalDIms.java       |  59 --
 .../sort/unsafe/holder/SortTempChunkHolder.java |  36 -
 .../sort/unsafe/holder/UnsafeCarbonRow.java     |  24 -
 .../unsafe/holder/UnsafeCarbonRowForMerge.java  |  23 -
 .../holder/UnsafeFinalMergePageHolder.java      | 105 --
 .../unsafe/holder/UnsafeInmemoryHolder.java     | 100 --
 .../holder/UnsafeInmemoryMergeHolder.java       | 105 --
 .../holder/UnsafeSortTempFileChunkHolder.java   | 472 ---------
 .../UnsafeInMemoryIntermediateDataMerger.java   | 213 ----
 .../merger/UnsafeIntermediateFileMerger.java    | 367 -------
 .../unsafe/merger/UnsafeIntermediateMerger.java | 187 ----
 .../UnsafeSingleThreadFinalSortFilesMerger.java | 259 -----
 .../newflow/sort/unsafe/sort/TimSort.java       | 986 -------------------
 .../unsafe/sort/UnsafeIntSortDataFormat.java    |  72 --
 .../CarbonRowDataWriterProcessorStepImpl.java   | 299 ------
 .../steps/DataConverterProcessorStepImpl.java   | 227 -----
 ...ConverterProcessorWithBucketingStepImpl.java | 231 -----
 .../steps/DataWriterBatchProcessorStepImpl.java | 155 ---
 .../steps/DataWriterProcessorStepImpl.java      | 199 ----
 .../newflow/steps/InputProcessorStepImpl.java   | 244 -----
 .../newflow/steps/SortProcessorStepImpl.java    |  83 --
 .../processing/partition/DataPartitioner.java   |  35 +
 .../processing/partition/Partition.java         |  35 +
 .../partition/impl/DefaultLoadBalancer.java     |  63 ++
 .../partition/impl/PartitionMultiFileImpl.java  |  44 +
 .../partition/impl/QueryPartitionHelper.java    |  74 ++
 .../impl/SampleDataPartitionerImpl.java         |  40 +
 .../spliter/AbstractCarbonQueryExecutor.java    | 133 +++
 .../partition/spliter/CarbonSplitExecutor.java  |  64 ++
 .../partition/spliter/RowResultProcessor.java   | 105 ++
 .../exception/AlterPartitionSliceException.java |  78 ++
 .../schema/metadata/SortObserver.java           |  42 -
 .../CarbonSortKeyAndGroupByException.java       |  87 ++
 .../sortdata/AbstractTempSortFileWriter.java    | 100 ++
 .../sortdata/CompressedTempSortFileWriter.java  |  78 ++
 .../sort/sortdata/IntermediateFileMerger.java   | 385 ++++++++
 .../sort/sortdata/NewRowComparator.java         |  71 ++
 .../sortdata/NewRowComparatorForNormalDims.java |  59 ++
 .../processing/sort/sortdata/RowComparator.java |  94 ++
 .../sortdata/RowComparatorForNormalDims.java    |  62 ++
 .../SingleThreadFinalSortFilesMerger.java       | 312 ++++++
 .../processing/sort/sortdata/SortDataRows.java  | 437 ++++++++
 .../sortdata/SortIntermediateFileMerger.java    | 114 +++
 .../processing/sort/sortdata/SortObserver.java  |  42 +
 .../sort/sortdata/SortParameters.java           | 602 +++++++++++
 .../sort/sortdata/SortTempFileChunkHolder.java  | 522 ++++++++++
 .../sort/sortdata/SortTempFileChunkWriter.java  |  75 ++
 .../sort/sortdata/TempSortFileReader.java       |  37 +
 .../sort/sortdata/TempSortFileWriter.java       |  46 +
 .../sortdata/TempSortFileWriterFactory.java     |  41 +
 .../UnCompressedTempSortFileWriter.java         | 112 +++
 .../CarbonSortKeyAndGroupByException.java       |  87 --
 .../sortdata/AbstractTempSortFileWriter.java    | 100 --
 .../sortdata/CompressedTempSortFileWriter.java  |  78 --
 .../sortdata/IntermediateFileMerger.java        | 385 --------
 .../sortdata/NewRowComparator.java              |  71 --
 .../sortdata/NewRowComparatorForNormalDims.java |  59 --
 .../sortandgroupby/sortdata/RowComparator.java  |  94 --
 .../sortdata/RowComparatorForNormalDims.java    |  62 --
 .../sortandgroupby/sortdata/SortDataRows.java   | 437 --------
 .../sortdata/SortIntermediateFileMerger.java    | 114 ---
 .../sortandgroupby/sortdata/SortParameters.java | 603 ------------
 .../sortdata/SortTempFileChunkHolder.java       | 522 ----------
 .../sortdata/SortTempFileChunkWriter.java       |  75 --
 .../sortdata/TempSortFileReader.java            |  37 -
 .../sortdata/TempSortFileWriter.java            |  46 -
 .../sortdata/TempSortFileWriterFactory.java     |  41 -
 .../UnCompressedTempSortFileWriter.java         | 112 ---
 .../spliter/AbstractCarbonQueryExecutor.java    | 133 ---
 .../processing/spliter/CarbonSplitExecutor.java |  64 --
 .../processing/spliter/RowResultProcessor.java  | 105 --
 .../exception/AlterPartitionSliceException.java |  78 --
 .../processing/splits/TableSplit.java           | 124 +++
 .../store/CarbonDataFileAttributes.java         |   9 -
 .../store/CarbonDataWriterFactory.java          |   4 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  22 +-
 .../store/CarbonFactDataHandlerModel.java       |   8 +-
 .../processing/store/CarbonKeyBlockHolder.java  |  46 -
 .../store/SingleThreadFinalSortFilesMerger.java | 313 ------
 .../store/colgroup/ColGroupDataHolder.java      | 100 --
 .../store/colgroup/ColGroupMinMax.java          | 215 ----
 .../store/colgroup/ColumnDataHolder.java        |  38 -
 .../processing/store/colgroup/DataHolder.java   |  38 -
 .../store/writer/AbstractFactDataWriter.java    |   2 +-
 .../store/writer/CarbonFactDataWriter.java      |   2 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   2 +-
 .../csvbased/BadRecordsLogger.java              | 278 ------
 .../util/CarbonDataProcessorUtil.java           |   8 +-
 .../processing/util/CarbonLoaderUtil.java       | 890 +++++++++++++++++
 .../processing/util/CarbonQueryUtil.java        | 124 +++
 .../processing/util/DeleteLoadFolders.java      | 151 +++
 .../processing/util/LoadMetadataUtil.java       |  47 +
 .../processing/util/TableOptionConstant.java    |  41 +
 .../carbondata/processing/StoreCreator.java     |  23 +-
 .../processing/csvload/CSVInputFormatTest.java  | 169 ----
 .../loading/csvinput/CSVInputFormatTest.java    | 169 ++++
 .../dictionary/InMemBiDictionaryTest.java       |  72 ++
 .../dictionary/InMemBiDictionaryTest.java       |  72 --
 .../store/colgroup/ColGroupMinMaxTest.java      | 227 -----
 324 files changed, 18066 insertions(+), 19292 deletions(-)
----------------------------------------------------------------------
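
In effect this commit is a package reorganization of carbon-processing rather than a behavior change: processing.newflow.* becomes processing.loading.*, processing.csvload becomes processing.loading.csvinput, processing.model and processing.etl move to processing.loading.model and processing.exception, processing.sortandgroupby.sortdata becomes processing.sort.sortdata, and spark-common utilities such as CarbonLoaderUtil, DeleteLoadFolders, CarbonQueryUtil and TableSplit move into processing.util and processing.splits; the unused SchemaInfo/api.dataloader classes are deleted outright. For callers the change is import-only, as the hunks below show. A minimal sketch of a caller after this commit, assuming only the classes and the DataLoadExecutor.execute signature visible in the StoreCreator hunk below (class and variable names here are illustrative, not part of the commit):

    // Imports after this commit; the pre-commit paths are noted alongside.
    import org.apache.carbondata.common.CarbonIterator;
    import org.apache.carbondata.processing.loading.DataLoadExecutor;      // was processing.newflow.DataLoadExecutor
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel; // was processing.model.CarbonLoadModel

    public class LoadWithNewPackages {
      public static void run(CarbonLoadModel loadModel, String storeLocation,
          CarbonIterator<Object[]> readerIterator) throws Exception {
        // Only the import paths changed; the call itself is the same as before the rename.
        new DataLoadExecutor().execute(
            loadModel,
            new String[] { storeLocation },
            new CarbonIterator[] { readerIterator });
      }
    }
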


http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
index 8b2ceba..94d35b0 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**
  * example for global dictionary generation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 9be3ed5..4b04116 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -72,16 +72,15 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.constants.TableOptionConstant;
-import org.apache.carbondata.processing.csvload.BlockDetails;
-import org.apache.carbondata.processing.csvload.CSVInputFormat;
-import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.csvload.StringArrayWritable;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.DataLoadExecutor;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -394,7 +393,6 @@ public class StoreCreator {
       path.delete();
     }
 
-    SchemaInfo info = new SchemaInfo();
     BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
         0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
     Configuration configuration = new Configuration();
@@ -420,9 +418,6 @@ public class StoreCreator {
         new String[] {storeLocation},
         new CarbonIterator[]{readerIterator});
 
-    info.setDatabaseName(databaseName);
-    info.setTableName(tableName);
-
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 4520476..fd65230 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -21,13 +21,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.result.BatchResult;
 import org.apache.carbondata.presto.readers.StreamReader;
 import org.apache.carbondata.presto.readers.StreamReaders;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
 import com.facebook.presto.spi.ConnectorPageSource;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 64f892d..07f9699 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -17,10 +17,10 @@
 
 package org.apache.carbondata.presto.util
 
-import java.util
 import java.io._
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
+import java.util
 import java.util.{ArrayList, Date, List, UUID}
 
 import scala.collection.JavaConversions._
@@ -30,42 +30,33 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapreduce.{RecordReader, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{RecordReader, TaskType}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier, ReverseDictionary}
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier,
-ReverseDictionary}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl,
-FileWriteOperation}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata,
-CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.metadata.converter.{SchemaConverter,
-ThriftWrapperSchemaConverterImpl}
+import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl, FileWriteOperation}
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, CarbonMeasure, ColumnSchema}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension,
-CarbonMeasure, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl,
-ThriftWriter}
-import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
-CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat,
-CSVRecordReaderIterator, StringArrayWritable}
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl, ThriftWriter}
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator, StringArrayWritable}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.util.TableOptionConstant
 
 object CarbonDataStoreCreator {
 
@@ -455,7 +446,6 @@ object CarbonDataStoreCreator {
     if (path.exists()) {
       path.delete()
     }
-    val info: SchemaInfo = new SchemaInfo()
     val blockDetails: BlockDetails = new BlockDetails(
       new Path(loadModel.getFactFilePath),
       0,
@@ -488,8 +478,6 @@ object CarbonDataStoreCreator {
       hadoopAttemptContext)
     new DataLoadExecutor()
       .execute(loadModel, Array(storeLocation), Array(readerIterator))
-    info.setDatabaseName(databaseName)
-    info.setTableName(tableName)
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema,
       loadModel.getTableName,
       loadModel.getTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
index 8e16ba9..52edf3a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.dataload
 
 import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.processing.exception.DataLoadingException
+
 class TestLoadDataUseAllDictionary extends QueryTest with BeforeAndAfterAll{
   override def beforeAll {
     sql("DROP TABLE IF EXISTS t3")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index 016b195..9237627 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark.testsuite.dataload
 import java.io.File
 
 import org.apache.spark.util.FileUtils
-import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 import org.apache.spark.sql.test.util.QueryTest
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
deleted file mode 100644
index 9fe003f..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ /dev/null
@@ -1,905 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.datastore.row.LoadStatusType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.processing.merger.NodeBlockRelation;
-import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import com.google.gson.Gson;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.Utils;
-
-public final class CarbonLoaderUtil {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
-
-  private CarbonLoaderUtil() {
-  }
-
-
-
-  public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
-      deleteStorePath(segmentPath);
-    }
-  }
-
-  /**
-   * the method returns true if the segment has carbondata file else returns false.
-   *
-   * @param loadModel
-   * @param currentLoad
-   * @return
-   */
-  public static boolean isValidSegment(CarbonLoadModel loadModel,
-      int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
-        .getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
-    int fileCount = 0;
-    int partitionCount = carbonTable.getPartitionCount();
-    for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
-          currentLoad + "");
-      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
-          FileFactory.getFileType(segmentPath));
-      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(
-              CarbonTablePath.getCarbonIndexExtension())
-              || file.getName().endsWith(
-              CarbonTablePath.getCarbonDataExtension());
-        }
-
-      });
-      fileCount += files.length;
-      if (files.length > 0) {
-        return true;
-      }
-    }
-    if (fileCount == 0) {
-      return false;
-    }
-    return true;
-  }
-  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
-      final boolean isCompactionFlow) throws IOException {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
-    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
-    //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
-      FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
-          String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private static void deleteStorePath(String path) {
-    try {
-      FileType fileType = FileFactory.getFileType(path);
-      if (FileFactory.isFileExist(path, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
-        CarbonUtil.deleteFoldersAndFiles(carbonFile);
-      }
-    } catch (IOException | InterruptedException e) {
-      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
-    }
-  }
-
-
-  /**
-   * This method will delete the local data load folder location after data load is complete
-   *
-   * @param loadModel
-   */
-  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      boolean isCompactionFlow, boolean isAltPartitionFlow) {
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
-            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
-    // form local store location
-    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
-    if (localStoreLocations == null) {
-      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
-    }
-    // submit local folder clean up in another thread so that main thread execution is not blocked
-    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
-    try {
-      localFolderDeletionService.submit(new Callable<Void>() {
-        @Override public Void call() throws Exception {
-          long startTime = System.currentTimeMillis();
-          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
-          for (String loc : locArray) {
-            try {
-              CarbonUtil.deleteFoldersAndFiles(new File(loc));
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error(e,
-                  "Failed to delete local data load folder location: " + loc);
-            }
-          }
-          LOGGER.info("Deleted the local store location: " + localStoreLocations
-                + " : Time taken: " + (System.currentTimeMillis() - startTime));
-          return null;
-        }
-      });
-    } finally {
-      if (null != localFolderDeletionService) {
-        localFolderDeletionService.shutdown();
-      }
-    }
-
-  }
-
-  /**
-   * This API will write the load level metadata for the loadmanagement module inorder to
-   * manage the load and query execution management smoothly.
-   *
-   * @param newMetaEntry
-   * @param loadModel
-   * @return boolean which determines whether status update is done or not.
-   * @throws IOException
-   */
-  public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
-      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
-      throws IOException, InterruptedException {
-    boolean status = false;
-    String metaDataFilepath =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info(
-            "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
-                + " for table status updation");
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-        List<LoadMetadataDetails> listOfLoadFolderDetails =
-            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        List<CarbonFile> staleFolders = new ArrayList<>();
-        Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
-        // create a new segment Id if load has just begun else add the already generated Id
-        if (loadStartEntry) {
-          String segmentId =
-              String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
-          newMetaEntry.setLoadName(segmentId);
-          loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
-          loadModel.setSegmentId(segmentId);
-          // Exception should be thrown if:
-          // 1. If insert overwrite is in progress and any other load or insert operation
-          // is triggered
-          // 2. If load or insert into operation is in progress and insert overwrite operation
-          // is triggered
-          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
-            if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
-              throw new RuntimeException("Already insert overwrite is in progress");
-            } else if (
-                newMetaEntry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())
-                    && entry.getLoadStatus().equals(LoadStatusType.IN_PROGRESS.getMessage())) {
-              throw new RuntimeException("Already insert into or load is in progress");
-            }
-          }
-          listOfLoadFolderDetails.add(newMetaEntry);
-        } else {
-          newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
-          // existing entry needs to be overwritten as the entry will exist with some
-          // intermediate status
-          int indexToOverwriteNewMetaEntry = 0;
-          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
-            if (entry.getLoadName().equals(newMetaEntry.getLoadName())
-                && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
-              break;
-            }
-            indexToOverwriteNewMetaEntry++;
-          }
-          if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getLoadStatus()
-              .equals(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-            throw new RuntimeException("It seems insert overwrite has been issued during load");
-          }
-          if (insertOverwrite) {
-            for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
-              if (!entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
-                entry.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
-                // For insert overwrite, we will delete the old segment folder immediately
-                // So collect the old segments here
-                String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
-                // add to the deletion list only if file exist else HDFS file system will throw
-                // exception while deleting the file if file path does not exist
-                if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-                  staleFolders.add(FileFactory.getCarbonFile(path));
-                }
-              }
-            }
-          }
-          listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
-        }
-        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
-            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
-        // Delete all old stale segment folders
-        for (CarbonFile staleFolder : staleFolders) {
-          // try block is inside for loop because even if there is failure in deletion of 1 stale
-          // folder still remaining stale folders should be deleted
-          try {
-            CarbonUtil.deleteFoldersAndFiles(staleFolder);
-          } catch (IOException | InterruptedException e) {
-            LOGGER.error("Failed to delete stale folder: " + e.getMessage());
-          }
-        }
-        status = true;
-      } else {
-        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-            .getDatabaseName() + "." + loadModel.getTableName());
-      };
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info(
-            "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
-                + "." + loadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
-                .getTableName() + " during table status updation");
-      }
-    }
-    return status;
-  }
-
-  /**
-   * Method to create new entry for load in table status file
-   *
-   * @param loadMetadataDetails
-   * @param loadStatus
-   * @param loadStartTime
-   * @param addLoadEndTime
-   */
-  public static void populateNewLoadMetaEntry(LoadMetadataDetails loadMetadataDetails,
-      String loadStatus, long loadStartTime, boolean addLoadEndTime) {
-    if (addLoadEndTime) {
-      long loadEndDate = CarbonUpdateUtil.readCurrentTime();
-      loadMetadataDetails.setLoadEndTime(loadEndDate);
-    }
-    loadMetadataDetails.setLoadStatus(loadStatus);
-    loadMetadataDetails.setLoadStartTime(loadStartTime);
-  }
-
-  public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
-      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
-    try {
-
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOGGER.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-
-  }
-
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
-  }
-
-  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
-      String carbonStorePath) throws IOException {
-    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
-        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
-    return dictCache.get(columnIdentifier);
-  }
-
-  public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
-      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
-      throws IOException {
-    return getDictionary(
-        new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType,
-            CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)),
-        carbonStorePath);
-  }
-
-  /**
-   * This method will divide the blocks among the tasks of the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @param noOfNodesInput -1 if number of nodes has to be decided
-   *                       based on block location information
-   * @param parallelism    total no of tasks to execute in parallel
-   * @return
-   */
-  public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
-      List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
-      List<String> activeNode) {
-
-    Map<String, List<Distributable>> mapOfNodes =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    int taskPerNode = parallelism / mapOfNodes.size();
-    //assigning non zero value to noOfTasksPerNode
-    int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
-    // divide the blocks of a node among the tasks of the node.
-    return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput) {
-    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
-    // -1 if number of nodes has to be decided based on block location information
-    return nodeBlockMapping(blockInfos, -1);
-  }
-
-  /**
-   * the method returns the number of required executors
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> getRequiredExecutors(
-      List<Distributable> blockInfos) {
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Distributable blockInfo : blockInfos) {
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
-      }
-    }
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-    return nodeAndBlockMapping;
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality.
-   *
-   * @param blockInfos     blocks to be distributed
-   * @param noOfNodesInput -1 if the number of nodes has to be decided
-   *                       based on block location information
-   * @param activeNodes    list of currently active nodes; may be null
-   * @return mapping of node name to the blocks assigned to that node
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List<String> activeNodes) {
-
-    Map<String, List<Distributable>> nodeBlocksMap =
-        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    Set<Distributable> uniqueBlocks =
-        new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
-
-    int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
-    if (null != activeNodes) {
-      noofNodes = activeNodes.size();
-    }
-    int blocksPerNode = blockInfos.size() / noofNodes;
-    blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
-
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-
-    // so now we have a map of node vs blocks. allocate the block as per the order
-    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
-
-    // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
-
-    return nodeBlocksMap;
-  }
-
-  /**
-   * Assigns the blocks of a node to its tasks.
-   *
-   * @param nodeBlocksMap    node name to list of blocks mapping
-   * @param noOfTasksPerNode number of tasks to create per node
-   * @return mapping of node name to the blocks assigned to each task of that node
-   */
-  private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
-      Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
-    Map<String, List<List<Distributable>>> outputMap =
-        new HashMap<String, List<List<Distributable>>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // for each node
-    for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
-
-      List<Distributable> blockOfEachNode = eachNode.getValue();
-      // sort the blocks so that the same block is given to the same executor across runs
-      Collections.sort(blockOfEachNode);
-      // create the task list for each node.
-      createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
-
-      // take all the block of node and divide it among the tasks of a node.
-      divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
-    }
-
-    return outputMap;
-  }
-
-  /**
-   * Divides the blocks of a node among the tasks of that node in round robin.
-   *
-   * @param outputMap       node name to task lists mapping
-   * @param key             node name
-   * @param blockOfEachNode blocks belonging to the node
-   */
-  private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
-      String key, List<Distributable> blockOfEachNode) {
-
-    List<List<Distributable>> taskLists = outputMap.get(key);
-    int tasksOfNode = taskLists.size();
-    int i = 0;
-    for (Distributable block : blockOfEachNode) {
-
-      taskLists.get(i % tasksOfNode).add(block);
-      i++;
-    }
-
-  }
-
-  /**
-   * Creates an empty block list for each task of a node.
-   *
-   * @param outputMap        node name to task lists mapping
-   * @param noOfTasksPerNode number of tasks to create for the node
-   * @param key              node name
-   */
-  private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
-      int noOfTasksPerNode, String key) {
-    List<List<Distributable>> nodeTaskList =
-        new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (int i = 0; i < noOfTasksPerNode; i++) {
-      List<Distributable> eachTask =
-          new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      nodeTaskList.add(eachTask);
-
-    }
-    outputMap.put(key, nodeTaskList);
-
-  }
-
-  /**
-   * If any leftover data blocks are present, assigns them to nodes in a round-robin way.
-   *
-   * @param outputMap         node name to blocks mapping
-   * @param uniqueBlocks      blocks not yet assigned to any node
-   * @param noOfBlocksPerNode per-node block quota
-   * @param activeNodes       list of currently active nodes; may be null
-   */
-  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
-
-    if (activeNodes != null) {
-      for (String activeNode : activeNodes) {
-        List<Distributable> blockLst = outputMap.get(activeNode);
-        if (null == blockLst) {
-          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        }
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-        if (blockLst.size() > 0) {
-          outputMap.put(activeNode, blockLst);
-        }
-      }
-    } else {
-      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-        List<Distributable> blockLst = entry.getValue();
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-      }
-
-    }
-
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      if (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        List<Distributable> blockLst = entry.getValue();
-        blockLst.add(block);
-        blocks.remove();
-      }
-    }
-  }
-
-  /**
-   * Populates blockLst with blocks to be allocated to a specific node.
-   *
-   * @param uniqueBlocks      blocks not yet assigned to any node
-   * @param noOfBlocksPerNode per-node block quota
-   * @param blockLst          block list of the node being populated
-   */
-  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
-      List<Distributable> blockLst) {
-    Iterator<Distributable> blocks = uniqueBlocks.iterator();
-    // if the node already has its per-node quota of blocks, avoid assigning extra blocks
-    if (blockLst.size() == noOfBlocksPerNode) {
-      return;
-    }
-    while (blocks.hasNext()) {
-      Distributable block = blocks.next();
-      blockLst.add(block);
-      blocks.remove();
-      if (blockLst.size() >= noOfBlocksPerNode) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Creates the final mapping of nodes to data blocks.
-   *
-   * @param outputMap           resulting node name to blocks mapping
-   * @param blocksPerNode       per-node block quota
-   * @param uniqueBlocks        blocks not yet assigned to any node
-   * @param nodeAndBlockMapping node name to local blocks mapping
-   * @param activeNodes         list of currently active nodes; may be null
-   */
-  private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
-      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
-      List<String> activeNodes) {
-
-    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
-        new ArrayList<>(nodeAndBlockMapping.size());
-    for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
-      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
-    }
-    // sort nodes by the number of blocks per node, so that nodes having fewer blocks
-    // are assigned first
-    Collections.sort(multiBlockRelations);
-
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
-      String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
-      String activeExecutor = nodeName;
-      if (null != activeNodes) {
-        activeExecutor = getActiveExecutor(activeNodes, nodeName);
-        if (null == activeExecutor) {
-          continue;
-        }
-      }
-      // number of blocks assigned to this node so far
-      int nodeCapacity = 0;
-      // loop through the blocks local to this node
-      for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
-
-        // check if this is already assigned.
-        if (uniqueBlocks.contains(block)) {
-
-          if (null == outputMap.get(activeExecutor)) {
-            List<Distributable> list =
-                new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(activeExecutor, list);
-          }
-          // assign this block to this node if node has capacity left
-          if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(activeExecutor);
-            infos.add(block);
-            nodeCapacity++;
-            uniqueBlocks.remove(block);
-          } else {
-            // No need to continue loop as node is full
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Checks whether the node is active and resolves it against the active node list.
-   *
-   * @param activeNode list of active nodes
-   * @param nodeName   node to validate
-   * @return the matching active node name, host name or host address, or null if inactive
-   */
-  private static String getActiveExecutor(List activeNode, String nodeName) {
-    boolean isActiveNode = activeNode.contains(nodeName);
-    if (isActiveNode) {
-      return nodeName;
-    }
-    // if the name is "localhost", resolve the actual host name and check again
-    else if (nodeName.equals("localhost")) {
-      try {
-        String hostName = InetAddress.getLocalHost().getHostName();
-        isActiveNode = activeNode.contains(hostName);
-        if (isActiveNode) {
-          return hostName;
-        }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
-      }
-    } else {
-      try {
-        String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
-        isActiveNode = activeNode.contains(hostAddress);
-        if (isActiveNode) {
-          return hostAddress;
-        }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Creates the node to data blocks mapping from the flattened relations.
-   *
-   * @param flattenedList       node-block relations
-   * @param nodeAndBlockMapping resulting node name to blocks mapping
-   */
-  private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
-      Map<String, List<Distributable>> nodeAndBlockMapping) {
-    for (NodeBlockRelation nbr : flattenedList) {
-      String node = nbr.getNode();
-      List<Distributable> list;
-
-      if (null == nodeAndBlockMapping.get(node)) {
-        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        list.add(nbr.getBlock());
-        nodeAndBlockMapping.put(node, list);
-      } else {
-        list = nodeAndBlockMapping.get(node);
-        list.add(nbr.getBlock());
-      }
-    }
-    /* For performance, iterate over entrySet() instead of values() when sorting:
-    entrySet() is a view of the map, so each value is fetched exactly once, whereas
-    iterating the keys and calling get() would query the map twice for every key. */
-    Iterator<Map.Entry<String, List<Distributable>>> iterator =
-        nodeAndBlockMapping.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Collections.sort(iterator.next().getValue());
-    }
-  }
-
-  /**
-   * Flattens the block list into node-block relations, collecting the unique
-   * blocks and the node names on the way.
-   *
-   * @param blockInfos    blocks to be distributed
-   * @param flattenedList resulting node-block relations
-   * @param uniqueBlocks  resulting set of unique blocks
-   * @param nodeList      resulting set of node names
-   */
-  private static void createFlattenedListFromMap(List<Distributable> blockInfos,
-      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
-      Set<String> nodeList) {
-    for (Distributable blockInfo : blockInfos) {
-      // put the blocks in the set
-      uniqueBlocks.add(blockInfo);
-
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-          nodeList.add(eachNode);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
-      }
-    }
-  }
-
-  /**
-   * This method will check for, and create if absent, the carbon data directory
-   * for the given store path and segment id.
-   *
-   * @param carbonStorePath store location
-   * @param segmentId       segment id
-   * @param carbonTable     table whose data directory is created
-   */
-  public static void checkAndCreateCarbonDataLocation(String carbonStorePath,
-      String segmentId, CarbonTable carbonTable) {
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
-  }
-
-  /**
-   * Returns the array of configured local directories from the Spark configuration.
-   *
-   * @param conf Spark configuration
-   * @return configured local directories
-   */
-  public static String[] getConfiguredLocalDirs(SparkConf conf) {
-    return Utils.getConfiguredLocalDirs(conf);
-  }
-
-  /**
-   * Carries the visibility of segments forward from the old table status
-   * (captured before clean files) to the latest table status.
-   *
-   * @param oldList old table status details
-   * @param newList latest table status details
-   * @return updated list of load metadata details
-   */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
-      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
-    List<LoadMetadataDetails> newListMetadata =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
-    for (LoadMetadataDetails oldSegment : oldList) {
-      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
-        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
-      }
-    }
-    return newListMetadata;
-  }
-
-}

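The nodeBlockMapping logic removed above is locality-aware block assignment: each node first receives blocks that are local to it, up to a per-node quota, and any leftover blocks are then dealt out round-robin, with divideBlockToTasks further splitting a node's blocks among its tasks. Below is a minimal, self-contained sketch of that idea; the Block record and every other name in it are illustrative stand-ins, not CarbonData API.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LocalityAssignmentSketch {

  /** A block and the nodes holding a local replica of it (stand-in for Distributable). */
  record Block(String id, List<String> locations) {}

  static Map<String, List<Block>> nodeBlockMapping(List<Block> blocks, List<String> nodes) {
    int blocksPerNode = Math.max(1, blocks.size() / nodes.size());
    Set<Block> unassigned = new LinkedHashSet<>(blocks);
    Map<String, List<Block>> result = new LinkedHashMap<>();
    nodes.forEach(n -> result.put(n, new ArrayList<>()));

    // First pass: give each node its local blocks, up to the per-node quota.
    for (String node : nodes) {
      Iterator<Block> it = unassigned.iterator();
      while (it.hasNext() && result.get(node).size() < blocksPerNode) {
        Block block = it.next();
        if (block.locations().contains(node)) {
          result.get(node).add(block);
          it.remove();
        }
      }
    }

    // Second pass: deal any leftover blocks out round-robin, ignoring locality.
    int i = 0;
    for (Block leftover : unassigned) {
      result.get(nodes.get(i++ % nodes.size())).add(leftover);
    }
    return result;
  }

  /** Splits a node's blocks among its tasks round-robin, as divideBlockToTasks did. */
  static List<List<Block>> divideAmongTasks(List<Block> nodeBlocks, int tasksPerNode) {
    List<List<Block>> tasks = new ArrayList<>();
    for (int t = 0; t < tasksPerNode; t++) {
      tasks.add(new ArrayList<>());
    }
    for (int b = 0; b < nodeBlocks.size(); b++) {
      tasks.get(b % tasksPerNode).add(nodeBlocks.get(b));
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<Block> blocks = List.of(
        new Block("b1", List.of("node1")),
        new Block("b2", List.of("node2")),
        new Block("b3", List.of("node1")),
        new Block("b4", List.of("node3")));
    // b1, b3 stay local to node1; b2 local to node2; b4 has no matching node
    // and is dealt round-robin, which (as in the original) may exceed the quota by one.
    System.out.println(nodeBlockMapping(blocks, List.of("node1", "node2")));
  }
}
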
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
deleted file mode 100644
index 59ac2f6..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.load;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public final class DeleteLoadFolders {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
-  private DeleteLoadFolders() {
-
-  }
-
-  /**
-   * Returns the segment path for the given load.
-   *
-   * @param dbName        database name
-   * @param tableName     table name
-   * @param storeLocation store location
-   * @param partitionId   partition id
-   * @param oneLoad       load metadata details of the segment
-   * @return segment path
-   */
-  private static String getSegmentPath(String dbName, String tableName, String storeLocation,
-      int partitionId, LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
-        new CarbonTableIdentifier(dbName, tableName, ""));
-    String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
-  }
-
-  private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
-
-    boolean status = false;
-    try {
-      if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-        CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-        CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (CarbonTablePath.isCarbonDataFile(file.getName())
-                || CarbonTablePath.isCarbonIndexFile(file.getName()));
-          }
-        });
-
-        // if no fact or measure metadata files are present, there is no need to
-        // keep the entry in metadata.
-        if (filesToBeDeleted.length == 0) {
-          status = true;
-        } else {
-
-          for (CarbonFile eachFile : filesToBeDeleted) {
-            if (!eachFile.delete()) {
-              LOGGER.warn("Unable to delete the file as per delete command "
-                  + eachFile.getAbsolutePath());
-              status = false;
-            } else {
-              status = true;
-            }
-          }
-        }
-        // need to delete the complete folder.
-        if (status) {
-          if (!file.delete()) {
-            LOGGER.warn("Unable to delete the folder as per delete command "
-                + file.getAbsolutePath());
-            status = false;
-          }
-        }
-
-      } else {
-        status = false;
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Unable to delete the file as per delete command " + path);
-    }
-
-    return status;
-
-  }
-
-  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
-        || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
-        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
-    }
-
-    return false;
-  }
-
-  public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
-      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
-
-    boolean isDeleted = false;
-
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
-          }
-        }
-      }
-    }
-
-    return isDeleted;
-  }
-
-
-}

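The removed DeleteLoadFolders ties physical deletion to segment status and a retention window: only loads marked for delete or compacted, and still visible, qualify, and unless the delete is forced they are removed only after the maximum query timeout has elapsed. A hedged sketch of that predicate follows; the constants and field names are illustrative, and the real retention window comes from CarbonData configuration.

import java.util.concurrent.TimeUnit;

public class SegmentCleanupSketch {

  // Assumed retention window; the real value is configuration-driven.
  static final long MAX_QUERY_TIMEOUT_MILLIS = TimeUnit.HOURS.toMillis(1);

  /** Illustrative stand-in for LoadMetadataDetails. */
  static class LoadDetail {
    String loadStatus;   // e.g. "Marked for Delete" or "Compacted"
    boolean visible;     // segments already hidden are skipped
    long deletionTime;   // epoch millis when the load was marked for deletion

    LoadDetail(String loadStatus, boolean visible, long deletionTime) {
      this.loadStatus = loadStatus;
      this.visible = visible;
      this.deletionTime = deletionTime;
    }
  }

  static boolean canBeDeleted(LoadDetail load, boolean forceDelete, long now) {
    boolean eligible = "Marked for Delete".equalsIgnoreCase(load.loadStatus)
        || "Compacted".equalsIgnoreCase(load.loadStatus);
    if (!eligible || !load.visible) {
      return false;
    }
    // A forced delete skips the retention window entirely.
    return forceDelete || (now - load.deletionTime) > MAX_QUERY_TIMEOUT_MILLIS;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    LoadDetail fresh = new LoadDetail("Marked for Delete", true, now);
    System.out.println(canBeDeleted(fresh, false, now)); // false: still inside the window
    System.out.println(canBeDeleted(fresh, true, now));  // true: force delete ignores it
  }
}
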
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
deleted file mode 100644
index 0345fcd..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.load;
-
-/**
- * This Enum is used to determine the Reasons of Failure.
- */
-public enum FailureCauses {
-  NONE,
-  BAD_RECORDS,
-  EXECUTOR_FAILURE,
-  STATUS_FILE_UPDATION_FAILURE,
-  MULTIPLE_INPUT_ROWS_MATCHING
-}

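FailureCauses is a plain enum; the sketch below shows the kind of dispatch a caller might perform on it. The retry policy is illustrative only, not CarbonData behaviour, and the enum is redeclared so the example is self-contained.

public class FailureCauseSketch {

  enum FailureCauses {
    NONE, BAD_RECORDS, EXECUTOR_FAILURE, STATUS_FILE_UPDATION_FAILURE,
    MULTIPLE_INPUT_ROWS_MATCHING
  }

  /** Transient infrastructure failures may be retried; data problems should not be. */
  static boolean isRetryable(FailureCauses cause) {
    switch (cause) {
      case EXECUTOR_FAILURE:
      case STATUS_FILE_UPDATION_FAILURE:
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isRetryable(FailureCauses.EXECUTOR_FAILURE)); // true
    System.out.println(isRetryable(FailureCauses.BAD_RECORDS));      // false
  }
}
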
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
deleted file mode 100644
index 112fa63..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.util.List;
-
-public interface DataPartitioner {
-
-  /**
-   * All the partitions built by the Partitioner
-   */
-  List<Partition> getAllPartitions();
-
-  /**
-   * Identifies the partitions applicable for the given filter (API used for query)
-   */
-  List<Partition> getPartitions();
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index fd93ce3..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
-  /**
-   * unique identification for the partition in the cluster.
-   */
-  String getUniqueID();
-
-  /**
-   * paths of the files belonging to this partition
-   *
-   * @return list of file paths
-   */
-  List<String> getFilesPath();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
deleted file mode 100644
index fa7c4d5..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-/**
- * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
- */
-public class DefaultLoadBalancer {
-  private Map<String, List<Partition>> nodeToPartitonMap =
-      new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private Map<Partition, String> partitonToNodeMap =
-      new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
-    // Perform a round-robin allocation
-    int nodeCount = nodes.size();
-
-    int partitioner = 0;
-    for (Partition partition : partitions) {
-      int nodeindex = partitioner % nodeCount;
-      String node = nodes.get(nodeindex);
-
-      List<Partition> oldList = nodeToPartitonMap.get(node);
-      if (oldList == null) {
-        oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        nodeToPartitonMap.put(node, oldList);
-      }
-      oldList.add(partition);
-
-      partitonToNodeMap.put(partition, node);
-
-      partitioner++;
-    }
-  }
-
-  public String getNodeForPartitions(Partition partition) {
-    return partitonToNodeMap.get(partition);
-  }
-}

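The round-robin allocation in the removed DefaultLoadBalancer deals partitions to nodes like cards, so node i receives every (i + k * nodeCount)-th partition. A minimal usage sketch of the same idea, with illustrative names:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

  static Map<String, List<String>> assign(List<String> nodes, List<String> partitions) {
    Map<String, List<String>> nodeToPartitions = new LinkedHashMap<>();
    for (int i = 0; i < partitions.size(); i++) {
      // pick nodes cyclically, exactly as a modulo over the partition index
      String node = nodes.get(i % nodes.size());
      nodeToPartitions.computeIfAbsent(node, k -> new ArrayList<>()).add(partitions.get(i));
    }
    return nodeToPartitions;
  }

  public static void main(String[] args) {
    // Three partitions over two nodes: n1 -> [p0, p2], n2 -> [p1]
    System.out.println(assign(List.of("n1", "n2"), List.of("p0", "p1", "p2")));
  }
}
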
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
deleted file mode 100644
index c386da1..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionMultiFileImpl implements Partition {
-  private static final long serialVersionUID = -4363447826181193976L;
-  private String uniqueID;
-  private List<String> folderPath;
-
-  public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    // unique identification of this partition
-    return uniqueID;
-  }
-
-  @Override public List<String> getFilesPath() {
-    // paths of the files belonging to this partition
-    return folderPath;
-  }
-
-}

