carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [09/50] carbondata git commit: [HOTFIX] Improved BlockDataMap caching performance during first time query
Date Mon, 30 Jul 2018 18:42:35 GMT
[HOTFIX] Improved BlockDataMap caching performance during first time query

Things done as part of this PR

Created the taskSummary and FileFooterEntry schemas once and stored them in member variables.
Previously the schema was created on every call, which was a costly operation and increased the
time taken to prune dataMaps.
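
For reference, below is a minimal standalone sketch (not CarbonData code) of the caching pattern
this change applies: the schema array is built lazily once under double-checked locking and then
reused for every subsequent prune. The class and method names here (CachedSchemaHolder,
buildSchema) are illustrative stand-ins; the actual change caches CarbonRowSchema[] fields inside
SegmentPropertiesWrapper, as shown in the diff below. Unlike the committed code, the sketch marks
the cached field volatile so the double-checked locking is safe under the Java memory model.

// Minimal sketch of lazy, build-once schema caching with double-checked locking.
public class CachedSchemaHolder {

  private static final Object SCHEMA_LOCK = new Object();

  // volatile so a fully constructed array is visible to all reader threads
  private volatile String[] taskSummarySchema;

  public String[] getTaskSummarySchema() {
    // first check without locking: the common (already cached) case stays lock free
    if (taskSummarySchema == null) {
      synchronized (SCHEMA_LOCK) {
        // second check inside the lock: only one thread builds the schema
        if (taskSummarySchema == null) {
          taskSummarySchema = buildSchema();
        }
      }
    }
    return taskSummarySchema;
  }

  // stand-in for the costly schema creation that previously ran on every call
  private String[] buildSchema() {
    return new String[] {"minValues", "maxValues", "rowCount", "filePath"};
  }

  // mirrors SegmentPropertiesWrapper.clear(): drop the cache so it can be rebuilt
  public void clear() {
    taskSummarySchema = null;
  }
}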

Used a TreeMap instead of a HashMap when adding the complete file path and its data to the map
during the merge file read.
Using a TreeMap improved the map-filling performance by 10 seconds for 1200 entries.
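
As an illustration (not the committed code), the sketch below shows the shape of that change in
SegmentIndexFileStore: the full-path-to-index-data map is a TreeMap keyed by the complete file
path, and the parent path of the merge file is computed once outside the loop. The class and
method names (MergeIndexReaderSketch, fillIndexMap) are hypothetical.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergeIndexReaderSketch {

  // Fill a sorted map of <full index file path, index file data>, mirroring how the
  // merge index file read populates carbonIndexMapWithFullPath.
  public static Map<String, byte[]> fillIndexMap(String mergeFileParentPath,
      List<String> fileNames, List<byte[]> fileData) {
    Map<String, byte[]> indexMapWithFullPath = new TreeMap<>();
    for (int i = 0; i < fileNames.size(); i++) {
      indexMapWithFullPath.put(mergeFileParentPath + "/" + fileNames.get(i), fileData.get(i));
    }
    return indexMapWithFullPath;
  }

  public static void main(String[] args) {
    Map<String, byte[]> map = fillIndexMap("/store/db/table/Fact/Part0/Segment_0",
        List.of("b.carbonindex", "a.carbonindex"),
        List.of(new byte[] {1}, new byte[] {2}));
    // keys come back in sorted path order: a.carbonindex before b.carbonindex
    map.keySet().forEach(System.out::println);
  }
}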

This closes #2531


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c144e3da
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c144e3da
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c144e3da

Branch: refs/heads/branch-1.4
Commit: c144e3da5359793ea2ade78211a655a1d5470b86
Parents: 8046bca
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Thu Jul 19 19:15:12 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jul 31 00:10:41 2018 +0530

----------------------------------------------------------------------
 .../block/SegmentPropertiesAndSchemaHolder.java | 96 ++++++++++++++++++--
 .../indexstore/BlockletDataMapIndexStore.java   | 37 +++-----
 .../blockletindex/SegmentIndexFileStore.java    | 13 ++-
 .../core/metadata/CarbonTableIdentifier.java    |  9 +-
 4 files changed, 115 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c144e3da/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index bb7ff0d..21e22b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -108,7 +108,8 @@ public class SegmentPropertiesAndSchemaHolder {
             this.segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
         if (null == segmentIdSetAndIndexWrapper) {
           // create new segmentProperties
-          segmentPropertiesWrapper.initSegmentProperties(carbonTable);
+          segmentPropertiesWrapper.initSegmentProperties();
+          segmentPropertiesWrapper.addMinMaxColumns(carbonTable);
           int segmentPropertiesIndex = segmentPropertiesIndexCounter.incrementAndGet();
           indexToSegmentPropertiesWrapperMapping
               .put(segmentPropertiesIndex, segmentPropertiesWrapper);
@@ -128,6 +129,9 @@ public class SegmentPropertiesAndSchemaHolder {
     } else {
       synchronized (getOrCreateTableLock(carbonTable.getAbsoluteTableIdentifier())) {
         segmentIdSetAndIndexWrapper.addSegmentId(segmentId);
+        indexToSegmentPropertiesWrapperMapping
+            .get(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex())
+            .addMinMaxColumns(carbonTable);
       }
     }
     return segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex();
@@ -226,8 +230,7 @@ public class SegmentPropertiesAndSchemaHolder {
     if (null != segmentPropertiesWrapper) {
      SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
           segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
-      synchronized (segmentPropertiesWrapper.getTableIdentifier().getCarbonTableIdentifier()
-          .getTableUniqueName()) {
+      synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
         segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
       }
       // if after removal of given SegmentId, the segmentIdSet becomes empty that means this
@@ -237,6 +240,31 @@ public class SegmentPropertiesAndSchemaHolder {
           .isEmpty()) {
         indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
         segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
+      } else if (!clearSegmentWrapperFromMap
+          && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
+        // min max columns can very when cache is modified. So even though entry is not required
+        // to be deleted from map clear the column cache so that it can filled again
+        segmentPropertiesWrapper.clear();
+        LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
+      }
+    }
+  }
+
+  /**
+   * add segmentId at given segmentPropertyIndex
+   * Note: This method is getting used in extension with other features. Please do not remove
+   *
+   * @param segmentPropertiesIndex
+   * @param segmentId
+   */
+  public void addSegmentId(int segmentPropertiesIndex, String segmentId) {
+    SegmentPropertiesWrapper segmentPropertiesWrapper =
+        indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex);
+    if (null != segmentPropertiesWrapper) {
+      SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
+          segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
+      synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
+        segmentIdAndSegmentPropertiesIndexWrapper.addSegmentId(segmentId);
       }
     }
   }
@@ -247,11 +275,19 @@ public class SegmentPropertiesAndSchemaHolder {
    */
   public static class SegmentPropertiesWrapper {
 
+    private static final Object taskSchemaLock = new Object();
+    private static final Object fileFooterSchemaLock = new Object();
+
     private AbsoluteTableIdentifier tableIdentifier;
     private List<ColumnSchema> columnsInTable;
     private int[] columnCardinality;
     private SegmentProperties segmentProperties;
     private List<CarbonColumn> minMaxCacheColumns;
+    private CarbonRowSchema[] taskSummarySchema;
+    // the same variable can be used for block and blocklet schema because at any given time
+    // cache_level will be either block or blocklet, and whenever cache_level is changed the cache
+    // and its corresponding segmentProperties are flushed
+    private CarbonRowSchema[] fileFooterEntrySchema;
 
     public SegmentPropertiesWrapper(CarbonTable carbonTable,
         List<ColumnSchema> columnsInTable, int[] columnCardinality) {
@@ -260,9 +296,25 @@ public class SegmentPropertiesAndSchemaHolder {
       this.columnCardinality = columnCardinality;
     }
 
-    public void initSegmentProperties(CarbonTable carbonTable) {
+    public void initSegmentProperties() {
       segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
-      this.minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(segmentProperties);
+    }
+
+    public void addMinMaxColumns(CarbonTable carbonTable) {
+      if (null == minMaxCacheColumns || minMaxCacheColumns.isEmpty()) {
+        minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(segmentProperties);
+      }
+    }
+
+    /**
+     * clear required fields
+     */
+    public void clear() {
+      if (null != minMaxCacheColumns) {
+        minMaxCacheColumns.clear();
+      }
+      taskSummarySchema = null;
+      fileFooterEntrySchema = null;
     }
 
     @Override public boolean equals(Object obj) {
@@ -299,22 +351,46 @@ public class SegmentPropertiesAndSchemaHolder {
 
     public CarbonRowSchema[] getTaskSummarySchema(boolean storeBlockletCount,
         boolean filePathToBeStored) throws MemoryException {
-      return SchemaGenerator
-          .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
-              filePathToBeStored);
+      if (null == taskSummarySchema) {
+        synchronized (taskSchemaLock) {
+          if (null == taskSummarySchema) {
+            taskSummarySchema = SchemaGenerator
+                .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
+                    filePathToBeStored);
+          }
+        }
+      }
+      return taskSummarySchema;
     }
 
     public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
-      return SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
+      return getOrCreateFileFooterEntrySchema(true);
     }
 
     public CarbonRowSchema[] getBlockletFileFooterEntrySchema() {
-      return SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
+      return getOrCreateFileFooterEntrySchema(false);
     }
 
     public List<CarbonColumn> getMinMaxCacheColumns() {
       return minMaxCacheColumns;
     }
+
+    private CarbonRowSchema[] getOrCreateFileFooterEntrySchema(boolean isCacheLevelBlock) {
+      if (null == fileFooterEntrySchema) {
+        synchronized (fileFooterSchemaLock) {
+          if (null == fileFooterEntrySchema) {
+            if (isCacheLevelBlock) {
+              fileFooterEntrySchema =
+                  SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
+            } else {
+              fileFooterEntrySchema =
+                  SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
+            }
+          }
+        }
+      }
+      return fileFooterEntrySchema;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c144e3da/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 1501e6d..3a8aa52 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -210,35 +210,26 @@ public class BlockletDataMapIndexStore
   @Override
   public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
       BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
-    String uniqueTableSegmentIdentifier =
-        tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
-            .getUniqueTableSegmentIdentifier();
-    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
-    if (lock == null) {
-      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
-    }
     // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
     // as in that case clearing unsafe memory needs to be taken care of. If at all a datamap entry
     // in the cache needs to be overwritten then use the invalidate interface
     // and then use the put interface
     if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
-      synchronized (lock) {
-        if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
-          List<BlockDataMap> dataMaps = wrapper.getDataMaps();
-          try {
-            for (BlockDataMap blockletDataMap: dataMaps) {
-              blockletDataMap.convertToUnsafeDMStore();
-            }
-            lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
-                .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize());
-          } catch (Throwable e) {
-            // clear all the memory acquired by data map in case of any failure
-            for (DataMap blockletDataMap : dataMaps) {
-              blockletDataMap.clear();
-            }
-            throw new IOException("Problem in adding datamap to cache.", e);
-          }
+      List<BlockDataMap> dataMaps = wrapper.getDataMaps();
+      try {
+        for (BlockDataMap blockletDataMap : dataMaps) {
+          blockletDataMap.convertToUnsafeDMStore();
+        }
+        // Locking is not required here because in LRU cache map add method is synchronized to add
+        // only one entry at a time and if a key already exists it will not overwrite the entry
+        lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+            .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize());
+      } catch (Throwable e) {
+        // clear all the memory acquired by data map in case of any failure
+        for (DataMap blockletDataMap : dataMaps) {
+          blockletDataMap.clear();
         }
+        throw new IOException("Problem in adding datamap to cache.", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c144e3da/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 35e512d..16910ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -77,7 +78,7 @@ public class SegmentIndexFileStore {
 
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
-    carbonIndexMapWithFullPath = new HashMap<>();
+    carbonIndexMapWithFullPath = new TreeMap<>();
     carbonMergeFileToIndexFilesMap = new HashMap<>();
   }
 
@@ -259,12 +260,14 @@ public class SegmentIndexFileStore {
       carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
       CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
+      String mergeFileAbsolutePath = mergeFile.getParentFile().getAbsolutePath();
       assert (file_names.size() == fileData.size());
       for (int i = 0; i < file_names.size(); i++) {
-        carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
-        carbonIndexMapWithFullPath.put(
-            mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
-                + file_names.get(i), fileData.get(i).array());
+        byte[] data = fileData.get(i).array();
+        carbonIndexMap.put(file_names.get(i), data);
+        carbonIndexMapWithFullPath
+            .put(mergeFileAbsolutePath + CarbonCommonConstants.FILE_SEPARATOR + file_names.get(i),
+                data);
       }
     } finally {
       thriftReader.close();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c144e3da/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
index b0b325a..7d93e27 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -28,6 +28,10 @@ public class CarbonTableIdentifier implements Serializable {
   private static final long serialVersionUID = -0L;
 
   /**
+   * variable for storing tableUniqueName. As it is constant it should be constructed only once
+   */
+  private String tableUniqueName;
+  /**
    * database name
    */
   private String databaseName;
@@ -49,6 +53,7 @@ public class CarbonTableIdentifier implements Serializable {
     this.databaseName = databaseName;
     this.tableName = tableName;
     this.tableId = tableId;
+    tableUniqueName = databaseName + '_' + tableName;
   }
 
   /**
@@ -76,7 +81,7 @@ public class CarbonTableIdentifier implements Serializable {
    * @return table unique name
    */
   public String getTableUniqueName() {
-    return databaseName + '_' + tableName;
+    return tableUniqueName;
   }
 
   /**
@@ -134,6 +139,6 @@ public class CarbonTableIdentifier implements Serializable {
    * return unique table name
    */
   @Override public String toString() {
-    return databaseName + '_' + tableName;
+    return tableUniqueName;
   }
 }

