kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: KUDU-2670: Part 1: Build ScanToken by KeyRange
Date Fri, 24 May 2019 05:16:25 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 22a6faa  KUDU-2670: Part 1: Build ScanToken by KeyRange
22a6faa is described below

commit 22a6faa44364dec3a171ec79c15b814ad9277d8f
Author: oclarms <oclarms@gmail.com>
AuthorDate: Thu Jan 31 11:41:13 2019 +0800

    KUDU-2670: Part 1: Build ScanToken by KeyRange
    
    When reading data in a kudu table using spark,
    if there is a large amount of data in the tablet,
    reading the data takes a long time. The reason
    is that KuduRDD uses a tablet to generate the
    scanToken, so a spark task needs to process all
    the data in a tablet.
    
    We send SplitKeyRange RPC to TServer, split tablet's
    primary key range into multiple primary key ranges
    by size, and generate the scanToken by primary key ranges.
    
    Change-Id: I0502f5d64569e8b1d45e88de3cb36aa2e01234d0
    Reviewed-on: http://gerrit.cloudera.org:8080/12323
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Adar Dembo <adar@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 134 +++++++++++++++-
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  10 ++
 .../main/java/org/apache/kudu/client/KeyRange.java | 103 +++++++++++++
 .../java/org/apache/kudu/client/KuduScanToken.java |  55 +++++--
 .../apache/kudu/client/SplitKeyRangeRequest.java   | 119 ++++++++++++++
 .../apache/kudu/client/SplitKeyRangeResponse.java  |  46 ++++++
 .../org/apache/kudu/client/TestSplitKeyRange.java  | 171 +++++++++++++++++++++
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |   5 +-
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala |   4 +
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |   6 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala |  24 +++
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  39 +++++
 .../java/org/apache/kudu/test/ClientTestUtil.java  |  43 ++++++
 13 files changed, 743 insertions(+), 16 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 1c909bf..51d65f9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1941,7 +1941,139 @@ public class AsyncKuduClient implements AutoCloseable {
                            endPartitionKey,
                            fetchBatchSize,
                            ret,
-        timeoutTracker);
+                           timeoutTracker);
+  }
+
+  /**
+   * Asks a tablet server, via a SplitKeyRange RPC, to divide one tablet's primary key
+   * range into smaller ranges. The RPC does not change the layout of the tablet.
+   * @param table table that owns the tablet
+   * @param startPrimaryKey inclusive primary key at which splitting begins, or null to
+   *                        begin at the start of the tablet
+   * @param endPrimaryKey exclusive primary key at which splitting stops, or null to
+   *                      stop at the end of the tablet
+   * @param partitionKey the partition key of the tablet to find
+   * @param splitSizeBytes desired data size of each key range; this is only a hint, so
+   *                       the tablet server may return ranges that are larger or smaller
+   * @param parentRpc RPC that prompted the split key range request, can be null
+   * @return Deferred to track the progress
+   */
+  private Deferred<SplitKeyRangeResponse> getTabletKeyRanges(final KuduTable table,
+                                                             final byte[] startPrimaryKey,
+                                                             final byte[] endPrimaryKey,
+                                                             final byte[] partitionKey,
+                                                             long splitSizeBytes,
+                                                             KuduRpc<?> parentRpc) {
+    // Inherit whatever time the parent RPC has left; without a parent, fall back to
+    // the client's default admin operation timeout.
+    final long timeoutMillis;
+    if (parentRpc != null) {
+      timeoutMillis = parentRpc.timeoutTracker.getMillisBeforeTimeout();
+    } else {
+      timeoutMillis = defaultAdminOperationTimeoutMs;
+    }
+
+    final SplitKeyRangeRequest splitRequest = new SplitKeyRangeRequest(
+        table, startPrimaryKey, endPrimaryKey, partitionKey,
+        splitSizeBytes, timer, timeoutMillis);
+    splitRequest.setParentRpc(parentRpc);
+    return sendRpcToTablet(splitRequest);
+  }
+
+  /**
+   * Get all or some of the key ranges for a given table. This may query the master multiple
+   * times if there are a lot of tablets, and, when a positive split size is given, queries
+   * each tablet to split the tablet's primary key range into smaller ranges. This doesn't
+   * change the layout of the tablet.
+   * @param table the table to get key ranges from
+   * @param startPrimaryKey the primary key to begin splitting at (inclusive), pass null to
+   *                        start splitting at the beginning of the tablet
+   * @param endPrimaryKey the primary key to stop splitting at (exclusive), pass null to
+   *                      stop splitting at the end of the tablet
+   * @param startPartitionKey where to start in the table, pass null to start at the beginning
+   * @param endPartitionKey where to stop in the table, pass null to get all the tablets until
+   *                        the end of the table
+   * @param fetchBatchSize the number of tablets to fetch per round trip from the master
+   * @param splitSizeBytes the size of the data in each key range.
+   *                       This is a hint: The tablet server may return the size of key range
+   *                       larger or smaller than this value. If unset or {@code <= 0}, the key
+   *                       range includes all the data of the tablet.
+   * @param deadline deadline in milliseconds for this method to finish
+   * @return a {@code Deferred} object that yields a list of the key ranges in the table,
+   *         in the same order as the located tablets
+   */
+  Deferred<List<KeyRange>> getTableKeyRanges(final KuduTable table,
+                                             final byte[] startPrimaryKey,
+                                             final byte[] endPrimaryKey,
+                                             final byte[] startPartitionKey,
+                                             final byte[] endPartitionKey,
+                                             int fetchBatchSize,
+                                             long splitSizeBytes,
+                                             long deadline) {
+    // One tracker is shared by the tablet lookup and by every per-tablet split request.
+    // NOTE(review): 'deadline' is handed to setTimeout(), and KuduScanTokenBuilder.build()
+    // passes its 'timeout' field here, so this looks like a duration rather than an
+    // absolute deadline; confirm and consider renaming.
+    final TimeoutTracker timeoutTracker = new TimeoutTracker();
+    timeoutTracker.setTimeout(deadline);
+
+    // Runs once the tablets have been located and turns them into key ranges.
+    Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB =
+        new Callback<Deferred<List<KeyRange>>, List<LocatedTablet>>() {
+      @Override
+      public Deferred<List<KeyRange>> call(List<LocatedTablet> tablets) {
+        if (splitSizeBytes <= 0) {
+          // Splitting is disabled: no tablet server is contacted. Each tablet yields a
+          // single range carrying the caller's primary key bounds and a size of -1.
+          final List<KeyRange> keyRanges = Lists.newArrayList();
+          for (LocatedTablet tablet : tablets) {
+            keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1));
+          }
+          return Deferred.fromResult(keyRanges);
+        } else {
+          // One split request per tablet; the requests are issued without waiting for
+          // each other and are collected below.
+          List<Deferred<List<KeyRange>>> deferreds = new ArrayList<>();
+          for (LocatedTablet tablet : tablets) {
+            // Build a fake RPC to encapsulate and propagate the timeout.
+            // There's no actual "RPC" to send.
+            // NOTE(review): raw KuduRpc type; KuduRpc<?> would match the parameter type of
+            // getTabletKeyRanges.
+            KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges",
+                                           null,
+                                           timeoutTracker.getMillisBeforeTimeout());
+            // The tablet's partition key start is used as the key that routes the request.
+            deferreds.add(getTabletKeyRanges(table,
+                                             startPrimaryKey,
+                                             endPrimaryKey,
+                                             tablet.getPartition().getPartitionKeyStart(),
+                                             splitSizeBytes,
+                                             fakeRpc).addCallbackDeferring(
+                new Callback<Deferred<List<KeyRange>>, SplitKeyRangeResponse>() {
+                  @Override
+                  public Deferred<List<KeyRange>> call(SplitKeyRangeResponse resp) {
+                    // Convert each protobuf range of the response into a KeyRange bound
+                    // to the tablet it was requested for.
+                    final List<KeyRange> ranges = Lists.newArrayList();
+                    for (Common.KeyRangePB pb : resp.getKeyRanges()) {
+                      KeyRange newRange = new KeyRange(tablet,
+                                                       pb.getStartPrimaryKey().toByteArray(),
+                                                       pb.getStopPrimaryKey().toByteArray(),
+                                                       pb.getSizeBytesEstimates());
+                      ranges.add(newRange);
+                      LOG.debug("Add key range {}", newRange);
+                    }
+                    return Deferred.fromResult(ranges);
+                  }
+                }));
+          }
+          // Must preserve the order.
+          return Deferred.groupInOrder(deferreds).addCallbackDeferring(
+              new Callback<Deferred<List<KeyRange>>, ArrayList<List<KeyRange>>>() {
+                @Override
+                public Deferred<List<KeyRange>> call(ArrayList<List<KeyRange>> rangeLists) {
+                  // Flatten the per-tablet lists into one list, tablet by tablet.
+                  final List<KeyRange> ret = Lists.newArrayList();
+                  for (List<KeyRange> ranges : rangeLists) {
+                    ret.addAll(ranges);
+                  }
+                  return Deferred.fromResult(ret);
+                }
+              });
+        }
+      }
+    };
+
+    // Locate the tablets first, then hand the result to the callback above.
+    final List<LocatedTablet> tablets = Lists.newArrayList();
+    return loopLocateTable(table,
+                           startPartitionKey,
+                           endPartitionKey,
+                           fetchBatchSize,
+                           tablets,
+                           timeoutTracker).addCallbackDeferring(locateTabletCB);
   }
 
   /**
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 3ecc67e..a145cf9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -722,6 +722,16 @@ public final class AsyncKuduScanner {
     buf.append(", tablet=").append(tablet);
     buf.append(", scannerId=").append(Bytes.pretty(scannerId));
     buf.append(", scanRequestTimeout=").append(scanRequestTimeout);
+    if (startPrimaryKey.length > 0) {
+      buf.append(", startPrimaryKey=").append(Bytes.hex(startPrimaryKey));
+    } else {
+      buf.append(", startPrimaryKey=<start>");
+    }
+    if (endPrimaryKey.length > 0) {
+      buf.append(", endPrimaryKey=").append(Bytes.hex(endPrimaryKey));
+    } else {
+      buf.append(", endPrimaryKey=<end>");
+    }
     buf.append(')');
     return buf.toString();
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyRange.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyRange.java
new file mode 100644
index 0000000..3c7bdd1
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyRange.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Class used to represent primary key range in tablet.
+ */
+@InterfaceAudience.Private
+class KeyRange {
+  private final byte[] primaryKeyStart;
+  private final byte[] primaryKeyEnd;
+  private final long dataSizeBytes;
+  private final LocatedTablet tablet;
+
+  /**
+   * Create a new key range [primaryKeyStart, primaryKeyEnd).
+   * @param tablet the tablet which the key range belongs to, cannot be null
+   * @param primaryKeyStart the encoded primary key where to start in the key range (inclusive)
+   * @param primaryKeyEnd the encoded primary key where to stop in the key range (exclusive)
+   * @param dataSizeBytes the estimated data size of the key range.
+   */
+  public KeyRange(LocatedTablet tablet,
+                  byte[] primaryKeyStart,
+                  byte[] primaryKeyEnd,
+                  long dataSizeBytes) {
+    Preconditions.checkNotNull(tablet);
+    this.tablet = tablet;
+    this.primaryKeyStart = primaryKeyStart;
+    this.primaryKeyEnd = primaryKeyEnd;
+    this.dataSizeBytes = dataSizeBytes;
+  }
+
+  /**
+   * @return the start primary key
+   */
+  public byte[] getPrimaryKeyStart() {
+    return primaryKeyStart;
+  }
+
+  /**
+   * @return the end primary key
+   */
+  public byte[] getPrimaryKeyEnd() {
+    return primaryKeyEnd;
+  }
+
+  /**
+   * @return the located tablet
+   */
+  public LocatedTablet getTablet() {
+    return tablet;
+  }
+
+  /**
+   * @return the start partition key
+   */
+  public byte[] getPartitionKeyStart() {
+    return tablet.getPartition().getPartitionKeyStart();
+  }
+
+  /**
+   * @return the end partition key
+   */
+  public byte[] getPartitionKeyEnd() {
+    return tablet.getPartition().getPartitionKeyEnd();
+  }
+
+  /**
+   * @return the estimated data size of the key range
+   */
+  public long getDataSizeBytes() {
+    return dataSizeBytes;
+  }
+
+  @Override
+  public String toString() {
+    // Each bound is checked on its own: a null or empty key prints as <start> or <end>.
+    return String.format("[%s, %s), %s, %s",
+                         primaryKeyStart == null || primaryKeyStart.length == 0
+                             ? "<start>" : Bytes.hex(primaryKeyStart),
+                         primaryKeyEnd == null || primaryKeyEnd.length == 0
+                             ? "<end>" : Bytes.hex(primaryKeyEnd),
+                         dataSizeBytes, tablet);
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index fc66da6..758d86e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -289,8 +289,13 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
   public static class KuduScanTokenBuilder
       extends AbstractKuduScannerBuilder<KuduScanTokenBuilder, List<KuduScanToken>> {
 
+    private static final int DEFAULT_SPLIT_SIZE_BYTES = -1;
+
     private long timeout;
 
+    // By default, a scan token is created for each tablet to be scanned.
+    private long splitSizeBytes = DEFAULT_SPLIT_SIZE_BYTES;
+
     KuduScanTokenBuilder(AsyncKuduClient client, KuduTable table) {
       super(client, table);
       timeout = client.getDefaultOperationTimeoutMs();
@@ -306,6 +311,18 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       return this;
     }
 
+    /**
+     * Sets the data size of key range. It is used to split tablet's primary key range
+     * into smaller ranges. The split doesn't change the layout of the tablet. This is a hint:
+     * The tablet server may return the size of key range larger or smaller than this value.
+     * If unset or {@code <= 0}, the key range includes all the data of the tablet.
+     * @param splitSizeBytes the data size of key range.
+     * @return this builder, so that calls can be chained
+     */
+    public KuduScanTokenBuilder setSplitSizeBytes(long splitSizeBytes) {
+      this.splitSizeBytes = splitSizeBytes;
+      return this;
+    }
+
     @Override
     public List<KuduScanToken> build() {
       if (lowerBoundPartitionKey.length != 0 ||
@@ -388,32 +405,44 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
 
       try {
         PartitionPruner pruner = PartitionPruner.create(this);
-        List<LocatedTablet> tablets = new ArrayList<>();
+        List<KeyRange> keyRanges = new ArrayList<>();
         while (pruner.hasMorePartitionKeyRanges()) {
           Pair<byte[], byte[]> partitionRange = pruner.nextPartitionKeyRange();
-          List<LocatedTablet> newTablets = table.getTabletsLocations(
+          List<KeyRange> newKeyRanges = client.getTableKeyRanges(
+              table,
+              proto.getLowerBoundPrimaryKey().toByteArray(),
+              proto.getUpperBoundPrimaryKey().toByteArray(),
               partitionRange.getFirst().length == 0 ? null : partitionRange.getFirst(),
               partitionRange.getSecond().length == 0 ? null : partitionRange.getSecond(),
-              timeout);
+              AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+              splitSizeBytes,
+              timeout).join();
 
-          if (newTablets.isEmpty()) {
+          if (newKeyRanges.isEmpty()) {
             pruner.removePartitionKeyRange(partitionRange.getSecond());
           } else {
-            pruner.removePartitionKeyRange(newTablets.get(newTablets.size() - 1)
-                                                     .getPartition()
-                                                     .getPartitionKeyEnd());
+            pruner.removePartitionKeyRange(newKeyRanges.get(newKeyRanges.size() - 1)
+                                                       .getPartitionKeyEnd());
           }
-          tablets.addAll(newTablets);
+          keyRanges.addAll(newKeyRanges);
         }
 
-        List<KuduScanToken> tokens = new ArrayList<>(tablets.size());
-        for (LocatedTablet tablet : tablets) {
+        List<KuduScanToken> tokens = new ArrayList<>(keyRanges.size());
+        for (KeyRange keyRange : keyRanges) {
           Client.ScanTokenPB.Builder builder = proto.clone();
           builder.setLowerBoundPartitionKey(
-              UnsafeByteOperations.unsafeWrap(tablet.getPartition().getPartitionKeyStart()));
+              UnsafeByteOperations.unsafeWrap(keyRange.getPartitionKeyStart()));
           builder.setUpperBoundPartitionKey(
-              UnsafeByteOperations.unsafeWrap(tablet.getPartition().getPartitionKeyEnd()));
-          tokens.add(new KuduScanToken(tablet, builder.build()));
+              UnsafeByteOperations.unsafeWrap(keyRange.getPartitionKeyEnd()));
+          byte[] primaryKeyStart = keyRange.getPrimaryKeyStart();
+          if (primaryKeyStart != null && primaryKeyStart.length > 0) {
+            builder.setLowerBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(primaryKeyStart));
+          }
+          byte[] primaryKeyEnd = keyRange.getPrimaryKeyEnd();
+          if (primaryKeyEnd != null && primaryKeyEnd.length > 0) {
+            builder.setUpperBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(primaryKeyEnd));
+          }
+          tokens.add(new KuduScanToken(keyRange.getTablet(), builder.build()));
         }
         return tokens;
       } catch (Exception e) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
new file mode 100644
index 0000000..02644d9
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.UnsafeByteOperations;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
+
+import org.apache.kudu.Common.KeyRangePB;
+import org.apache.kudu.tserver.Tserver;
+import org.apache.kudu.util.Pair;
+
+/**
+ * RPC to split a tablet's primary key range into smaller ranges suitable for
+ * concurrent scanning.
+ */
+@InterfaceAudience.Private
+class SplitKeyRangeRequest extends KuduRpc<SplitKeyRangeResponse> {
+
+  private final byte[] startPrimaryKey;
+  private final byte[] endPrimaryKey;
+  private final byte[] partitionKey;
+  private final long splitSizeBytes;
+
+  /**
+   * Create a new RPC request
+   * @param table table to lookup
+   * @param startPrimaryKey inclusive primary key at which splitting begins, or null to
+   *                        start at the beginning
+   * @param endPrimaryKey exclusive primary key at which splitting stops, or null to
+   *                      get all the key ranges
+   * @param partitionKey the partition key of the tablet to find
+   * @param splitSizeBytes desired data size of each key range; this is only a hint, so
+   *                       the tablet server may return ranges that are larger or smaller
+   * @param timer Timer to monitor RPC timeouts.
+   * @param timeoutMillis the timeout of the request in milliseconds
+   */
+  SplitKeyRangeRequest(KuduTable table,
+                       byte[] startPrimaryKey,
+                       byte[] endPrimaryKey,
+                       byte[] partitionKey,
+                       long splitSizeBytes,
+                       Timer timer,
+                       long timeoutMillis) {
+    super(table, timer, timeoutMillis);
+    this.startPrimaryKey = startPrimaryKey;
+    this.endPrimaryKey = endPrimaryKey;
+    this.partitionKey = partitionKey;
+    this.splitSizeBytes = splitSizeBytes;
+  }
+
+  /** Returns true when the key is neither null nor empty. */
+  private static boolean isSet(byte[] key) {
+    return key != null && key.length > 0;
+  }
+
+  @Override
+  Message createRequestPB() {
+    final RemoteTablet target = super.getTablet();
+    final Tserver.SplitKeyRangeRequestPB.Builder pb =
+        Tserver.SplitKeyRangeRequestPB.newBuilder();
+    pb.setTabletId(UnsafeByteOperations.unsafeWrap(target.getTabletIdAsBytes()));
+    // A bound that is null or empty is left out of the request.
+    if (isSet(startPrimaryKey)) {
+      pb.setStartPrimaryKey(UnsafeByteOperations.unsafeWrap(startPrimaryKey));
+    }
+    if (isSet(endPrimaryKey)) {
+      pb.setStopPrimaryKey(UnsafeByteOperations.unsafeWrap(endPrimaryKey));
+    }
+    pb.setTargetChunkSizeBytes(splitSizeBytes);
+
+    return pb.build();
+  }
+
+  @Override
+  String serviceName() {
+    return TABLET_SERVER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return "SplitKeyRange";
+  }
+
+  @Override
+  Pair<SplitKeyRangeResponse, Object> deserialize(CallResponse callResponse, String tsUuid) {
+    final Tserver.SplitKeyRangeResponsePB.Builder pb =
+        Tserver.SplitKeyRangeResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), pb);
+
+    // Copy the ranges out of the builder into a list owned by the response.
+    final List<KeyRangePB> ranges = new ArrayList<>(pb.getRangesList());
+    final SplitKeyRangeResponse response =
+        new SplitKeyRangeResponse(timeoutTracker.getElapsedMillis(), tsUuid, ranges);
+    // The second element carries the tablet server error, or null when there is none.
+    final Object error = pb.hasError() ? pb.getError() : null;
+    return new Pair<SplitKeyRangeResponse, Object>(response, error);
+  }
+
+  @Override
+  byte[] partitionKey() {
+    return this.partitionKey;
+  }
+}
\ No newline at end of file
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeResponse.java
new file mode 100644
index 0000000..4b8c274
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeResponse.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.List;
+
+import org.apache.kudu.Common.KeyRangePB;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Response type for SplitKeyRangeRequest.
+ */
+@InterfaceAudience.Private
+public class SplitKeyRangeResponse extends KuduRpcResponse {
+
+  private final List<KeyRangePB> keyRanges;
+
+  /**
+   * Create a response holding the key ranges returned for a SplitKeyRangeRequest.
+   * @param elapsedMillis elapsed time of the request in milliseconds, passed to the superclass
+   * @param tsUUID UUID of the tablet server that answered, passed to the superclass
+   * @param keyRanges the primary key ranges carried by the response; stored as given
+   */
+  SplitKeyRangeResponse(long elapsedMillis, String tsUUID, List<KeyRangePB> keyRanges) {
+    super(elapsedMillis, tsUUID);
+    this.keyRanges = keyRanges;
+  }
+
+  /**
+   * Get the list of primary key ranges returned for the request.
+   * @return a list of key ranges
+   */
+  public List<KeyRangePB> getKeyRanges() {
+    return keyRanges;
+  }
+}
+
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSplitKeyRange.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSplitKeyRange.java
new file mode 100644
index 0000000..4a52daf
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSplitKeyRange.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.test.ClientTestUtil.createTableWithOneThousandRows;
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestSplitKeyRange {
+  // Generate a unique table name
+  private static final String TABLE_NAME =
+      TestSplitKeyRange.class.getName()+"-"+System.currentTimeMillis();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Test
+  @TabletServerConfig(flags = {
+      "--flush_threshold_mb=1",
+      "--flush_threshold_secs=1"
+  })
+  public void testSplitKeyRange() throws Exception {
+    final KuduTable table = createTableWithOneThousandRows(
+        harness.getAsyncClient(), TABLE_NAME, 32 * 1024, DEFAULT_SLEEP);
+
+    // Wait for mrs flushed
+    Thread.sleep(10 * 1000);
+
+    Schema schema = table.getSchema();
+
+    // 1. Don't split tablet's key range
+    // 1.1 Get all key range for table
+    List<KeyRange> keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table,null, null, null, null,
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP, -1, DEFAULT_SLEEP).join();
+    assertEquals(4, keyRanges.size());
+    LocatedTablet tablet0 = keyRanges.get(0).getTablet();
+    LocatedTablet tablet1 = keyRanges.get(1).getTablet();
+    LocatedTablet tablet2 = keyRanges.get(2).getTablet();
+    LocatedTablet tablet3 = keyRanges.get(3).getTablet();
+    // 1.2 Get all key range for specified tablet
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, null, null,
+        tablet1.getPartition().getPartitionKeyStart(),
+        tablet1.getPartition().getPartitionKeyEnd(),
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        -1, DEFAULT_SLEEP).join();
+    assertEquals(1, keyRanges.size());
+    assertEquals(tablet1.toString(), keyRanges.get(0).getTablet().toString());
+    assertEquals(null, keyRanges.get(0).getPrimaryKeyStart());
+    assertEquals(null, keyRanges.get(0).getPrimaryKeyEnd());
+
+    // 2. Don't set primary key range, and splitSizeBytes > tablet's size
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, null, null,
+        tablet1.getPartition().getPartitionKeyStart(),
+        tablet1.getPartition().getPartitionKeyEnd(),
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        1024 * 1024 * 1024, DEFAULT_SLEEP).join();
+    assertEquals(1, keyRanges.size());
+    assertEquals(tablet1.toString(), keyRanges.get(0).getTablet().toString());
+
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, null, null,
+        tablet1.getPartition().getPartitionKeyStart(),
+        tablet2.getPartition().getPartitionKeyEnd(),
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        1024 * 1024 * 1024, DEFAULT_SLEEP).join();
+    assertEquals(2, keyRanges.size());
+    assertEquals(tablet1.toString(), keyRanges.get(0).getTablet().toString());
+    assertEquals(tablet2.toString(), keyRanges.get(1).getTablet().toString());
+
+    // 3. Set primary key range, and splitSizeBytes > tablet's size
+    // 3.1 Non-coverage
+    PartialRow partialRowStart = schema.newPartialRow();
+    partialRowStart.addInt(0, 1500);
+    PartialRow partialRowEnd = schema.newPartialRow();
+    partialRowEnd.addInt(0, 2000);
+    byte[] primaryKeyStart = KeyEncoder.encodePrimaryKey(partialRowStart);
+    byte[] primaryKeyEnd = KeyEncoder.encodePrimaryKey(partialRowEnd);
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, primaryKeyStart, primaryKeyEnd,
+        tablet1.getPartition().getPartitionKeyStart(),
+        tablet1.getPartition().getPartitionKeyEnd(),
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        1024 * 1024 * 1024, DEFAULT_SLEEP).join();
+    // TODO: The response should be empty here, but this does not affect the scan result.
+    assertEquals(1, keyRanges.size());
+    assertEquals(tablet1.toString(), keyRanges.get(0).getTablet().toString());
+    assertEquals(partialRowStart.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(0).getPrimaryKeyStart()).toString());
+    assertEquals(partialRowEnd.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(0).getPrimaryKeyEnd()).toString());
+
+    // 3.2 Coverage, but no data. The RPC returns a key range for the tablet's MRS, because
+    // data in the MRS may fall within [partialRowStart, partialRowEnd). But in this
+    // test case, the MRS is empty.
+    primaryKeyStart = KeyEncoder.encodePrimaryKey(partialRowStart);
+    primaryKeyEnd = KeyEncoder.encodePrimaryKey(partialRowEnd);
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, primaryKeyStart, primaryKeyEnd, null, null,
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        1024 * 1024 * 1024, DEFAULT_SLEEP).join();
+    assertEquals(4, keyRanges.size());
+    assertEquals(tablet0.toString(), keyRanges.get(0).getTablet().toString());
+    assertEquals(partialRowStart.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(0).getPrimaryKeyStart()).toString());
+    assertEquals(partialRowEnd.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(0).getPrimaryKeyEnd()).toString());
+    assertEquals(tablet1.toString(), keyRanges.get(1).getTablet().toString());
+    assertEquals(partialRowStart.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(1).getPrimaryKeyStart()).toString());
+    assertEquals(partialRowEnd.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(1).getPrimaryKeyEnd()).toString());
+    assertEquals(tablet2.toString(), keyRanges.get(2).getTablet().toString());
+    assertEquals(partialRowStart.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(2).getPrimaryKeyStart()).toString());
+    assertEquals(partialRowEnd.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(2).getPrimaryKeyEnd()).toString());
+    assertEquals(tablet3.toString(), keyRanges.get(3).getTablet().toString());
+    assertEquals(partialRowStart.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(3).getPrimaryKeyStart()).toString());
+    assertEquals(partialRowEnd.toString(),
+        KeyEncoder.decodePrimaryKey(schema, keyRanges.get(3).getPrimaryKeyEnd()).toString());
+
+    // 4. Set primary key range, and splitSizeBytes < tablet's size
+    partialRowStart = schema.newPartialRow();
+    partialRowStart.addInt(0, 200);
+    partialRowEnd = schema.newPartialRow();
+    partialRowEnd.addInt(0, 800);
+    primaryKeyStart = KeyEncoder.encodePrimaryKey(partialRowStart);
+    primaryKeyEnd = KeyEncoder.encodePrimaryKey(partialRowEnd);
+    keyRanges = table.getAsyncClient().getTableKeyRanges(
+        table, primaryKeyStart, primaryKeyEnd, null, null,
+        AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
+        1024, DEFAULT_SLEEP).join();
+    assertTrue(keyRanges.size() >= 4);
+    for (KeyRange keyRange : keyRanges) {
+      int startKey = KeyEncoder.decodePrimaryKey(schema, keyRange.getPrimaryKeyStart()).getInt(0);
+      int endKey = KeyEncoder.decodePrimaryKey(schema, keyRange.getPrimaryKeyEnd()).getInt(0);
+      assertTrue(200 <= startKey);
+      assertTrue(startKey <= endKey);
+      assertTrue(800 >= endKey);
+      assertTrue(0 < keyRange.getDataSizeBytes());
+    }
+  }
+}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index ecdd786..4a7470f 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -67,6 +67,7 @@ class DefaultSource
   val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
   val BATCH_SIZE = "kudu.batchSize"
   val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
