carbondata-commits mailing list archives

From gvram...@apache.org
Subject [18/22] incubator-carbondata git commit: commenting the delete test case having subquery. supporting spark version 2.0. correcting rebase error. rebased with latest code.
Date Fri, 06 Jan 2017 13:57:18 GMT
commenting out the delete test case having a subquery.
supporting Spark version 2.0.
correcting a rebase error.
rebased with the latest code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8dda2a8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8dda2a8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8dda2a8d

Branch: refs/heads/master
Commit: 8dda2a8d20bc4e505c597011dad2d70a51ce3377
Parents: 7aa6800
Author: ravikiran <ravikiran.sn042@gmail.com>
Authored: Tue Jan 3 22:27:17 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../carbon/datastore/SegmentTaskIndexStore.java |  89 ++-
 .../core/carbon/path/CarbonTablePath.java       |  36 +-
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../core/updatestatus/SegmentStatusManager.java |   9 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 +-
 .../impl/DictionaryBasedResultCollector.java    |   2 +-
 .../carbondata/scan/model/QueryModel.java       |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 320 +++-----
 .../carbondata/spark/load/CarbonLoaderUtil.java |  26 +-
 .../spark/load/DeleteLoadFolders.java           |   4 -
 .../carbondata/spark/util/LoadMetadataUtil.java |   1 -
 .../org/apache/carbondata/api/CarbonStore.scala |  12 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |   9 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   2 +-
 .../spark/rdd/DataManagementFunc.scala          |   8 +-
 .../spark/rdd/UpdateCoalescedRDD.scala          |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  16 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   4 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 800 +------------------
 .../sql/execution/command/IUDCommands.scala     |   2 -
 .../execution/command/carbonTableSchema.scala   |  95 ++-
 .../iud/DeleteCarbonTableTestCase.scala         |  59 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 215 ++++-
 .../execution/command/carbonTableSchema.scala   |  57 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   4 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   3 +-
 .../org/apache/spark/util/Compaction.scala      |   7 +-
 28 files changed, 559 insertions(+), 1238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index dd78f5e..a0a367a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -41,10 +41,9 @@ import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderExcepti
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
+import org.apache.carbondata.core.update.UpdateVO;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.model.QueryModel;
-
-
 
 /**
  * Class to handle loading, unloading, clearing and storing of the table
@@ -97,7 +96,7 @@ public class SegmentTaskIndexStore
     } catch (Throwable e) {
       throw new CarbonUtilException("Problem in loading segment block.", e);
     }
-	
+
     if (null != segmentTaskIndexWrapper) {
       segmentTaskIndexWrapper.incrementAccessCount();
     }
@@ -122,30 +121,6 @@ public class SegmentTaskIndexStore
   }
 
   /**
-   * returns all the segments taskid_to_Blcoks map wrapper.
-   *
-   * @param tableSegmentUniqueIdentifiers
-   * @return
-   * @throws IOException
-   */
-  @Override public List<SegmentTaskIndexWrapper> getAll(
-      List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
-    List<SegmentTaskIndexWrapper> segmentTaskIndexWrappers =
-        new ArrayList<>(tableSegmentUniqueIdentifiers.size());
-    try {
-      for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
-        segmentTaskIndexWrappers.add(get(segmentUniqueIdentifier));
-      }
-    } catch (IOException e) {
-      for (SegmentTaskIndexWrapper segmentTaskIndexWrapper : segmentTaskIndexWrappers) {
-        segmentTaskIndexWrapper.clear();
-      }
-      throw e;
-    }
-    return segmentTaskIndexWrappers;
-  }
-
-  /**
    * returns the SegmentTaskIndexWrapper
    *
    * @param tableSegmentUniqueIdentifier
@@ -171,6 +146,22 @@ public class SegmentTaskIndexStore
   }
 
   /**
+   *
+   * @param taskKey
+   * @param listOfUpdatedFactFiles
+   * @return
+   */
+  private String getTimeStampValueFromBlock(String taskKey, List<String> listOfUpdatedFactFiles) {
+    for (String blockName : listOfUpdatedFactFiles) {
+      if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) {
+        blockName = blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.'));
+        return blockName;
+      }
+    }
+    return null;
+  }
+
+  /**
    * Below method will be used to load the segments.
    * One segment may have multiple tasks, so the table segment will be loaded
    * based on task id and will return the map of taskId to table segment
@@ -190,21 +181,25 @@ public class SegmentTaskIndexStore
         segmentToTableBlocksInfos.entrySet().iterator();
     Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    String segmentId = null;
     TaskBucketHolder taskBucketHolder = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
         // segment id to table block mapping
-        iteratorOverSegmentBlocksInfos.next();
+        Map.Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
         // group task id to table block info mapping for the segment
         Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
             mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
+        segmentId = next.getKey();
         // get the existing map of task id to table segment map
         UpdateVO updateVO = updateStatusManager.getInvalidTimestampRange(segmentId);
         // check if segment is already loaded, if segment is already loaded
         //no need to load the segment block
         String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
         segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
-        if (segmentTaskIndexWrapper == null) {
+        if (segmentTaskIndexWrapper == null || tableSegmentUniqueIdentifier.isSegmentUpdated()) {
           // get the segment loader lock object this is to avoid
           // same segment is getting loaded multiple times
           // in case of concurrent query
@@ -215,15 +210,25 @@ public class SegmentTaskIndexStore
          // acquire lock to load the segment
           synchronized (segmentLoderLockObject) {
             segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
-            if (null == segmentTaskIndexWrapper) {
-              // creating a map of take if to table segment
-              taskIdToSegmentIndexMap = new HashMap<TaskBucketHolder, AbstractIndex>();
-              segmentTaskIndexWrapper = new SegmentTaskIndexWrapper(taskIdToSegmentIndexMap);
+            if (null == segmentTaskIndexWrapper || tableSegmentUniqueIdentifier
+                .isSegmentUpdated()) {
+              // if the segment is updated then get the existing block task id map details
+              // so that the same can be updated after loading the btree.
+              if (tableSegmentUniqueIdentifier.isSegmentUpdated()
+                  && null != segmentTaskIndexWrapper) {
+                taskIdToSegmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+              } else {
+                // creating a map of task id to table segment
+                taskIdToSegmentIndexMap = new HashMap<TaskBucketHolder, AbstractIndex>();
+                segmentTaskIndexWrapper = new SegmentTaskIndexWrapper(taskIdToSegmentIndexMap);
+                segmentTaskIndexWrapper.incrementAccessCount();
+              }
               Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
               long requiredSize =
                   calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier);
-              segmentTaskIndexWrapper.setMemorySize(requiredSize);
+              segmentTaskIndexWrapper
+                  .setMemorySize(requiredSize + segmentTaskIndexWrapper.getMemorySize());
               boolean isAddedToLruCache =
                   lruCache.put(lruCacheKey, segmentTaskIndexWrapper, requiredSize);
               if (isAddedToLruCache) {
@@ -239,15 +244,23 @@ public class SegmentTaskIndexStore
                 throw new IndexBuilderException(
                     "Can not load the segment. No Enough space available.");
               }
-              //tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
+
+              // set the latest timestamp.
+              segmentTaskIndexWrapper
+                  .setRefreshedTimeStamp(updateVO.getCreatedOrUpdatedTimeStamp());
+              // tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
               // removing from segment lock map as once segment is loaded
-              //if concurrent query is coming for same segment
+              // if concurrent query is coming for same segment
               // it will wait on the lock so after this segment will be already
-              //loaded so lock is not required, that is why removing the
+              // loaded so lock is not required, that is why removing the
               // lock object as it won't be useful
               segmentLockMap.remove(lruCacheKey);
+            } else {
+              segmentTaskIndexWrapper.incrementAccessCount();
             }
           }
+        } else {
+          segmentTaskIndexWrapper.incrementAccessCount();
         }
       }
     } catch (IndexBuilderException e) {
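
For reference, the timestamp lookup added in getTimeStampValueFromBlock above reduces to taking the token between the last '-' and the file extension of a carbon data file name. A minimal sketch, assuming the part-<partNo>-<taskNo>-<bucketNo>-<timestamp>.carbondata layout used elsewhere in this commit (the sample file name is illustrative):

    // extracts the update timestamp token from a carbon data file name
    static String timestampOf(String blockName) {
      return blockName.substring(blockName.lastIndexOf('-') + 1,
          blockName.lastIndexOf('.'));
    }

    // timestampOf("part-0-3-2-1483454237000.carbondata") returns "1483454237000"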

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index 789cd71..bbece41 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -358,22 +358,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * Gets absolute path of data file of given aggregate table
-   *
-   * @param aggTableID          unique aggregate table identifier
-   * @param partitionId         unique partition identifier
-   * @param segmentId           unique partition identifier
-   * @param filePartNo          data file part number
-   * @param factUpdateTimeStamp unique identifier to identify an update
-   * @return absolute path of data file stored in carbon data format
-   */
-  public String getCarbonAggDataFilePath(String aggTableID, String partitionId, String segmentId,
-      Integer filePartNo, Integer taskNo, String factUpdateTimeStamp) {
-    return getAggSegmentDir(aggTableID, partitionId, segmentId) + File.separator
-        + getCarbonDataFileName(filePartNo, taskNo, factUpdateTimeStamp);
-  }
-
-  /**
    * Gets data file name only with out path
    *
    * @param filePartNo          data file part number
@@ -383,7 +367,7 @@ public class CarbonTablePath extends Path {
    */
   public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
       String factUpdateTimeStamp) {
-    return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
+    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
         + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
@@ -502,6 +486,24 @@ public class CarbonTablePath extends Path {
     }
 
     /**
+     * gets bucket number information from given carbon data file name
+     */
+    public static String getBucketNo(String carbonFilePath) {
+      // Get the file name from path
+      String fileName = getFileName(carbonFilePath);
+      // + 1 for size of "-"
+      int firstDashPos = fileName.indexOf("-");
+      int secondDash = fileName.indexOf("-", firstDashPos + 1);
+      int startIndex = fileName.indexOf("-", secondDash + 1) + 1;
+      int endIndex = fileName.indexOf("-", startIndex);
+      // to support backward compatibility
+      if (startIndex == -1 || endIndex == -1) {
+        return "0";
+      }
+      return fileName.substring(startIndex, endIndex);
+    }
+
+    /**
      * gets file part number information from given carbon data file name
      */
     public static String getPartNo(String carbonDataFileName) {
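
The new DataFileUtil.getBucketNo walks the first three '-' separators and returns the fourth token, falling back to "0" for file names written before the bucket number was added. A standalone sketch of that parsing, with the guard simplified so the backward-compatibility branch can actually fire (sample names are illustrative):

    // bucket number is the fourth dash-separated token of the file name
    static String bucketNo(String fileName) {
      int firstDash = fileName.indexOf('-');
      int secondDash = fileName.indexOf('-', firstDash + 1);
      int startIndex = fileName.indexOf('-', secondDash + 1) + 1;
      int endIndex = fileName.indexOf('-', startIndex);
      if (startIndex == 0 || endIndex == -1) {
        return "0"; // pre-bucket file names carry no fourth token
      }
      return fileName.substring(startIndex, endIndex);
    }

    // bucketNo("part-0-3-2-1483454237000.carbondata") returns "2"
    // bucketNo("part-0-3-1483454237000.carbondata")   returns "0" (old layout)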

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8c01ba5..f91a75c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1088,6 +1088,12 @@ public final class CarbonCommonConstants {
    * property to set is IS_DRIVER_INSTANCE
    */
   public static final String IS_DRIVER_INSTANCE = "is.driver.instance";
+
+  /**
+   * maximum length of column
+   */
+  public static final int DEFAULT_COLUMN_LENGTH = 100000;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
index 5c2fef5..1622f3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
@@ -72,15 +72,16 @@ public class SegmentStatusManager {
   /**
    * This method will return last modified time of tablestatus file
    */
-  public long getTableStatusLastModifiedTime() throws IOException {
+  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
+      throws IOException {
     String tableStatusPath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
+        .getCarbonTablePath(identifier.getStorePath(), identifier.getCarbonTableIdentifier())
+        .getTableStatusFilePath();
     if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
       return 0L;
     } else {
       return FileFactory.getCarbonFile(tableStatusPath, FileFactory.getFileType(tableStatusPath))
-              .getLastModifiedTime();
+          .getLastModifiedTime();
     }
   }
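
With getTableStatusLastModifiedTime now static and parameterized on the table identity, callers no longer need a SegmentStatusManager instance. The call-site shape, sketched with an illustrative table (AbsoluteTableIdentifier.from as used in CarbonStore.scala further below):

    // hypothetical database/table names, for illustration only
    AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.from("default", "t1");
    long lastModified = SegmentStatusManager.getTableStatusLastModifiedTime(identifier);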
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9ff9936..5196631 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1389,9 +1389,12 @@ public final class CarbonUtil {
     return isFileExists(dictionaryFilePath) && isFileExists(dictionaryMetadataFilePath);
   }
 
-   /**
+  /**
+   *
    * @param tableInfo
    * @param invalidBlockVOForSegmentId
+   * @param updateStatusMngr
+   * @return
    */
   public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
       UpdateVO invalidBlockVOForSegmentId,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
index 3ec968c..81764cb 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 55e1caa..2a864a4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -353,7 +353,7 @@ public class QueryModel implements Serializable {
   public void setVectorReader(boolean vectorReader) {
     this.vectorReader = vectorReader;
   }
-public void setInvalidBlockForSegmentId(List<UpdateVO> invalidSegmentTimestampList) {
+  public void setInvalidBlockForSegmentId(List<UpdateVO> invalidSegmentTimestampList) {
     for (UpdateVO anUpdateVO : invalidSegmentTimestampList) {
       this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO);
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index d86dfe0..108aafa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -23,16 +23,12 @@ import java.lang.reflect.Constructor;
 import java.util.*;
 
 import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO;
-import org.apache.carbondata.common.iudprocessor.iuddata.DeleteDeltaDataUtil;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
-import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.*;
 import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
@@ -43,7 +39,6 @@ import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.querystatistics.*;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.update.SegmentUpdateDetails;
 import org.apache.carbondata.core.update.UpdateVO;
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
 import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
@@ -65,7 +60,15 @@ import org.apache.carbondata.scan.model.QueryModel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -76,10 +79,6 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-
 
 /**
  * Carbon Input format class representing one carbon table
@@ -97,11 +96,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
   /**
-   * Filter resolver reference to hold filter resolver object for a query
-   */
-  private FilterResolverIntf filterResolver;
-
-  /**
    * It is optional, if user does not set then it reads from store
    *
    * @param configuration
@@ -150,24 +144,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  /**
-   * It sets the resolved filter expression
-   *
-   * @param configuration
-   * @param filterResolver
-   */
-  public void setFilterPredicates(Configuration configuration,
-                                  FilterResolverIntf filterResolver) {
-    try {
-      if (filterResolver == null) {
-        return;
-      }
-      this.filterResolver = filterResolver;
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
-    }
-  }
-
   public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
     if (projection == null || projection.isEmpty()) {
       return;
@@ -216,47 +192,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
-   * @return updateExtension
-   */
-  private String[] getSegmentsFromConfiguration(JobContext job) throws IOException {
-    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
-    // if no segments
-    if (segmentString.trim().isEmpty()) {
-      return new String[0];
-    }
-
-    String[] segments = segmentString.split(",");
-    String[] segmentIds = new String[segments.length];
-    int i = 0;
-    try {
-      for (; i < segments.length; i++) {
-        segmentIds[i] = segments[i];
-      }
-    } catch (NumberFormatException e) {
-      throw new IOException("segment no:" + segments[i] + " should be integer");
-    }
-    return segmentIds;
-  }
-
-  /**
-   * Below method will be used to set the segments details if
-   * segments are not added in the configuration
-   *
-   * @param job
-   * @param absoluteTableIdentifier
-   * @throws IOException
-   */
-  private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
-          throws IOException {
-    if (getSegmentsFromConfiguration(job).length == 0) {
-      // Get the valid segments from the carbon store.
-      SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
-              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
-      setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
-    }
-  }
-
-  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
    * are used to get table path to read.
@@ -295,14 +230,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         }
         cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
       }
-    } catch (Exception ex) {
-      throw new IOException(ex);
     }
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
     CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
-	// this will be null in case of corrupt schema file.
+    // this will be null in case of corrupt schema file.
     if(null == carbonTable){
       throw new IOException("Missing/Corrupt schema file for table.");
     }
@@ -331,10 +264,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
         continue;
       }
-      // Huawei IUD confirm from vishal
       carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
-              ColumnarFormatVersion.valueOf(
-                  CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
+          ColumnarFormatVersion.valueOf(
+              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
     }
     return carbonSplits;
   }
@@ -366,6 +298,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       for (DataRefNode dataRefNode : dataRefNodes) {
         BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+        if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+            updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+            updateStatusManager)) {
+          continue;
+        }
         result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
             tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
             tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
@@ -398,8 +335,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throws IndexBuilderException, IOException, CarbonUtilException {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<String, AbstractIndex> segmentIndexMap =
-        getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, updateStatusManager);
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient,
+            updateStatusManager);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
 
@@ -435,20 +373,20 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    */
   private List<TableBlockInfo> getTableBlockInfo(JobContext job,
-                                                 TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
-                                                 Set<String> taskKeys,
-                                                 List<String> updatedTaskList,
-                                                 UpdateVO updateDetails,
-                                                 SegmentUpdateStatusManager updateStatusManager,
-                                                 String segmentId)
-      throws IOException {
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
+      List<String> updatedTaskList,
+      UpdateVO updateDetails,
+      SegmentUpdateStatusManager updateStatusManager,
+      String segmentId)
+    throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
 
     // get file location of all files of given segment
     JobContext newJob =
         new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
     newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
-            tableSegmentUniqueIdentifier.getSegmentId() + "");
+        tableSegmentUniqueIdentifier.getSegmentId() + "");
 
     // identify table blocks
     for (InputSplit inputSplit : getSplitsInternal(newJob)) {
@@ -457,27 +395,25 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // tableSegmentUniqueIdentifiertimestamp if time stamp is greater
       // then add as TableInfo object.
       if (isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit, updateDetails,
-              updateStatusManager, segmentId)) {
+          updateStatusManager, segmentId)) {
         BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
-                carbonInputSplit.getNumberOfBlocklets());
+            carbonInputSplit.getNumberOfBlocklets());
         tableBlockInfoList.add(
-                new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-                        tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
-                        carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
-                        carbonInputSplit.getBlockStorageIdMap()));
+            new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+                tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
+                carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
+                carbonInputSplit.getBlockStorageIdMap()));
       }
     }
     return tableBlockInfoList;
   }
 
-  private boolean isValidBlockBasedOnUpdateDetails(Set<String> taskKeys,
-                                                  CarbonInputSplit carbonInputSplit, UpdateVO updateDetails,
-                                                  SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
     String taskID = null;
     if (null != carbonInputSplit) {
-
-      if(!updateStatusManager.isBlockValid(segmentId,carbonInputSplit.getPath().getName()))
-      {
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
         return false;
       }
 
@@ -486,12 +422,18 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
 
       taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String bucketNo =
+          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
       String blockTimestamp = carbonInputSplit.getPath().getName()
-              .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
-                      carbonInputSplit.getPath().getName().lastIndexOf('.'));
+          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+              carbonInputSplit.getPath().getName().lastIndexOf('.'));
       if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
-              && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
-        if (!taskKeys.contains(taskID)) {
+          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskBucketHolder)) {
           return true;
         }
       }
@@ -499,23 +441,29 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return false;
   }
 
-  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
-                                                              AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                              String segmentId,
-                                                              SegmentUpdateStatusManager updateStatusManager)
-          throws IOException, IndexBuilderException, CarbonUtilException {
+  /**
+   * It returns index for each task file.
+   * @param job
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   * @return
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager)
+      throws IOException, IndexBuilderException, CarbonUtilException {
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-    Map<String, AbstractIndex> segmentIndexMap = null;
-    Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> cache = CacheProvider.getInstance()
-            .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath());
     List<String> updatedTaskList = null;
     boolean isSegmentUpdated = false;
-    Set<String> taskKeys = null;
-
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-            new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
     SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    segmentTaskIndexWrapper = cache.getIfPresent(tableSegmentUniqueIdentifier);
+    segmentTaskIndexWrapper =
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
     UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
@@ -523,7 +471,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         taskKeys = segmentIndexMap.keySet();
         isSegmentUpdated = true;
         updatedTaskList =
-                statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
+            statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
       }
     }
     // if segment tree is not loaded, load the segment tree
@@ -532,9 +480,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
      // retrieved. the same will be filtered based on taskKeys, if the task is the same
      // for the block then don't add it since its btree is already loaded.
       List<TableBlockInfo> tableBlockInfoList =
-              getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
-                      updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
-                      segmentId);
+          getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
+              updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
+              segmentId);
       if (!tableBlockInfoList.isEmpty()) {
         // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
         Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
@@ -542,125 +490,45 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         // get Btree blocks for given segment
         tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
         tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
-        segmentTaskIndexWrapper = cache.get(tableSegmentUniqueIdentifier);
+        segmentTaskIndexWrapper =
+            cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
         segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
       }
     }
     return segmentIndexMap;
   }
 
-  /**
-   *
-   * @param job
-   * @param absoluteTableIdentifier
-   * @return
-   * @throws IOException
-   * @throws CarbonUtilException
-   * @throws IndexBuilderException
-   * @throws KeyGenException
-   */
-private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
-      JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
-      CacheClient cacheClient) throws IOException {
-    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper)
-        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-    if (null != segmentTaskIndexWrapper) {
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-    }
-    // if segment tree is not loaded, load the segment tree
-    if (segmentIndexMap == null) {
-      // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-      List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
-      // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
-
-    Map<String, Long> blockRowCountMapping =
-            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-      // get Btree blocks for given segment
-      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-      segmentTaskIndexWrapper =
-          cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier);
-          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-    for (String eachValidSeg : validAndInvalidSegments.getValidSegments()) {
-
-      long countOfBlocksInSeg = 0;
-
-      Map<String, AbstractIndex> taskAbstractIndexMap =
-              getSegmentAbstractIndexs(job, absoluteTableIdentifier, eachValidSeg, updateStatusManager);
-
-      for (Map.Entry<String, AbstractIndex> taskMap : taskAbstractIndexMap.entrySet()) {
-
-        AbstractIndex taskAbstractIndex = taskMap.getValue();
-
-        countOfBlocksInSeg += new BlockLevelTraverser()
-                .getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,
-                        updateStatusManager);
-      }
-
-      segmentAndBlockCountMapping.put(eachValidSeg, countOfBlocksInSeg);
-
-    }
-
-    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
-
-  }
-
   public BlockMappingVO getBlockRowCount(JobContext job,
                                          AbsoluteTableIdentifier absoluteTableIdentifier)
           throws IOException, CarbonUtilException, IndexBuilderException, KeyGenException {
     CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
     try {
       SegmentUpdateStatusManager updateStatusManager =
-              new SegmentUpdateStatusManager(absoluteTableIdentifier);
+          new SegmentUpdateStatusManager(absoluteTableIdentifier);
       SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
-              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+          new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
       Map<String, Long> blockRowCountMapping =
-              new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
       Map<String, Long> segmentAndBlockCountMapping =
-              new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // if segment tree is not loaded, load the segment tree
-    if (segmentIndexMap == null) {
-      // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-      List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
-      // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
-
-    Map<String, Long> blockRowCountMapping =
-            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-      // get Btree blocks for given segment
-      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-      segmentTaskIndexWrapper =
-          cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-    for (String eachValidSeg : validAndInvalidSegments.getValidSegments()) {
-
-      long countOfBlocksInSeg = 0;
-
-      Map<String, AbstractIndex> taskAbstractIndexMap =
-              getSegmentAbstractIndexs(job, absoluteTableIdentifier, eachValidSeg, updateStatusManager);
-
-      for (Map.Entry<String, AbstractIndex> taskMap : taskAbstractIndexMap.entrySet()) {
-
-        AbstractIndex taskAbstractIndex = taskMap.getValue();
-
-        countOfBlocksInSeg += new BlockLevelTraverser()
-                .getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,
-                        updateStatusManager);
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+      for (String eachValidSeg : validAndInvalidSegments.getValidSegments()) {
+        long countOfBlocksInSeg = 0;
+        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskAbstractIndexMap =
+            getSegmentAbstractIndexs(job, absoluteTableIdentifier, eachValidSeg, cacheClient,
+                updateStatusManager);
+        for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskMap :
+            taskAbstractIndexMap
+            .entrySet()) {
+          AbstractIndex taskAbstractIndex = taskMap.getValue();
+          countOfBlocksInSeg += new BlockLevelTraverser()
+              .getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg,
+                  updateStatusManager);
+        }
+        segmentAndBlockCountMapping.put(eachValidSeg, countOfBlocksInSeg);
       }
-
-      segmentAndBlockCountMapping.put(eachValidSeg, countOfBlocksInSeg);
-
       return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
-    }
-
-    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
-
-    finally {
+    } finally {
       cacheClient.close();
     }
   }
@@ -740,7 +608,7 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbs
       }
       List<UpdateVO> invalidTimestampRangeList =
           split.getAllSplits().get(0).getInvalidTimestampRange();
-      if (invalidTimestampRangeList.size() > 0) {
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
         queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
       }
     }
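
The thread running through this CarbonInputFormat refactor is that segment indexes are now keyed by (task id, bucket number) rather than the bare task id. A sketch of how isValidBlockBasedOnUpdateDetails builds that key from a split's file name, using the DataFileUtil helpers changed earlier in this commit (the file name and the taskKeys set are illustrative):

    String fileName = "part-0-3-2-1483454237000.carbondata"; // illustrative
    String taskId = CarbonTablePath.DataFileUtil.getTaskNo(fileName);
    String bucketNo = CarbonTablePath.DataFileUtil.getBucketNo(fileName);
    SegmentTaskIndexStore.TaskBucketHolder holder =
        new SegmentTaskIndexStore.TaskBucketHolder(taskId, bucketNo);
    // a block only needs (re)indexing if its key is not already cached
    boolean alreadyIndexed = taskKeys.contains(holder);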

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 8a9d396..6847537 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -18,7 +18,13 @@
  */
 package org.apache.carbondata.spark.load;
 
-import com.google.gson.Gson;
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
@@ -27,7 +33,6 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
@@ -58,19 +63,13 @@ import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.spark.merger.NodeBlockRelation;
 import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
+
+import com.google.gson.Gson;
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
-import java.io.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
 
 public final class CarbonLoaderUtil {
 
@@ -106,12 +105,6 @@ public final class CarbonLoaderUtil {
     model.setSchemaInfo(info);
     model.setTableName(dataProcessTaskStatus.getTableName());
     List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
-    if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
-      model.setLoadNames(
-          CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
-      model.setModificationOrDeletionTime(CarbonDataProcessorUtil
-          .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
-    }
     model.setBlocksID(dataProcessTaskStatus.getBlocksID());
     model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
     model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
@@ -160,7 +153,6 @@ public final class CarbonLoaderUtil {
     DataProcessTaskStatus dataProcessTaskStatus
             = new DataProcessTaskStatus(databaseName, tableName);
     dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
-    dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
     if (loadModel.isDirectLoad()) {
       dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
       dataProcessTaskStatus.setDirectLoad(true);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index ce19080..5246c61 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -30,10 +30,7 @@
 package org.apache.carbondata.spark.load;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -47,7 +44,6 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.update.CarbonUpdateUtil;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class DeleteLoadFolders {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 5c0625a..08d0c69 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class LoadMetadataUtil {
   private LoadMetadataUtil() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 6fb2df1..ee2c51b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.DataManagementFunc
 
@@ -68,8 +68,8 @@ object CarbonStore {
             Row(
               load.getLoadName,
               load.getLoadStatus,
-              new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
-              new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime)
+              new java.sql.Timestamp(load.getLoadStartTime),
+              new java.sql.Timestamp(load.getLoadEndTime)
             )
           }.toSeq
     } else {
@@ -160,8 +160,10 @@ object CarbonStore {
       tableName: String,
       segmentId: String): Boolean = {
     val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
-    val status = SegmentStatusManager.getSegmentStatus(identifier)
-    status.isValid(segmentId)
+    val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
+        SegmentStatusManager(
+          identifier).getValidAndInvalidSegments
+    return validAndInvalidSegments.getValidSegments.contains(segmentId)
   }
 
   private def validateTimeFormat(timestamp: String): Long = {
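
The rewritten isSegmentValid above no longer goes through SegmentStatusManager.getSegmentStatus; it asks the manager for the valid/invalid segment split and checks membership. The same check, sketched in Java for consistency with the rest of this commit (identifier and segmentId assumed in scope):

    SegmentStatusManager.ValidAndInvalidSegmentsInfo info =
        new SegmentStatusManager(identifier).getValidAndInvalidSegments();
    boolean isValid = info.getValidSegments().contains(segmentId);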

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 86433a3..4392775 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -26,7 +26,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.sql.Row
@@ -644,7 +644,7 @@ class RddIteratorForUpdate(rddIter: Iterator[Row],
   }
 }
 
-object CarbonDataLoadForUpdate extends Logging{
+object CarbonDataLoadForUpdate {
   def initialize(model: CarbonLoadModel,
       splitIndex: Int): String = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
@@ -690,6 +690,7 @@ object CarbonDataLoadForUpdate extends Logging{
       loadCount: String,
       loadMetadataDetails: LoadMetadataDetails,
       executorErrors: ExecutionErrors): Unit = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     try {
       var storeLocation = ""
       val carbonUseLocalDir = CarbonProperties.getInstance()
@@ -715,7 +716,7 @@ object CarbonDataLoadForUpdate extends Logging{
       case e: DataLoadingException => if (e.getErrorCode ==
                                           DataProcessorConstants.BAD_REC_FOUND) {
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-        logInfo("Bad Record Found")
+        LOGGER.info("Bad Record Found")
       } else if (e.getErrorCode == DataProcessorConstants.BAD_REC_FAILURE_ERROR_CODE) {
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
         executorErrors.failureCauses = FailureCauses.BAD_RECORDS
@@ -734,7 +735,7 @@ object CarbonDataLoadForUpdate extends Logging{
         CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
       } catch {
         case e: Exception =>
-          logError("Failed to delete local data", e)
+          LOGGER.error(e, "Failed to delete local data")
       }
       if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
         loadMetadataDetails.getLoadStatus)) {

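Spark 2.0 made org.apache.spark.Logging private, so this file drops the trait and fetches a logger from CarbonData's own facade instead (the UpdateCoalescedRDD hunk further down removes the same mixin). A minimal sketch of the pattern, with a hypothetical Worker object standing in for CarbonDataLoadForUpdate:

    import org.apache.carbondata.common.logging.LogServiceFactory

    object Worker {
      private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)

      def cleanup(): Unit = {
        try {
          // ... delete local data load folders ...
        } catch {
          // The error(Throwable, String) overload keeps the stack trace,
          // unlike concatenating the exception into the message.
          case e: Exception => LOGGER.error(e, "Failed to delete local data")
        }
      }
    }
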
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index e8310f5..3157787 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -101,7 +101,7 @@ class CarbonIUDMergerRDD[K, V](
       if (!validSplits.isEmpty) {
         val locations = validSplits(0).getLocations
         new CarbonSparkPartition(id, i,
-          new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations(0)))
+          new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations))
       }
       else {
         null

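Passing locations rather than locations(0) hands Spark the full replica host list for the first valid split, so the IUD merge task can be scheduled on any node that holds the data instead of being pinned to one host. In outline (names as in the hunk; the CarbonMultiBlockSplit overload taking the host array is assumed from the change itself):

    val locations = validSplits(0).getLocations  // every replica host, not just the first
    new CarbonSparkPartition(id, i,
      new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations))
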
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index bfd0e2d..4c76a25 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -20,18 +20,14 @@ package org.apache.carbondata.spark.rdd
 import java.util
 import java.util.concurrent._
 
-import org.apache.carbondata.core.update.CarbonUpdateUtil
-import org.apache.carbondata.core.updatestatus.SegmentStatusManager
-import org.apache.carbondata.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -130,7 +126,7 @@ object DataManagementFunc {
 
           // write
           CarbonLoaderUtil.writeLoadMetadata(
-            schema,
+            storePath,
             databaseName,
             table.getDatabaseName,
             updatedloadMetadataDetails.asJava

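CarbonLoaderUtil.writeLoadMetadata is now keyed by the physical store path rather than by a CarbonDataLoadSchema, which is also why the schema-only import at the top of the file could be trimmed. The call site after the patch, in outline (argument names as in the hunk):

    CarbonLoaderUtil.writeLoadMetadata(
      storePath,                          // previously: schema (CarbonDataLoadSchema)
      databaseName,
      table.getDatabaseName,
      updatedloadMetadataDetails.asJava)
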
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index c9f9667..67e094a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer,
 class UpdateCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends RDD[T](prev.context, Nil) with Logging {
+  extends RDD[T](prev.context, Nil) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index dfb2925..7336440 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -117,7 +117,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val TYPE = carbonKeyWord("TYPE")
   protected val UPDATE = carbonKeyWord("UPDATE")
   protected val USE = carbonKeyWord("USE")
-  protected val WHERE = carbonKeyWord("WHERE")
+  protected val WHERE = Keyword("WHERE")
   protected val WITH = carbonKeyWord("WITH")
   protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
   protected val ABS = carbonKeyWord("abs")

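The DDL parser distinguishes Keyword, which registers the token with the lexer's reserved-word set, from carbonKeyWord, which matches the word without reserving it. Promoting WHERE to a reserved Keyword lets the new IUD grammar (UPDATE ... WHERE ..., DELETE FROM ... WHERE ...) rely on it as a clause delimiter. Schematically (the production shown is illustrative, not the actual grammar):

    protected val WHERE = Keyword("WHERE")       // reserved: cannot be parsed as an identifier
    protected val WITH  = carbonKeyWord("WITH")  // non-reserved: still usable as an identifier

    // Illustrative shape of an IUD production, where a reserved WHERE
    // cleanly terminates the table reference and introduces the filter:
    //   deleteRecords ::= DELETE FROM table (WHERE restInput)?
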
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9f7401d..cede7b1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -640,16 +640,13 @@ object CarbonDataRDDFactory {
       def loadDataFrame(): Unit = {
         try {
           val rdd = dataFrame.get.rdd
-
+          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
+            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+          }.distinct.size
+          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+            sqlContext.sparkContext)
+          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
           if (useKettle) {
-
-            val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
-              DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-            }.distinct.size
-            val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-              sqlContext.sparkContext)
-            val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
-
             status = new DataFrameLoaderRDD(sqlContext.sparkContext,
               new DataLoadResultImpl(),
               carbonLoadModel,
@@ -674,7 +671,6 @@ object CarbonDataRDDFactory {
               schemaLastUpdatedTime,
               newRdd).collect()
           }
-
         } catch {
           case ex: Exception =>
             LOGGER.error(ex, "load data frame failed")

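The refactor hoists the node-list computation out of the if (useKettle) branch, so the coalesced newRdd now feeds both the kettle and the no-kettle loader; before this change only the kettle path balanced load partitions across executors. The shared prelude, with comments added:

    // Count the distinct hosts that hold at least one partition of the input.
    val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
      DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
    }.distinct.size
    // Ask Spark to bring up executors on (up to) that many distinct nodes.
    val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
      nodeNumOfData, sqlContext.sparkContext)
    // Coalesce load partitions so each node receives a balanced share.
    val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
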
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8dda2a8d/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 3a964c1..f85e56b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -307,8 +307,8 @@ case class CarbonRelation(
   private var sizeInBytesLocalValue = 0L
 
   def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier).getTableStatusLastModifiedTime
+    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
+      tableMeta.carbonTable.getAbsoluteTableIdentifier)
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
         tableMeta.storePath,

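getTableStatusLastModifiedTime becomes a static helper on SegmentStatusManager, so sizeInBytes can probe the tablestatus file's modification time without constructing a manager instance first. In outline (field names as in the hunk):

    val newTime = SegmentStatusManager.getTableStatusLastModifiedTime(
      tableMeta.carbonTable.getAbsoluteTableIdentifier)
    // Recompute the cached size only when the status file actually changed.
    if (tableStatusLastUpdateTime != newTime) {
      // ... re-read table status and refresh sizeInBytesLocalValue ...
    }
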
