carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [18/50] [abbrv] carbondata git commit: [CARBONDATA-1505] Get the detailed blocklet information using default BlockletDataMap for other datamaps
Date Tue, 10 Oct 2017 03:08:05 GMT
[CARBONDATA-1505] Get the detailed blocklet information using default BlockletDataMap for other
datamaps

All the detailed information of a blocklet which is needed for executing a query is present only in BlockletDataMap.
It is actually default datamap.
So if a new datamap is added then it gives only the blocklet and block id, which is insufficient
information to execute a query.
Now this PR adds the functionality of retrieving detailed blocklet information from the BlockletDataMap
based on block and blocklet id. So now new datamaps can only concentrate on business logic
and return only block and blockletid.

This closes #1376


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28f78b2f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28f78b2f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28f78b2f

Branch: refs/heads/streaming_ingest
Commit: 28f78b2fc1eda650d1e7a7ea48258ba911361b1a
Parents: eb771f5
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Thu Sep 21 11:10:57 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Sep 28 18:16:17 2017 +0800

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       | 22 +++++-
 .../carbondata/core/datamap/TableDataMap.java   | 35 ++++++---
 .../carbondata/core/indexstore/Blocklet.java    | 49 ------------
 .../core/indexstore/BlockletDetailsFetcher.java | 47 +++++++++++
 .../core/indexstore/ExtendedBlocklet.java       | 82 ++++++++++++++++++++
 .../TableBlockIndexUniqueIdentifier.java        |  4 +
 .../blockletindex/BlockletDataMap.java          | 11 ++-
 .../blockletindex/BlockletDataMapFactory.java   | 64 ++++++++++++++-
 .../hadoop/api/CarbonTableInputFormat.java      | 23 ++++--
 .../carbondata/hadoop/api/DataMapJob.java       |  5 +-
 .../hadoop/api/DistributableDataMapFormat.java  | 14 ++--
 .../carbondata/spark/rdd/SparkDataMapJob.scala  | 12 +--
 12 files changed, 279 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index f19e733..2b5d5cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -109,7 +112,13 @@ public final class DataMapStoreManager {
           (Class<? extends DataMapFactory>) Class.forName(factoryClassName);
       DataMapFactory dataMapFactory = factoryClass.newInstance();
       dataMapFactory.init(identifier, dataMapName);
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
+      BlockletDetailsFetcher blockletDetailsFetcher;
+      if (dataMapFactory instanceof BlockletDetailsFetcher) {
+        blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory;
+      } else {
+        blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
+      }
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher);
     } catch (Exception e) {
       LOGGER.error(e);
       throw new RuntimeException(e);
@@ -152,6 +161,17 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * Get the blocklet datamap factory to get the detail information of blocklets
+   * @param identifier
+   * @return
+   */
+  private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier)
{
+    TableDataMap blockletMap =
+        getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    return (BlockletDetailsFetcher) blockletMap.getDataMapFactory();
+  }
+
+  /**
    * Returns the singleton instance
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 66bb257..3e5e9e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.events.EventListener;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
@@ -40,14 +42,17 @@ public final class TableDataMap implements EventListener {
 
   private DataMapFactory dataMapFactory;
 
+  private BlockletDetailsFetcher blockletDetailsFetcher;
+
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
-  public TableDataMap(AbsoluteTableIdentifier identifier,
-      String dataMapName, DataMapFactory dataMapFactory) {
+  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher) {
     this.identifier = identifier;
     this.dataMapName = dataMapName;
     this.dataMapFactory = dataMapFactory;
+    this.blockletDetailsFetcher = blockletDetailsFetcher;
   }
 
   /**
@@ -57,21 +62,24 @@ public final class TableDataMap implements EventListener {
    * @param filterExp
    * @return
    */