+  val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
 
   /**
    * A nice alias for the data source so that when specifying the format
@@ -179,6 +180,7 @@ class DefaultSource
     val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
     val keepAlivePeriodMs =
       parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
+    val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
 
     KuduReadOptions(
       batchSize,
@@ -186,7 +188,8 @@ class DefaultSource
       faultTolerantScanner,
       keepAlivePeriodMs,
       scanRequestTimeoutMs,
-      /* socketReadTimeoutMs= */ None)
+      /* socketReadTimeoutMs= */ None,
+      splitSizeBytes)
   }
 
   private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index c47a084..52bcba8 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -74,6 +74,10 @@ class KuduRDD private[kudu] (
       builder.scanRequestTimeout(timeout)
     }
 
+    options.splitSizeBytes.foreach { size =>
+      builder.setSplitSizeBytes(size)
+    }
+
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
     }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index afa0b2f..afe2084 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -35,6 +35,9 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  *                          server to ensure that scanners do not time out
  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
  * @param socketReadTimeoutMs This parameter is deprecated and has no effect
+ * @param splitSizeBytes Sets the target number of bytes per spark task. If set, tablet's
+ *                       primary key range will be split to generate uniform task sizes instead of
+ *                       the default of 1 task per tablet.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -44,7 +47,8 @@ case class KuduReadOptions(
     faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
     keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
     scanRequestTimeoutMs: Option[Long] = None,
-    socketReadTimeoutMs: Option[Long] = None)
+    socketReadTimeoutMs: Option[Long] = None,
+    splitSizeBytes: Option[Long] = None)
 
 object KuduReadOptions {
   val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 101546f..4d1944b 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -34,6 +34,7 @@ import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.junit.Before
 import org.junit.Test
@@ -1023,4 +1024,27 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
     assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(66666))
   }
