carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/4] incubator-carbondata git commit: [CARBONDATA-484] fixed impacted test cases + refactored design and fixed review comments
Date Tue, 03 Jan 2017 17:53:20 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master cb2148040 -> b96604334


[CARBONDATA-484] fixed impacted test cases + refactored design and fixed review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d53feefd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d53feefd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d53feefd

Branch: refs/heads/master
Commit: d53feefd60dbedd48ec9582751ac694316cf97f2
Parents: b6ab4ef
Author: Venkata Ramana G <g.ramana.v@gmail.com>
Authored: Tue Jan 3 22:57:21 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Jan 3 23:11:09 2017 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/cache/Cache.java | 14 ++-
 .../carbondata/core/cache/CacheProvider.java    |  5 +-
 .../dictionary/ForwardDictionaryCache.java      | 11 ++-
 .../dictionary/ReverseDictionaryCache.java      |  9 ++
 .../core/carbon/datastore/BlockIndexStore.java  |  9 ++
 .../carbon/datastore/SegmentTaskIndexStore.java | 21 +----
 .../datastore/SegmentTaskIndexStoreTest.java    |  4 +-
 .../carbondata/hadoop/CacheAccessClient.java    | 99 ++++++++++++++++++++
 .../apache/carbondata/hadoop/CacheClient.java   | 73 +++------------
 .../carbondata/hadoop/CarbonInputFormat.java    | 15 ++-
 .../internal/index/impl/InMemoryBTreeIndex.java |  6 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |  6 +-
 12 files changed, 177 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index b519deb..c87e7d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.util.CarbonUtilException;
  * either evicted or manually invalidated.
  * Implementations of this interface are expected to be thread-safe, and can be safely accessed
  * by multiple concurrent threads.
+ * This class also responsible for incrementing and decrementing access count during get operation
  */
 public interface Cache<K, V> {
 
@@ -36,6 +37,8 @@ public interface Cache<K, V> {
    * This method will get the value for the given key. If value does not exist
    * for the given key, it will check and load the value.
    *
+   * Access count of Cacheable entry will be incremented
+   *
    * @param key
    * @return
    * @throws CarbonUtilException in case memory is not sufficient to load data into memory
@@ -45,7 +48,7 @@ public interface Cache<K, V> {
   /**
    * This method will return a list of values for the given list of keys.
    * For each key, this method will check and load the data if required.
-   *
+   * Access count of Cacheable entry will be incremented
    * @param keys
    * @return
    * @throws CarbonUtilException in case memory is not sufficient to load data into memory
@@ -55,7 +58,7 @@ public interface Cache<K, V> {
   /**
    * This method will return the value for the given key. It will not check and load
    * the data for the given key
-   *
+   * Access count of Cacheable entry will be incremented
    * @param key
    * @return
    */
@@ -67,5 +70,12 @@ public interface Cache<K, V> {
    * @param key
    */
   void invalidate(K key);
+
+  /**
+   * Access count of Cacheable entry will be decremented
+   *
+   * @param keys
+   */
+  void clearAccessCount(List<K> keys);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 7d92ca2..412f094 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -30,9 +30,7 @@ import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache;
 import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
 import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
 import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
-import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -129,8 +127,7 @@ public class CacheProvider {
           carbonLRUCache);
     } else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
       cacheObject =
-          new SegmentTaskIndexStore<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>(
-              carbonStorePath, carbonLRUCache);
+          new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
     }
     cacheTypeToCacheMap.put(cacheType, cacheObject);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
index e6fc9d8..ff30c73 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
@@ -167,7 +167,7 @@ public class ForwardDictionaryCache<K extends DictionaryColumnUniqueIdentifier,
       throws CarbonUtilException {
     Dictionary forwardDictionary = null;
     // dictionary is only for primitive data type
-    assert(!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
+    assert (!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
     String columnIdentifier = dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId();
     ColumnDictionaryInfo columnDictionaryInfo =
         getColumnDictionaryInfo(dictionaryColumnUniqueIdentifier, columnIdentifier);
@@ -202,4 +202,13 @@ public class ForwardDictionaryCache<K extends DictionaryColumnUniqueIdentifier,
     }
     return columnDictionaryInfo;
   }
+
+  @Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
+    for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
+      Dictionary cacheable = (Dictionary) carbonLRUCache.get(
+          getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
+              CacheType.FORWARD_DICTIONARY));
+      cacheable.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
index 2a0cd38..fab767f 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
@@ -203,4 +203,13 @@ public class ReverseDictionaryCache<K extends DictionaryColumnUniqueIdentifier,
     }
     return columnReverseDictionaryInfo;
   }