-  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
+  public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf
filterExp)
       throws IOException {
-    List<Blocklet> blocklets = new ArrayList<>();
+    List<ExtendedBlocklet> blocklets = new ArrayList<>();
     for (String segmentId : segmentIds) {
+      List<Blocklet> pruneBlocklets = new ArrayList<>();
       List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
       for (DataMap dataMap : dataMaps) {
-        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
-        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+        pruneBlocklets.addAll(dataMap.prune(filterExp));
       }
+      blocklets.addAll(addSegmentId(blockletDetailsFetcher
+          .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
     }
     return blocklets;
   }
 
-  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId)
{
-    for (Blocklet blocklet : pruneBlocklets) {
+  private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
+      String segmentId) {
+    for (ExtendedBlocklet blocklet : pruneBlocklets) {
       blocklet.setSegmentId(segmentId);
     }
     return pruneBlocklets;
@@ -107,12 +115,17 @@ public final class TableDataMap implements EventListener {
    * @param filterExp
    * @return
    */
-  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf
filterExp) {
+  public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+      FilterResolverIntf filterExp) throws IOException {
+    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = dataMapFactory.getDataMap(distributable).prune(filterExp);
     for (Blocklet blocklet: blocklets) {
-      blocklet.setSegmentId(distributable.getSegmentId());
+      ExtendedBlocklet detailedBlocklet =
+          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
+      detailedBlocklet.setSegmentId(distributable.getSegmentId());
+      detailedBlocklets.add(detailedBlocklet);
     }
-    return blocklets;
+    return detailedBlocklets;
   }
 
   @Override public void fireEvent(ChangeEvent event) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 919a48d..d84f3f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,16 +16,8 @@
  */
 package org.apache.carbondata.core.indexstore;
 
-import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
 /**
  * Blocklet
  */
@@ -33,16 +25,8 @@ public class Blocklet implements Serializable {
 
   private String path;
 
-  private String segmentId;
-
   private String blockletId;
 
-  private BlockletDetailInfo detailInfo;
-
-  private long length;
-
-  private String[] location;
-
   public Blocklet(String path, String blockletId) {
     this.path = path;
     this.blockletId = blockletId;
@@ -56,37 +40,4 @@ public class Blocklet implements Serializable {
     return blockletId;
   }
 
-  public BlockletDetailInfo getDetailInfo() {
-    return detailInfo;
-  }
-
-  public void setDetailInfo(BlockletDetailInfo detailInfo) {
-    this.detailInfo = detailInfo;
-  }
-
-  public void updateLocations() throws IOException {
-    Path path = new Path(this.path);
-    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
-    LocatedFileStatus fileStatus = iter.next();
-    location = fileStatus.getBlockLocations()[0].getHosts();
-    length = fileStatus.getLen();
-  }
-
-  public String[] getLocations() throws IOException {
-    return location;
-  }
-
-  public long getLength() throws IOException {
-    return length;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
new file mode 100644
index 0000000..21ecba1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fetches the detailed blocklet which has more information to execute the query
+ */
+public interface BlockletDetailsFetcher {
+
+  /**
+   * Get the blocklet detail information based on blockletid, blockid and segmentid.
+   *
+   * @param blocklets
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String
segmentId)
+      throws IOException;
+
+  /**
+   * Get the blocklet detail information based on blockletid, blockid and segmentid.
+   *
+   * @param blocklet
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
new file mode 100644
index 0000000..e0cfefb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Detailed blocklet information
+ */
+public class ExtendedBlocklet extends Blocklet {
+
+  private String segmentId;
+
+  private BlockletDetailInfo detailInfo;
+
+  private long length;
+
+  private String[] location;
+
+  public ExtendedBlocklet(String path, String blockletId) {
+    super(path, blockletId);
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
+
+  /**
+   * It gets the hdfs block locations and length for this blocklet. It is used internally
to get the
+   * locations for allocating tasks.
+   * @throws IOException
+   */
+  public void updateLocations() throws IOException {
+    Path path = new Path(getPath());
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    LocatedFileStatus fileStatus = iter.next();
+    location = fileStatus.getBlockLocations()[0].getHosts();
+    length = fileStatus.getLen();
+  }
+
+  public String[] getLocations() throws IOException {
+    return location;
+  }
+
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index 7e2bc0e..18357ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -100,4 +100,8 @@ public class TableBlockIndexUniqueIdentifier {
     result = 31 * result + carbonIndexFileName.hashCode();
     return result;
   }
+
+  public String getCarbonIndexFileName() {
+    return carbonIndexFileName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 57211fd..adcb1a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -286,6 +287,12 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return blocklets;
   }
 
+  public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
+    int index = Integer.parseInt(blockletId);
+    DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(index);
+    return createBlocklet(unsafeRow, index);
+  }
+
   private byte[][] getMinMaxValue(DataMapRow row, int index) {
     DataMapRow minMaxRow = row.getRow(index);
     byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
@@ -295,8 +302,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return minMax;
   }
 
-  private Blocklet createBlocklet(DataMapRow row, int blockletId) {
-    Blocklet blocklet = new Blocklet(
+  private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
+    ExtendedBlocklet blocklet = new ExtendedBlocklet(
         new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
         blockletId + "");
     BlockletDetailInfo detailInfo = new BlockletDetailInfo();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index d734d81..5edc5b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -34,8 +34,12 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -45,7 +49,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory {
+public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher {
 
   private AbsoluteTableIdentifier identifier;
 
@@ -69,6 +73,13 @@ public class BlockletDataMapFactory implements DataMapFactory {
   @Override
   public List<DataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+  }
+
+  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+      String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segmentId);
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
@@ -79,8 +90,55 @@ public class BlockletDataMapFactory implements DataMapFactory {
       }
       segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
     }
+    return tableBlockIndexUniqueIdentifiers;
+  }
 
-    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+  /**
+   * Get the blocklet detail information based on blockletid, blockid and segmentid. This
method is
+   * exclusively for BlockletDataMapFactory as detail information is only available in this
default
+   * datamap.
+   */
+  @Override
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets,
String segmentId)
+      throws IOException {
+    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
+    // If it is already detailed blocklet then type cast and return same
+    if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet)
{
+      for (Blocklet blocklet : blocklets) {
+        detailedBlocklets.add((ExtendedBlocklet) blocklet);
+      }
+      return detailedBlocklets;
+    }
+    List<TableBlockIndexUniqueIdentifier> identifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    // Retrieve each blocklets detail information from blocklet datamap
+    for (Blocklet blocklet : blocklets) {
+      detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+    }
+    return detailedBlocklets;
+  }
+
+  @Override
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId)
+      throws IOException {
+    if (blocklet instanceof ExtendedBlocklet) {
+      return (ExtendedBlocklet) blocklet;
+    }
+    List<TableBlockIndexUniqueIdentifier> identifiers =
+        getTableBlockIndexUniqueIdentifiers(segmentId);
+    return getExtendedBlocklet(identifiers, blocklet);
+  }
+
+  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier>
identifiers,
+      Blocklet blocklet) throws IOException {
+    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
+    for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
+      if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
+        DataMap dataMap = cache.get(identifier);
+        return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+      }
+    }
+    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
   }
 
   private CarbonFile[] getCarbonIndexFiles(String segmentId) {
@@ -131,7 +189,7 @@ public class BlockletDataMapFactory implements DataMapFactory {
 
   @Override
   public void clear() {
-    for (String segmentId: segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+    for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
       clear(segmentId);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 55d22d1..314ffd5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -21,12 +21,18 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
@@ -533,7 +539,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
         .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<Blocklet> prunedBlocklets;
+    List<ExtendedBlocklet> prunedBlocklets;
     if (dataMapJob != null) {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
@@ -549,7 +555,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
     if (partitionInfo != null) {
       partitionIdList = partitionInfo.getPartitionIds();
     }
-    for (Blocklet blocklet : prunedBlocklets) {
+    for (ExtendedBlocklet blocklet : prunedBlocklets) {
       int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
           CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
 
@@ -580,7 +586,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
     return resultFilterredBlocks;
   }
 
-  private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet
blocklet)
+  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet)
       throws IOException {
     blocklet.updateLocations();
     org.apache.carbondata.hadoop.CarbonInputSplit split =
@@ -715,9 +721,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
         new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     Map<String, Long> segmentAndBlockCountMapping =
         new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(),
null);
-    for (Blocklet blocklet : blocklets) {
-      String blockName = blocklet.getPath().toString();
+    List<ExtendedBlocklet> blocklets =
+        blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+    for (ExtendedBlocklet blocklet : blocklets) {
+      String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
       blockName = blockName + CarbonTablePath.getCarbonDataExtension();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index e33c356..fad2336 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
  */
 public interface DataMapJob extends Serializable {
 
-  List<Blocklet> execute(DistributableDataMapFormat dataMapFormat, FilterResolverIntf
resolverIntf);
+  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf);
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 653e33f..66a06ba 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 /**
  * Input format for datamaps, it makes the datamap pruning distributable.
  */
-public class DistributableDataMapFormat extends FileInputFormat<Void, Blocklet> implements
+public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet>
implements
     Serializable {
 
   private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
@@ -89,11 +89,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void,
Blocklet>
   }
 
   @Override
-  public RecordReader<Void, Blocklet> createRecordReader(InputSplit inputSplit,
+  public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    return new RecordReader<Void, Blocklet>() {
-      private Iterator<Blocklet> blockletIterator;
-      private Blocklet currBlocklet;
+    return new RecordReader<Void, ExtendedBlocklet>() {
+      private Iterator<ExtendedBlocklet> blockletIterator;
+      private ExtendedBlocklet currBlocklet;
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -124,7 +124,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void,
Blocklet>
       }
 
       @Override
-      public Blocklet getCurrentValue() throws IOException, InterruptedException {
+      public ExtendedBlocklet getCurrentValue() throws IOException, InterruptedException
{
         return currBlocklet;
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28f78b2f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index f9e4f8d..fbe9377 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
 import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
 
@@ -39,7 +39,7 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
 class SparkDataMapJob extends DataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
-      resolverIntf: FilterResolverIntf): util.List[Blocklet] = {
+      resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
     new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
       .asJava
   }
@@ -60,7 +60,7 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
exte
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
     resolverIntf: FilterResolverIntf)
-  extends CarbonRDD[(Blocklet)](sc, Nil) {
+  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -68,7 +68,7 @@ class DataMapPruneRDD(sc: SparkContext,
   }
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[Blocklet] = {
+      context: TaskContext): Iterator[ExtendedBlocklet] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
@@ -77,7 +77,7 @@ class DataMapPruneRDD(sc: SparkContext,
     DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
-    val iter = new Iterator[Blocklet] {
+    val iter = new Iterator[ExtendedBlocklet] {
 
       private var havePair = false
       private var finished = false
@@ -93,7 +93,7 @@ class DataMapPruneRDD(sc: SparkContext,
         !finished
       }
 
-      override def next(): Blocklet = {
+      override def next(): ExtendedBlocklet = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }


Mime
View raw message