+
+  @Test
+  @TabletServerConfig(
+    flags = Array(
+      "--flush_threshold_mb=1",
+      "--flush_threshold_secs=1"
+    ))
+  def testScanWithKeyRange() {
+    upsertRowsWithRowDataSize(table, rowCount * 100, 32 * 1024)
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.splitSizeBytes" -> "1024")
+
+    // count the number of tasks that end.
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+      val t = "scanWithKeyRangeTest"
+      sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(t)
+      val results = sqlContext.sql(s"SELECT * FROM $t").collectAsList()
+      assertEquals(rowCount * 100, results.size())
+    }
+    assert(actualNumTasks >= 2)
+  }
 }
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 4e139e2..54f0ffa 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -190,4 +190,43 @@ trait KuduTestSuite extends JUnitSuite {
     }
     rows
   }
+
+  def upsertRowsWithRowDataSize(
+      targetTable: KuduTable,
+      rowCount: Integer,
+      rowDataSize: Integer): IndexedSeq[(Int, Int, String, Long)] = {
+    val kuduSession = kuduClient.newSession()
+
+    val rows = Range(0, rowCount).map { i =>
+      val upsert = targetTable.newUpsert
+      val row = upsert.getRow
+      row.addInt(0, i)
+      row.addInt(1, i)
+      row.addDouble(3, i.toDouble)
+      row.addLong(4, i.toLong)
+      row.addBoolean(5, i % 2 == 1)
+      row.addShort(6, i.toShort)
+      row.addFloat(7, i.toFloat)
+      row.addBinary(8, (s"*" * rowDataSize).getBytes())
+      val ts = System.currentTimeMillis() * 1000
+      row.addLong(9, ts)
+      row.addByte(10, i.toByte)
+      row.addDecimal(11, BigDecimal.valueOf(i))
+      row.addDecimal(12, BigDecimal.valueOf(i))
+      row.addDecimal(13, BigDecimal.valueOf(i))
+
+      // Sprinkling some nulls so that queries see them.
+      val s = if (i % 2 == 0) {
+        row.addString(2, i.toString)
+        i.toString
+      } else {
+        row.setNull(2)
+        null
+      }
+
+      kuduSession.apply(upsert)
+      (i, i, s, ts)
+    }
+    rows
+  }
 }
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index 38ffff7..5e3a37b 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -43,6 +43,7 @@ import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.client.Upsert;
 import org.apache.kudu.util.DecimalUtil;