+
+  @Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
+    for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
+      Dictionary cacheable = (Dictionary) carbonLRUCache.get(
+          getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
+              CacheType.REVERSE_DICTIONARY));
+      cacheable.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index 9b5818f..a452338 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -226,6 +227,14 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
         .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
 
+  @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
+    for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
+      SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
+          .get(tableBlockUniqueIdentifier.getUniqueTableBlockName());
+      cacheable.clear();
+    }
+  }
+
   /**
    * Below method will be used to fill the loaded blocks to the array
    * which will be used for query execution

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index 83e06e9..ef0ac62 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -46,7 +46,7 @@ import org.apache.carbondata.core.util.CarbonUtilException;
  * Class to handle loading, unloading,clearing,storing of the table
  * blocks
  */
-public class SegmentTaskIndexStore<K, V>
+public class SegmentTaskIndexStore
     implements Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName());
@@ -322,27 +322,12 @@ public class SegmentTaskIndexStore<K, V>
   }
 
   /**
-   * Below method will be used to remove the segment based on
-   * segment id is passed
-   *
-   * @param segmentToBeRemoved      segment to be removed
-   * @param absoluteTableIdentifier absoluteTableIdentifier
-   */
-  public void removeSegments(List<String> segmentToBeRemoved,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    for (String segmentId : segmentToBeRemoved) {
-      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-      lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
-    }
-  }
-
-  /**
    * The method clears the access count of table segments
    *
    * @param tableSegmentUniqueIdentifiers
    */
