carbondata-commits mailing list archives

From xubo...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3829] Support pagination in SDK reader
Date Thu, 23 Jul 2020 14:34:47 GMT
This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ce2095  [CARBONDATA-3829] Support pagination in SDK reader
1ce2095 is described below

commit 1ce20951828b388022fb6a1aae8113b750f47cfa
Author: ajantha-bhat <ajanthabhat@gmail.com>
AuthorDate: Mon Apr 13 11:52:18 2020 +0530

    [CARBONDATA-3829] Support pagination in SDK reader
    
    Why is this PR needed?
    
    Please refer to the design document attached in the JIRA.
    The CarbonData SDK currently does not support pagination.
    
    What changes were proposed in this PR?
    
    a) Support pagination in the Java SDK, with LRU cache support.
    b) Support pagination in the Python SDK by calling the Java SDK.
    
    Does this PR introduce any user interface change?
    
    No [new interfaces were added]
    Is any new testcase added?
    
    Yes
    
    This closes #3770
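    The core pruning idea used by the new PaginationCarbonReader (a cumulative
    row-count array searched with binary search to map an inclusive 1-based row
    range onto the range of blocklets to read) can be sketched standalone as
    follows. This is an illustration with made-up names and row counts, not code
    from this patch:

```java
import java.util.Arrays;

// Illustrative pruning logic: cumulativeRows[i] holds the total rows in
// blocklets 0..i (the role played by rowCountInSplits in the patch).
// Assumes 1 <= fromRow <= toRow <= cumulativeRows[last], as the reader
// itself validates before pruning.
public class BlockletPruner {
  // returns {lowerBlockletIndex, upperBlockletIndex}, both inclusive
  public static int[] prune(long[] cumulativeRows, long fromRow, long toRow) {
    int upper = find(cumulativeRows, 0, cumulativeRows.length, toRow);
    // the lower bound can never exceed the upper bound, so search the prefix only
    int lower = find(cumulativeRows, 0, upper + 1, fromRow);
    return new int[] {lower, upper};
  }

  private static int find(long[] sums, int from, int to, long key) {
    int idx = Arrays.binarySearch(sums, from, to, key);
    // binarySearch returns -(insertionPoint) - 1 when the key is absent
    return idx < 0 ? -(idx + 1) : idx;
  }
}
```

    With three blocklets of 100 rows each (cumulative {100, 200, 300}),
    a request for rows 100 to 201 prunes to blocklets 0 through 2, since
    row 100 is the last row of blocklet 0 and row 201 is in blocklet 2.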
---
 .../core/constants/CarbonCommonConstants.java      |  12 +
 docs/configuration-parameters.md                   |   1 +
 docs/sdk-guide.md                                  |  53 +++-
 python/pycarbon/sdk/PaginationCarbonReader.py      |  57 ++++
 .../pycarbon/tests/sdk/test_read_write_carbon.py   |  51 +++-
 .../apache/carbondata/sdk/file/CarbonReader.java   |   4 +
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  76 +++++-
 .../sdk/file/PaginationCarbonReader.java           | 302 +++++++++++++++++++++
 .../carbondata/sdk/file/cache/BlockletRows.java    |  69 +++++
 .../sdk/file/PaginationCarbonReaderTest.java       | 221 +++++++++++++++
 10 files changed, 828 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2e48a7a..52bea59 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1301,6 +1301,18 @@ public final class CarbonCommonConstants {
    */
   public static final String MIN_MAX_DEFAULT_VALUE = "true";
 
+  /**
+   * Maximum SDK pagination LRU cache size in MB, up to which the LRU cache will be held in memory.
+   */
+  @CarbonProperty
+  public static final String CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB =
+      "carbon.max.pagination.lru.cache.size.in.mb";
+
+  /**
+   * Default maximum SDK pagination LRU cache size in MB (-1 means no memory limit).
+   */
+  public static final String CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT = "-1";
+
   @CarbonProperty(dynamicConfigurable = true)
   public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
 
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 7e4e153..3e4f8bd 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -143,6 +143,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.load.all.segment.indexes.to.cache | true | Setting this configuration to false, will prune and load only matched segment indexes to cache using segment metadata information such as columnid and it's minmax values, which decreases the usage of driver memory.  |
 | carbon.secondary.index.creation.threads | 1 | Specifies the number of threads to concurrently process segments during secondary index creation. This property helps fine tuning the system when there are a lot of segments in a table. The value range is 1 to 50. |
 | carbon.si.lookup.partialstring | true | When true, it includes starts with, ends with and contains. When false, it includes only starts with secondary indexes. |
+| carbon.max.pagination.lru.cache.size.in.mb | -1 | Maximum memory **(in MB)** up to which the SDK pagination reader can cache the blocklet rows. It is suggested to configure this as a multiple of the blocklet size. The default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. |
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 8949fd0..73aed3b 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -611,11 +611,16 @@ while (reader.hasNext()) {
 reader.close();
 ```
 
-Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java) in the CarbonData repo.
+1. Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java) in the CarbonData repo.
 
-SDK reader also supports reading carbondata files and filling it to apache arrow vectors.
+2. SDK reader also supports reading carbondata files and filling them into Apache Arrow vectors.
 Find example code at [ArrowCarbonReaderTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java) in the CarbonData repo.
 
+3. The SDK reader also supports reading data with pagination.
+Find example code at [PaginationCarbonReaderTest](https://github.com/apache/carbondata/blob/master/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java) in the CarbonData repo.
+
+Note: For the pagination reader, configure the LRU cache size as a multiple of the blocklet size of the carbon files to be read.
+Refer to "carbon.max.pagination.lru.cache.size.in.mb" in [Configuring CarbonData](https://github.com/apache/carbondata/blob/master/docs/configuration-parameters.md)
 
 ## API List
 
@@ -766,6 +771,41 @@ public VectorSchemaRoot getArrowVectors() throws IOException;
 public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes, BufferAllocator bufferAllocator)
throws IOException;
 ```
 
+### Class org.apache.carbondata.sdk.file.PaginationCarbonReader
+```
+/**
+* Pagination query with from and to range.
+*
+* @param fromRowNumber must be greater than 0 (as row ids start from 1)
+*                      and less than or equal to toRowNumber
+* @param toRowNumber must be greater than 0 (as row ids start from 1),
+*                greater than or equal to fromRowNumber, and must not exceed the total row count
+* @return array of rows between fromRowNumber and toRowNumber (inclusive)
+* @throws Exception
+*/
+public Object[] read(long fromRowNumber, long toRowNumber) throws IOException, InterruptedException;
+```
+
+```
+/**
+* Get total rows in the folder or a list of CarbonData files.
+* It is based on the snapshot of files taken while building the reader.
+*
+* @return total rows from all the files in the reader.
+*/
+public long getTotalRows();
+```
+
+```
+/**
+* Closes the pagination reader; drops the cache and the snapshot.
+* The reader needs to be built again if the files are to be read again.
+* Call this when all pagination queries are finished and the cache can be dropped.
+*
+* @throws IOException
+*/
+public void close() throws IOException;
+```
 ### Class org.apache.carbondata.sdk.file.CarbonReaderBuilder
 ```
 /**
@@ -827,6 +867,15 @@ public CarbonReaderBuilder withHadoopConf(Configuration conf);
  */
 public CarbonReaderBuilder withHadoopConf(String key, String value);
 ```
+
+```  
+/**
+* Call this to configure the builder for pagination support.
+*
+* @return CarbonReaderBuilder, the current object with updated configuration.
+*/
+public CarbonReaderBuilder withPaginationSupport();
+```  
   
 ```
 /**
diff --git a/python/pycarbon/sdk/PaginationCarbonReader.py b/python/pycarbon/sdk/PaginationCarbonReader.py
new file mode 100644
index 0000000..1cd3eec
--- /dev/null
+++ b/python/pycarbon/sdk/PaginationCarbonReader.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class PaginationCarbonReader(object):
+  def __init__(self):
+    from jnius import autoclass
+    self.readerClass = autoclass('org.apache.carbondata.sdk.file.PaginationCarbonReader')
+
+  def builder(self, path, table_name):
+    self.PaginationCarbonReaderBuilder = self.readerClass.builder(path, table_name)
+    return self
+
+  def projection(self, projection_list):
+    self.PaginationCarbonReaderBuilder.projection(projection_list)
+    return self
+
+  def withHadoopConf(self, key, value):
+    if "fs.s3a.access.key" == key:
+      self.ak = value
+    elif "fs.s3a.secret.key" == key:
+      self.sk = value
+    elif "fs.s3a.endpoint" == key:
+      self.end_point = value
+    elif "fs.s3a.proxy.host" == key:
+      self.host = value
+    elif "fs.s3a.proxy.port" == key:
+      self.port = value
+
+    self.PaginationCarbonReaderBuilder.withHadoopConf(key, value)
+    return self
+
+  def build(self):
+    self.reader = self.PaginationCarbonReaderBuilder.buildPaginationReader()
+    return self.reader
+
+  def read(self, from_index, to_index):
+    return self.reader.read(from_index, to_index)
+
+  def getTotalRowsAsString(self):
+    return self.reader.getTotalRowsAsString()
+
+  def close(self):
+    return self.reader.close()
+
diff --git a/python/pycarbon/tests/sdk/test_read_write_carbon.py b/python/pycarbon/tests/sdk/test_read_write_carbon.py
index 5585200..cbd0083 100644
--- a/python/pycarbon/tests/sdk/test_read_write_carbon.py
+++ b/python/pycarbon/tests/sdk/test_read_write_carbon.py
@@ -16,6 +16,7 @@
 import pytest
 
 from pycarbon.sdk.CarbonReader import CarbonReader
+from pycarbon.sdk.PaginationCarbonReader import PaginationCarbonReader
 from pycarbon.sdk.CarbonSchemaReader import CarbonSchemaReader
 from pycarbon.sdk.CarbonWriter import CarbonWriter
 
@@ -25,7 +26,7 @@ import shutil
 import os
 import jnius_config
 
-jnius_config.set_classpath("../../../sdk/sdk/target/carbondata-sdk.jar")
+jnius_config.set_classpath("../../../../sdk/sdk/target/carbondata-sdk.jar")
 IMAGE_DATA_PATH = "./resources"
 
 def test_run_write_carbon():
@@ -466,3 +467,51 @@ def test_run_write_carbon_binary_base64_encode_decodeInJava_many_files():
   reader.close()
 
   shutil.rmtree(path)
+
+
+def test_pagination_carbon_reader():
+  jsonSchema = "[{stringField:string},{shortField:short},{intField:int}]"
+  path = "/tmp/data/writeCarbon" + str(time.time())
+
+  if os.path.exists(path):
+    shutil.rmtree(path)
+
+  writer = CarbonWriter() \
+    .builder() \
+    .outputPath(path) \
+    .withCsvInput(jsonSchema) \
+    .writtenBy("pycarbon") \
+    .build()
+  for i in range(1, 32001):
+    from jnius import autoclass
+    arrayListClass = autoclass("java.util.ArrayList")
+    data_list = arrayListClass()
+    data_list.add("pycarbon")
+    data_list.add(str(i))
+    data_list.add(str(i * 10))
+    writer.write(data_list.toArray())
+  writer.close()
+
+  # build the pagination reader
+  reader_builder = PaginationCarbonReader().builder(path, "temp")
+  reader = reader_builder.build()
+  # read the rows
+  rows = reader.read(100, 200)
+  for k in range(100, 201):
+    val = rows[k - 100][1]
+    assert val == k
+
+  # read the rows
+  rows = reader.read(31000, 31200)
+  for k in range(31000, 31201):
+    val = rows[k - 31000][1]
+    assert val == k
+
+  # check the total row count
+  assert int(reader.getTotalRowsAsString()) == 32000
+
+  # close the reader
+  reader.close()
+
+if __name__ == '__main__':
+    test_pagination_carbon_reader()
\ No newline at end of file
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 204c36d..4e24a63 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -57,6 +57,10 @@ public class CarbonReader<T> {
    * Call {@link #builder(String)} to construct an instance
    */
   CarbonReader(List<RecordReader<Void, T>> readers) {
+    if (readers == null) {
+      // In case of pagination, initialize with no reader
+      return;
+    }
     this.initialise = true;
     this.readers = readers;
     this.index = 0;
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 69f3bc6..b2ceb0c 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -67,6 +67,7 @@ public class CarbonReaderBuilder {
   private boolean useVectorReader = true;
   private InputSplit inputSplit;
   private boolean useArrowReader;
+  private boolean usePaginationReader;
   private List fileLists;
   private Class<? extends CarbonReadSupport> readSupportClass;
 
@@ -235,6 +236,15 @@ public class CarbonReaderBuilder {
   }
 
   /**
+   * Sets the input split before build(), so that the reader is built for the specified input split.
+   *
+   * @param inputSplit CarbonInputSplit
+   */
+  void setInputSplit(InputSplit inputSplit) {
+    this.inputSplit = inputSplit;
+  }
+
+  /**
    * build Arrow carbon reader
    *
    * @param <T>
@@ -247,6 +257,31 @@ public class CarbonReaderBuilder {
     return (ArrowCarbonReader<T>) this.build();
   }
 
+  /**
+   * Call this to configure the builder for pagination support.
+   *
+   * @return CarbonReaderBuilder, the current object with updated configuration.
+   */
+  public CarbonReaderBuilder withPaginationSupport() {
+    usePaginationReader = true;
+    return this;
+  }
+
+  /**
+   * This interface is for Python to call Java. Python cannot use build(), which
+   * returns the superclass, because casting Java objects from Python is not possible.
+   *
+   * Builds a pagination reader; pagination support is enabled implicitly.
+   *
+   * @return PaginationCarbonReader, a reader with pagination support.
+   */
+  public <T> PaginationCarbonReader<T> buildPaginationReader()
+      throws IOException, InterruptedException {
+    usePaginationReader = true;
+    return (PaginationCarbonReader<T>) this.build();
+  }
+
   private CarbonFileInputFormat prepareFileInputFormat(Job job, boolean enableBlockletDistribution,
       boolean disableLoadBlockIndex) throws IOException {
     if (inputSplit != null && inputSplit instanceof CarbonInputSplit) {
@@ -358,25 +393,36 @@ public class CarbonReaderBuilder {
     }
     CarbonTableInputFormat.setCarbonReadSupport(hadoopConf, readSupportClass);
     final Job job = new Job(new JobConf(hadoopConf));
-    CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
+    CarbonFileInputFormat format = null;
     try {
-      List<InputSplit> splits =
-          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-      List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
-      for (InputSplit split : splits) {
-        RecordReader reader = getRecordReader(job, format, readers, split);
-        readers.add(reader);
-      }
-      if (useArrowReader) {
-        return new ArrowCarbonReader<>(readers);
-      } else {
+      if (!usePaginationReader) {
+        // block-level dummy splits, created without IO and without loading the cache (when no filter is present)
+        format = prepareFileInputFormat(job, false, true);
+        List<InputSplit> splits =
+            format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+        List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
+        for (InputSplit split : splits) {
+          RecordReader reader = getRecordReader(job, format, readers, split);
+          readers.add(reader);
+        }
+        if (useArrowReader) {
+          return new ArrowCarbonReader<>(readers);
+        }
         return new CarbonReader<>(readers);
+      } else {
+        // blocklet level splits formed by reading footer and loading the cache
+        format = prepareFileInputFormat(job, true, false);
+        List<InputSplit> splits =
+            format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+        return new PaginationCarbonReader(splits, this);
       }
     } catch (Exception ex) {
-      // Clear the index cache as it can get added in getSplits() method
-      IndexStoreManager.getInstance().clearIndexCache(
-          format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier(),
-          false);
+      if (format != null) {
+        // Clear the index cache as it can get added in getSplits() method
+        IndexStoreManager.getInstance().clearIndexCache(
+            format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier(),
+            false);
+      }
       throw ex;
     }
   }
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
new file mode 100644
index 0000000..83f7acd
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based on the files present in the reader path when the reader was built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Cumulative row counts: entry i holds the total rows up to and including split i.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // stores the rows of each blocklet in a memory-based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize the super class with no readers.
+    // Readers will be built per query, based on the splits identified for the pagination range.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = 0;
+    for (InputSplit split : splits) {
+      // prepare a summation (prefix-sum) array of row counts over the blocklets;
+      // it is used for pruning with the pagination values.
+      // At index i, it contains the sum of rows of all previous blocklets plus the current one.
+      sum += ((CarbonInputSplit) split).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param fromRowNumber must be greater than 0 (as row ids start from 1)
+   *                      and less than or equal to toRowNumber
+   * @param toRowNumber must be greater than 0 (as row ids start from 1),
+   *                greater than or equal to fromRowNumber,
+   *                and must not exceed the total row count
+   * @return array of rows between fromRowNumber and toRowNumber (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long fromRowNumber, long toRowNumber)
+      throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (fromRowNumber < 1) {
+      throw new IllegalArgumentException("from row id:" + fromRowNumber + " is less than 1");
+    }
+    if (fromRowNumber > toRowNumber) {
+      throw new IllegalArgumentException(
+          "from row id:" + fromRowNumber + " is greater than to row id:" + toRowNumber);
+    }
+    if (toRowNumber > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + toRowNumber + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(fromRowNumber, toRowNumber);
+  }
+
+  /**
+   * Get total rows in the folder or a list of CarbonData files.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for Python to call Java. Python cannot handle a Java Long
+   * object, so the value is returned as a String.
+   *
+   * Get total rows in the folder or a list of CarbonData files.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray, key);
+    if (index < 0) {
+      // when the key is not found, binary search returns a negative index [-1 to -N],
+      // equal to -(insertionPoint) - 1; the extra -1 is needed because 0 is a valid index.
+      // Undo the one-position shift and take the absolute value to get the insertion point.
+      index = Math.abs(index + 1);
+    }
+    return index;
+  }
+
+  private Range getBlockletIndexRange(long fromRowNumber, long toRowNumber) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, toRowNumber);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), fromRowNumber);
+    return new Range(lowerBound, upperBound);
+  }
+
+  private Object[] getRows(long fromRowNumber, long toRowNumber)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(toRowNumber - fromRowNumber + 1)];
+    // get the matching split index (blocklets) range for the input range.
+    Range blockletIndexRange = getBlockletIndexRange(fromRowNumber, toRowNumber);
+    for (int i = blockletIndexRange.getFrom(); i <= blockletIndexRange.getTo(); i++) {
+      String blockletUniqueId = String.valueOf(i);
+      BlockletRows blockletRows;
+      if (cache.get(blockletUniqueId) != null) {
+        blockletRows = (BlockletRows)cache.get(blockletUniqueId);
+      } else {
+        BlockletDetailInfo detailInfo =
+            ((CarbonInputSplit) allBlockletSplits.get(i)).getDetailInfo();
+        int rowCountInBlocklet = detailInfo.getRowCount();
+        Object[] rowsInBlocklet = new Object[rowCountInBlocklet];
+        // read the rows from the blocklet
+        // TODO: read blocklets in multi-thread if there is a performance requirement.
+        readerBuilder.setInputSplit(allBlockletSplits.get(i));
+        CarbonReader<Object> carbonReader = readerBuilder.build();
+        int count = 0;
+        while (carbonReader.hasNext()) {
+          rowsInBlocklet[count++] = carbonReader.readNextRow();
+        }
+        carbonReader.close();
+        long fromRowId;
+        if (i == 0) {
+          fromRowId = 1;
+        } else {
+          // previous index will contain the sum of rows till previous blocklet.
+          fromRowId = rowCountInSplits.get(i - 1) + 1;
+        }
+        blockletRows = new BlockletRows(fromRowId, detailInfo.getBlockSize(), rowsInBlocklet);
+        // add entry to cache with no expiry time
+        // key: unique blocklet id
+        // value: BlockletRows
+        cache.put(String.valueOf(i), blockletRows, blockletRows.getMemorySize(), Integer.MAX_VALUE);
+      }
+      long fromBlockletRow = blockletRows.getRowIdStartIndex();
+      // last row id (inclusive) contained in this blocklet
+      long toBlockletRow = fromBlockletRow + blockletRows.getRowsCount() - 1;
+      Object[] rowsInBlocklet = blockletRows.getRows();
+      if (toRowNumber > toBlockletRow) {
+        if (fromRowNumber >= fromBlockletRow) {
+          // only fromRowNumber lies in this blocklet,
+          // read from fromRowNumber to the end of the blocklet.
+          // Subtracting rowIdStartIndex gives the 0-based offset within the blocklet.
+          int start = (int) (fromRowNumber - blockletRows.getRowIdStartIndex());
+          int end = blockletRows.getRowsCount();
+          while (start < end) {
+            rows[rowCount++] = rowsInBlocklet[start++];
+          }
+        } else {
+          // neither fromRowNumber nor toRowNumber lies in this blocklet;
+          // read the whole blocklet.
+          System.arraycopy(rowsInBlocklet, 0, rows, rowCount, rowsInBlocklet.length);
+          rowCount += rowsInBlocklet.length;
+        }
+      } else {
+        if (fromRowNumber >= fromBlockletRow) {
+          // both fromRowNumber and toRowNumber exist in this blocklet itself.
+          // prune it and fill the results.
+          int start = (int) (fromRowNumber - blockletRows.getRowIdStartIndex());
+          int end = (int) (start + (toRowNumber + 1 - fromRowNumber));
+          while (start < end) {
+            rows[rowCount++] = rowsInBlocklet[start++];
+          }
+        } else {
+          // toRowNumber lies in this blocklet; read from the start of the blocklet to toRowNumber.
+          int start = 0;
+          int end = (int) (toRowNumber + 1 - blockletRows.getRowIdStartIndex());
+          while (start < end) {
+            rows[rowCount++] = rowsInBlocklet[start++];
+          }
+        }
+      }
+    }
+    return rows;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException(
+        "cannot support this operation with pagination");
+  }
+
+  @Override
+  public T readNextRow() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException(
+        "cannot support this operation with pagination");
+  }
+
+  @Override
+  public Object[] readNextBatchRow() throws Exception {
+    throw new UnsupportedOperationException(
+        "cannot support this operation with pagination");
+  }
+
+  @Override
+  public List<CarbonReader> split(int maxSplits) {
+    throw new UnsupportedOperationException(
+        "cannot support this operation with pagination");
+  }
+
+  /**
+   * Closes the pagination reader; drops the cache and the snapshot.
+   * The reader needs to be built again if the files are to be read again.
+   * Call this when all pagination queries are finished and the cache can be dropped.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is already closed");
+    }
+    cache.clear();
+    rowCountInSplits = null;
+    allBlockletSplits = null;
+    isClosed = true;
+  }
+
+  private class Range {
+    private int from;
+    private int to;
+
+    Range(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    public int getFrom() {
+      return from;
+    }
+
+    public int getTo() {
+      return to;
+    }
+  }
+
+}
+
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/cache/BlockletRows.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/cache/BlockletRows.java
new file mode 100644
index 0000000..9d361e2
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/cache/BlockletRows.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file.cache;
+
+import org.apache.carbondata.core.cache.Cacheable;
+
+/**
+ * A Cacheable object containing all the rows in a blocklet
+ */
+public class BlockletRows implements Cacheable {
+
+  // rows in this blocklet
+  private Object[] rows;
+
+  // start row id of this blocklet across all blocklets: total rows till the previous blocklet, plus one.
+  private long rowIdStartIndex;
+
+  // size of blocklet in bytes
+  private long blockletSizeInBytes;
+
+  public BlockletRows(long rowIdStartIndex, long blockletSizeInBytes, Object[] rows) {
+    this.rowIdStartIndex = rowIdStartIndex;
+    this.blockletSizeInBytes = blockletSizeInBytes;
+    this.rows = rows;
+  }
+
+  @Override
+  public int getAccessCount() {
+    return 0;
+  }
+
+  @Override
+  public long getMemorySize() {
+    return blockletSizeInBytes;
+  }
+
+  @Override
+  public void invalidate() {
+    // no-op: entries are removed by LRU cache eviction rather than explicit invalidation
+  }
+
+  public Object[] getRows() {
+    return rows;
+  }
+
+  public int getRowsCount() {
+    return rows.length;
+  }
+
+  public long getRowIdStartIndex() {
+    return rowIdStartIndex;
+  }
+
+}
\ No newline at end of file
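
The `BlockletRows` entry above pairs a blocklet's rows with a global `rowIdStartIndex`, which is what lets a pagination reader cut a requested row range out of a cached blocklet. A minimal sketch of that slicing step (plain Java with no CarbonData dependencies; the class and method names here are illustrative stand-ins, not part of the SDK):

```java
import java.util.Arrays;

public class BlockletSliceDemo {

  // Illustrative stand-in for a cached blocklet entry: its rows plus the
  // global row id (1-based) of the first row in the blocklet.
  static class CachedBlocklet {
    final Object[] rows;
    final long rowIdStartIndex;

    CachedBlocklet(long rowIdStartIndex, Object[] rows) {
      this.rowIdStartIndex = rowIdStartIndex;
      this.rows = rows;
    }

    // Slice the global inclusive range [from, to], clamped to this blocklet.
    Object[] slice(long from, long to) {
      long first = rowIdStartIndex;                   // global id of rows[0]
      long last = rowIdStartIndex + rows.length - 1;  // global id of last row
      int lo = (int) (Math.max(from, first) - first);
      int hi = (int) (Math.min(to, last) - first);
      return Arrays.copyOfRange(rows, lo, hi + 1);
    }
  }

  public static void main(String[] args) {
    // A blocklet holding global row ids 201..300.
    Object[] rows = new Object[100];
    for (int i = 0; i < 100; i++) {
      rows[i] = "row" + (201 + i);
    }
    CachedBlocklet blocklet = new CachedBlocklet(201, rows);
    Object[] slice = blocklet.slice(250, 260);
    System.out.println(slice.length + " " + slice[0]);  // prints "11 row250"
  }
}
```

Because the slice is inclusive on both ends, its length is always `to - from + 1` when the range lies entirely inside the blocklet, which matches the length assertions in the tests below.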
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java
new file mode 100644
index 0000000..e1f30fd
--- /dev/null
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link PaginationCarbonReader}
+ */
+public class PaginationCarbonReaderTest {
+
+  @Test
+  public void testMultipleBlocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    // create more than one blocklet
+    TestUtil.writeFilesAndVerify(1000 * 3000, new Schema(fields), path, null, 1, 2);
+    // configure cache size = 8 blocklet
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB, "8");
+    try {
+      CarbonReaderBuilder carbonReaderBuilder =
+          new CarbonReaderBuilder(path, "temptest")
+              .withPaginationSupport();
+      PaginationCarbonReader<Object> paginationCarbonReader =
+          (PaginationCarbonReader<Object>) carbonReaderBuilder.build();
+      assert(paginationCarbonReader.getTotalRows() == 3000000);
+      Object[] rows;
+      // test query random range
+      rows = paginationCarbonReader.read(100, 300);
+      assert(rows.length == 201);
+      rows = paginationCarbonReader.read(21, 1000000);
+      assert(rows.length == 999980);
+      rows = paginationCarbonReader.read(1000001, 3000000);
+      assert(rows.length == 2000000);
+      // test case creates 8 blocklets and total rows are split as shown below
+      //      0 = [1 - 416000]
+      //      1 = [416001 - 832000]
+      //      2 = [832001 - 1248000]
+      //      3 = [1248001 - 1664000]
+      //      4 = [1664001 - 2080000]
+      //      5 = [2080001 - 2496000]
+      //      6 = [2496001 - 2912000]
+      //      7 = [2912001 - 3000000]
+      // so test for all combination of ranges
+      // 1. from resides in one blocklet, to resides in another blocklet
+      // a. from and to in adjacent blocklets (from in blocklet 0, to in blocklet 1)
+      rows = paginationCarbonReader.read(415999, 830000);
+      assert(rows.length == 414002);
+      // b. from and to with a gap between them (from in blocklet 0, to in blocklet 3)
+      rows = paginationCarbonReader.read(1, 1248005);
+      assert(rows.length == 1248005);
+      // 2. from and to resides in the same blocklet
+      // a. whole blocklet
+      rows = paginationCarbonReader.read(2496001, 2912000);
+      assert(rows.length == 416000);
+      // b. some rows in blocklet
+      rows = paginationCarbonReader.read(2912101, 2912301);
+      assert(rows.length == 201);
+      // read one row
+      rows = paginationCarbonReader.read(10, 10);
+      assert(rows.length == 1);
+      // test negative scenario inputs; each invalid range must throw individually
+      int[][] invalidRanges = {{-1, 2}, {1, -2}, {0, 100}, {1, 3000001}, {100, 10}};
+      for (int[] range : invalidRanges) {
+        try {
+          paginationCarbonReader.read(range[0], range[1]);
+          // fail the test if read is successful for invalid arguments
+          Assert.fail("expected an exception for range [" + range[0] + ", " + range[1] + "]");
+        } catch (Exception ex) {
+          // nothing to do, expected to throw exception for negative inputs.
+        }
+      }
+      // close the reader
+      paginationCarbonReader.close();
+      // test read after closing
+      try {
+        paginationCarbonReader.read(10, 100);
+        Assert.fail("expected an exception when reading after close");
+      } catch (Exception ex) {
+        // nothing to do, expected to throw exception for negative scenario.
+      }
+    } catch (Exception ex) {
+      Assert.fail(ex.getMessage());
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
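
The comment block in the test above enumerates how the 3,000,000 rows split across 8 blocklets. Before the reader can fetch or cache the right blocklets, it has to map a global `[from, to]` range onto blocklet indexes; one straightforward way is a cumulative-row-count array plus binary search. A minimal sketch of that mapping (plain Java, illustrative only; not the SDK's internal code):

```java
import java.util.Arrays;

public class BlockletRangeDemo {

  // Index of the blocklet containing the 1-based global rowId, given the
  // cumulative row count at the end of each blocklet (sorted prefix sums).
  static int blockletOf(long[] cumulativeRows, long rowId) {
    int pos = Arrays.binarySearch(cumulativeRows, rowId);
    // exact hit: rowId is the last row of blocklet pos;
    // otherwise binarySearch returns -(insertionPoint) - 1
    return pos >= 0 ? pos : -pos - 1;
  }

  public static void main(String[] args) {
    // Mirrors the layout in testMultipleBlocklet: 8 blocklets of
    // 416000 rows each, except the last one with 88000 rows.
    long[] cumulative = {416000L, 832000L, 1248000L, 1664000L,
        2080000L, 2496000L, 2912000L, 3000000L};
    System.out.println(blockletOf(cumulative, 1));        // first row -> blocklet 0
    System.out.println(blockletOf(cumulative, 416000));   // last row of blocklet 0
    System.out.println(blockletOf(cumulative, 416001));   // first row of blocklet 1
    System.out.println(blockletOf(cumulative, 3000000));  // last row -> blocklet 7
  }
}
```

With the start and end blocklets known, the reader only needs to load that contiguous span of blocklets and trim the first and last ones to the requested boundaries.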
+
+  @Test
+  public void testDataCorrectness() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("data", DataTypes.VARCHAR);
+    fields[2] = new Field("id", DataTypes.LONG);
+
+    String data = RandomStringUtils.randomAlphabetic(10 * 1024);
+    // create more than one blocklet
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .outputPath(path).withBlockletSize(1).withBlockSize(2)
+          .withTableProperty("local_dictionary_enable", "false");
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("TestUtil").build();
+      for (int i = 1; i <= 100000; i++) {
+        writer.write(new String[]{ "robot" + i, data , String.valueOf(i)});
+      }
+      writer.close();
+    } catch (Exception ex) {
+      Assert.fail(ex.getMessage());
+    }
+
+    // configure cache size = 4 blocklet
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB, "4");
+    try {
+      CarbonReaderBuilder carbonReaderBuilder =
+          new CarbonReaderBuilder(path, "temptest")
+              .withPaginationSupport();
+      PaginationCarbonReader<Object> paginationCarbonReader =
+          (PaginationCarbonReader<Object>) carbonReaderBuilder.build();
+      assert(paginationCarbonReader.getTotalRows() == 100000);
+      // test case creates 4 blocklets and total rows are split as shown below
+      //      0 = 1 - 32000
+      //      1 = 32001 - 64000
+      //      2 = 64001 - 96000
+      //      3 = 96001 - 100000
+      Object[] rows;
+      // so test for all combination of ranges
+      // 1. from resides in one blocklet, to resides in another blocklet
+      // a. from and to in adjacent blocklets (from in blocklet 0, to in blocklet 1)
+      rows = paginationCarbonReader.read(31999, 32005);
+      assert(rows.length == 7); // note: length is (to - from + 1)
+      int index = 31999;
+      for (Object row : rows) {
+        // verify the result
+        assert (((Object [])row)[0].equals("robot" + (index)));
+        index++;
+      }
+      // b. from and to with a gap between them (from in blocklet 0, to in blocklet 2)
+      rows = paginationCarbonReader.read(31999, 64005);
+      assert(rows.length == 32007); // (to - from + 1)
+      index = 31999;
+      for (Object row : rows) {
+        // verify the result
+        assert (((Object [])row)[0].equals("robot" + (index)));
+        index++;
+      }
+      // 2. from and to resides in the same blocklet
+      // a. whole blocklet
+      rows = paginationCarbonReader.read(64001, 96000);
+      assert(rows.length == 32000); // (to - from + 1)
+      index = 64001;
+      for (Object row : rows) {
+        // verify the result
+        assert (((Object [])row)[0].equals("robot" + (index)));
+        index++;
+      }
+      // b. some rows in blocklet
+      rows = paginationCarbonReader.read(100, 300);
+      assert(rows.length == 201); // (to - from + 1)
+      index = 100;
+      for (Object row : rows) {
+        // verify the result
+        assert (((Object [])row)[0].equals("robot" + (index)));
+        index++;
+      }
+      // read one row
+      rows = paginationCarbonReader.read(10, 10);
+      assert(rows.length == 1); // (to - from + 1)
+      index = 10;
+      for (Object row : rows) {
+        // verify the result
+        assert (((Object [])row)[0].equals("robot" + (index)));
+        index++;
+      }
+      // close the reader
+      paginationCarbonReader.close();
+    } catch (Exception ex) {
+      Assert.fail(ex.getMessage());
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}