+import org.apache.kudu.util.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -299,6 +300,23 @@ public abstract class ClientTestUtil {
     return upsert;
   }
 
+  public static Upsert createBasicSchemaUpsertWithDataSize(KuduTable table, int key, int dataSize) {
+    Upsert upsert = table.newUpsert();
+    PartialRow row = upsert.getRow();
+    row.addInt(0, key);
+    row.addInt(1, 3);
+    row.addInt(2, 4);
+
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < dataSize; i++) {
+      builder.append("*");
+    }
+    String val = builder.toString();
+    row.addString(3, val);
+    row.addBoolean(4, false);
+    return upsert;
+  }
+
   public static Insert createBasicSchemaInsert(KuduTable table, int key) {
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();
@@ -342,6 +360,31 @@ public abstract class ClientTestUtil {
     return table;
   }
 
+  public static KuduTable createTableWithOneThousandRows(AsyncKuduClient client,
+                                                         String tableName,
+                                                         final int rowDataSize,
+                                                         final long timeoutMs)
+      throws Exception {
+    final int[] KEYS = new int[] { 250, 500, 750 };
+    final Schema basicSchema = getBasicSchema();
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    for (int i : KEYS) {
+      PartialRow splitRow = basicSchema.newPartialRow();
+      splitRow.addInt(0, i);
+      builder.addSplitRow(splitRow);
+    }
+    KuduTable table = client.syncClient().createTable(tableName, basicSchema, builder);
+    AsyncKuduSession session = client.newSession();
+
+    // create a table with 4 tablets of 250 rows each
+    for (int key = 0; key < 1000; key++) {
+      Upsert upsert = createBasicSchemaUpsertWithDataSize(table, key, rowDataSize);
+      session.apply(upsert).join(timeoutMs);
+    }
+    session.close().join(timeoutMs);
+    return table;
+  }
+
   public static Schema createManyStringsSchema() {
     ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());


Mime
View raw message