-  public void clear(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+  @Override
+  public void clearAccessCount(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
     for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
       SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
           .get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
index 3490917..8adb8bb 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
@@ -60,7 +60,9 @@ public class SegmentTaskIndexStoreTest {
 
   @BeforeClass public static void setUp() {
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    taskIndexStore = (SegmentTaskIndexStore) cacheProvider.createCache(CacheType.DRIVER_BTREE, "");
+    taskIndexStore = (SegmentTaskIndexStore) cacheProvider.
+        <TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+            createCache(CacheType.DRIVER_BTREE, "");
     tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L,
         ColumnarFormatVersion.valueOf(version));
     absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
new file mode 100644
index 0000000..2760035
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+/**
+ * CacheClient : Class used to request the segments cache
+ */
+public class CacheAccessClient<K, V> {
+  /**
+   * List of segments
+   */
+  private List<K> segmentList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  /**
+   * absolute table identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private Cache<K, V> cache;
+
+  public CacheAccessClient(Cache cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * This method will return the value for the given key. It will not check and load
+   * the data for the given key
+   *
+   * @param key
+   * @return
+   */
+  public V getIfPresent(K key) {
+    V value = cache.getIfPresent(key);
+    if (value != null) {
+      segmentList.add(key);
+    }
+    return value;
+  }
+
+  /**
+   * This method will get the value for the given key. If value does not exist
+   * for the given key, it will check and load the value.
+   *
+   * @param key
+   * @return
+   * @throws CarbonUtilException in case memory is not sufficient to load data into memory
+   */
+  public V get(K key) throws CarbonUtilException {
+    V value = cache.get(key);
+    if (value != null) {
+      segmentList.add(key);
+    }
+    return value;
+  }
+
+  /**
+   * the method is used to clear access count of the unused segments cacheable object
+   */
+  public void close() {
+    cache.clearAccessCount(segmentList);
+    cache = null;
+  }
+
+  /**
+   * This method will remove the cache for a given key
+   *
+   * @param keys
+   */
+  public void invalidateAll(List<K> keys) {
+    for (K key : keys) {
+      cache.invalidate(key);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index cbd3511..1982e2c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -18,79 +18,34 @@
  */
 package org.apache.carbondata.hadoop;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtilException;
 
 /**
- * CacheClient : Class used to request the segments cache
+ * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
-  /**
-   * List of segments
-   */
-  private List<TableSegmentUniqueIdentifier> segmentList =
-      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  /**
-   * absolute table identifier
-   */
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
 
-  private SegmentTaskIndexStore segmentCache;
+  // segment access client for driver LRU cache
+  private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      segmentAccessClient;
 
-  /**
-   * @param absoluteTableIdentifier
-   */
-  public CacheClient(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
-    segmentCache = (SegmentTaskIndexStore) CacheProvider.getInstance()
-        .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath());
+  public CacheClient(String storePath) {
+    Cache segmentCache = CacheProvider
+        .getInstance().<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>createCache(
+            CacheType.DRIVER_BTREE, storePath);
+    segmentAccessClient = new CacheAccessClient<>(segmentCache);
   }
 
-  /**
-   * The method returns the SegmentTaskIndexWrapper from the segments cache
-   *
-   * @param tableSegmentUniqueIdentifier
-   * @return
-   * @throws CarbonUtilException
-   */
-  public SegmentTaskIndexWrapper getSegmentTaskIndexWrapper(
-      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws CarbonUtilException {
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper;
-    if (null == tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos()) {
-      segmentTaskIndexWrapper = segmentCache.getIfPresent(tableSegmentUniqueIdentifier);
-    } else {
-      segmentTaskIndexWrapper = segmentCache.get(tableSegmentUniqueIdentifier);
-    }
-    if (null != segmentTaskIndexWrapper) {
-      segmentList.add(tableSegmentUniqueIdentifier);
-    }
-    return segmentTaskIndexWrapper;
+  public CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      getSegmentAccessClient() {
+    return segmentAccessClient;
   }
 
-  /**
-   * the method is used to clear access count of the unused segments cacheable object
-   */
   public void close() {
-    segmentCache.clear(segmentList);
-    segmentCache =null;
-  }
-
-  /**
-   * The method removes invalid segments from the segment level cache
-   *
-   * @param invalidSegments
-   */
-  public void removeInvalidSegments(List<String> invalidSegments) {
-    segmentCache.removeSegments(invalidSegments, absoluteTableIdentifier);
+    segmentAccessClient.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 2e92842..7e9dc7a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -211,7 +211,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    */
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    CacheClient cacheClient = new CacheClient(identifier);
+    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
     List<String> invalidSegments = new ArrayList<>();
 
     // get all valid segments and set them into the configuration
@@ -226,7 +226,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
       if (invalidSegments.size() > 0) {
-        cacheClient.removeInvalidSegments(invalidSegments);
+        List<TableSegmentUniqueIdentifier> invalidSegmentsIds
+            = new ArrayList<>(invalidSegments.size());
+        for(String segId: invalidSegments) {
+          invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+        }
+        cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
       }
     }
 
@@ -404,8 +409,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper =
-        cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier);
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper)
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
@@ -422,7 +427,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // get Btree blocks for given segment
       tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
       segmentTaskIndexWrapper =
-          cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier);
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
     return segmentIndexMap;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index dd16e57..9d8b136 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -108,12 +108,12 @@ class InMemoryBTreeIndex implements Index {
       JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, IndexBuilderException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-    CacheClient cacheClient = new CacheClient(identifier);
+    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
     TableSegmentUniqueIdentifier segmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(identifier, segment.getId());
     try {
       SegmentTaskIndexWrapper segmentTaskIndexWrapper =
-          cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier);
+          cacheClient.getSegmentAccessClient().getIfPresent(segmentUniqueIdentifier);
       if (null != segmentTaskIndexWrapper) {
         segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
       }
@@ -125,7 +125,7 @@ class InMemoryBTreeIndex implements Index {
         segmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
         // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input
         // get Btree blocks for given segment
-        segmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier);
+        segmentTaskIndexWrapper = cacheClient.getSegmentAccessClient().get(segmentUniqueIdentifier);
         segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d53feefd/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 451b95d..37ce089 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -141,9 +141,11 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
     assert(!segments.contains("3"))
-    val cacheClient = new CacheClient(identifier);
+    val cacheClient = new CacheClient(CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.STORE_LOCATION));
     val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
-    val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentIdentifier)
+    val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient.
+      getIfPresent(segmentIdentifier)
     assert(null == wrapper)
 
   }


Mime
View raw message