kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/8] incubator-kudu git commit: add async method to locate table
Date Fri, 29 Jan 2016 21:28:46 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master eb592b298 -> 82fdeb64e


add async method to locate table

Currently AsyncKuduClient has only one method, syncLocateTable, to
locate a table. It is synchronous and blocks the user's thread,
which is unfriendly to asynchronous programming.

This patch adds another method that locates a table asynchronously.

Change-Id: I0868ed77366ff7d45f92f99dfd033c244d53b48f
Reviewed-on: http://gerrit.cloudera.org:8080/1916
Reviewed-by: Jean-Daniel Cryans
Tested-by: Jean-Daniel Cryans


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/48a7b2f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/48a7b2f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/48a7b2f0

Branch: refs/heads/master
Commit: 48a7b2f0be477d897573d46f135daeb5f90abf8b
Parents: eb592b2
Author: zhangzhen <zhangzhen@xiaomi.com>
Authored: Wed Jan 27 17:18:39 2016 +0800
Committer: Jean-Daniel Cryans <jdcryans@gerrit.cloudera.org>
Committed: Fri Jan 29 18:39:48 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/AsyncKuduClient.java | 110 ++++++++++++-------
 .../main/java/org/kududb/client/KuduTable.java  |  29 +++++
 .../java/org/kududb/client/TestKuduTable.java   |  10 +-
 3 files changed, 106 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/48a7b2f0/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 53e9066..1a396b1 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.kududb.Common;
 import org.kududb.Schema;
@@ -43,6 +44,7 @@ import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 import org.kududb.consensus.Metadata;
 import org.kududb.master.Master;
+import org.kududb.master.Master.GetTableLocationsResponsePB;
 import org.kududb.util.AsyncUtil;
 import org.kududb.util.NetUtil;
 import org.kududb.util.Pair;
@@ -62,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -1019,50 +1022,73 @@ public class AsyncKuduClient implements AutoCloseable {
                                       byte[] startPartitionKey,
                                       byte[] endPartitionKey,
                                       long deadline) throws Exception {
-    List<LocatedTablet> ret = Lists.newArrayList();
-    byte[] lastEndPartitionKey = null;
+    return locateTable(tableId, startPartitionKey, endPartitionKey, deadline).join();
+  }
 
-    DeadlineTracker deadlineTracker = new DeadlineTracker();
-    deadlineTracker.setDeadline(deadline);
-    while (true) {
-      if (deadlineTracker.timedOut()) {
-        throw new NonRecoverableException("Took too long getting the list of tablets, " +
-            "deadline=" + deadline);
-      }
-      GetTableLocationsRequest rpc =
-          new GetTableLocationsRequest(masterTable, startPartitionKey, endPartitionKey, tableId);
-      rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-      final Deferred<Master.GetTableLocationsResponsePB> d = sendRpcToTablet(rpc);
-      Master.GetTableLocationsResponsePB response =
-          d.join(deadlineTracker.getMillisBeforeDeadline());
-      // Table doesn't exist or is being created.
-      if (response.getTabletLocationsCount() == 0) {
-        break;
-      }
-      for (Master.TabletLocationsPB tabletPb : response.getTabletLocationsList()) {
-        LocatedTablet locs = new LocatedTablet(tabletPb);
-        ret.add(locs);
-
-        Partition partition = locs.getPartition();
-        if (lastEndPartitionKey != null &&
-            !partition.isEndPartition() &&
-            Bytes.memcmp(partition.getPartitionKeyEnd(), lastEndPartitionKey) < 0) {
-          throw new IllegalStateException(
-            "Server returned tablets out of order: " +
-            "end partition key '" + Bytes.pretty(partition.getPartitionKeyEnd()) + "' followed
" +
-            "end partition key '" + Bytes.pretty(lastEndPartitionKey) + "'");
-        }
-        lastEndPartitionKey = partition.getPartitionKeyEnd();
-      }
-      // If true, we're done, else we have to go back to the master with the last end key
-      if (lastEndPartitionKey.length == 0 ||
-          (endPartitionKey != null && Bytes.memcmp(lastEndPartitionKey, endPartitionKey)
> 0)) {
-        break;
-      } else {
-        startPartitionKey = lastEndPartitionKey;
-      }
+  private Deferred<List<LocatedTablet>> loopLocateTable(final String tableId,
+      final byte[] startPartitionKey, final byte[] endPartitionKey, final List<LocatedTablet>
ret,
+      final DeadlineTracker deadlineTracker) {
+    if (deadlineTracker.timedOut()) {
+      return Deferred.fromError(new NonRecoverableException(
+          "Took too long getting the list of tablets, " + deadlineTracker));
     }
-    return ret;
+    GetTableLocationsRequest rpc = new GetTableLocationsRequest(masterTable, startPartitionKey,
+        endPartitionKey, tableId);
+    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    final Deferred<Master.GetTableLocationsResponsePB> d = sendRpcToTablet(rpc);
+    return d.addCallbackDeferring(
+        new Callback<Deferred<List<LocatedTablet>>, Master.GetTableLocationsResponsePB>()
{
+          @Override
+          public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB
response) {
+            // Table doesn't exist or is being created.
+            if (response.getTabletLocationsCount() == 0) {
+              Deferred.fromResult(ret);
+            }
+            byte[] lastEndPartition = startPartitionKey;
+            for (Master.TabletLocationsPB tabletPb : response.getTabletLocationsList()) {
+              LocatedTablet locs = new LocatedTablet(tabletPb);
+              ret.add(locs);
+              Partition partition = locs.getPartition();
+              if (lastEndPartition != null && !partition.isEndPartition()
+                  && Bytes.memcmp(partition.getPartitionKeyEnd(), lastEndPartition)
< 0) {
+                return Deferred.fromError(new IllegalStateException(
+                    "Server returned tablets out of order: " + "end partition key '"
+                        + Bytes.pretty(partition.getPartitionKeyEnd()) + "' followed "
+                        + "end partition key '" + Bytes.pretty(lastEndPartition) + "'"));
+              }
+              lastEndPartition = partition.getPartitionKeyEnd();
+            }
+            // If true, we're done, else we have to go back to the master with the last end
key
+            if (lastEndPartition.length == 0
+                || (endPartitionKey != null && Bytes.memcmp(lastEndPartition, endPartitionKey)
> 0)) {
+              return Deferred.fromResult(ret);
+            } else {
+              return loopLocateTable(tableId, lastEndPartition, endPartitionKey, ret,
+                  deadlineTracker);
+            }
+          }
+        });
+  }
+
+  /**
+   * Get all or some tablets for a given table. This may query the master multiple times
if there
+   * are a lot of tablets.
+   * @param tableId the table to locate tablets from
+   * @param startPartitionKey where to start in the table, pass null to start at the beginning
+   * @param endPartitionKey where to stop in the table, pass null to get all the tablets
until the
+   *                        end of the table
+   * @param deadline max time spent in milliseconds for the deferred result of this method
to
+   *         get called back, if deadline is reached, the deferred result will get erred
back
+   * @return a deferred object that yields a list of the tablets in the table, which can
be queried
+   *         for metadata about each tablet
+   * @throws Exception MasterErrorException if the table doesn't exist
+   */
+  Deferred<List<LocatedTablet>> locateTable(final String tableId,
+      final byte[] startPartitionKey, final byte[] endPartitionKey, long deadline) {
+    final List<LocatedTablet> ret = Lists.newArrayList();
+    final DeadlineTracker deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(deadline);
+    return loopLocateTable(tableId, startPartitionKey, endPartitionKey, ret, deadlineTracker);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/48a7b2f0/java/kudu-client/src/main/java/org/kududb/client/KuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduTable.java b/java/kudu-client/src/main/java/org/kududb/client/KuduTable.java
index 58b9a67..b0dd4e7 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduTable.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduTable.java
@@ -20,6 +20,8 @@ import org.kududb.Schema;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 
+import com.stumbleupon.async.Deferred;
+
 import java.util.List;
 
 /**
@@ -140,6 +142,18 @@ public class KuduTable {
   }
 
   /**
+   * Asynchronously get all the tablets for this table.
+   * @param deadline max time spent in milliseconds for the deferred result of this method
to
+   *         get called back, if deadline is reached, the deferred result will get erred
back
+   * @return a {@link Deferred} object that yields a list containing the metadata and
+   * locations for each of the tablets in the table
+   */
+  public Deferred<List<LocatedTablet>> asyncGetTabletsLocations(
+      long deadline) throws Exception {
+    return asyncGetTabletsLocations(null, null, deadline);
+  }
+
+  /**
    * Get all or some tablets for this table. This may query the master multiple times if
there
    * are a lot of tablets.
    * This method blocks until it gets all the tablets.
@@ -156,4 +170,19 @@ public class KuduTable {
     return client.syncLocateTable(tableId, startKey, endKey, deadline);
   }
 
+  /**
+   * Asynchronously get all or some tablets for this table.
+   * @param startKey where to start in the table, pass null to start at the beginning
+   * @param endKey where to stop in the table, pass null to get all the tablets until the
end of
+   *               the table
+   * @param deadline max time spent in milliseconds for the deferred result of this method
to
+   *         get called back, if deadline is reached, the deferred result will get erred
back
+   * @return a {@link Deferred} object that yields a list containing the metadata and locations
+   *           for each of the tablets in the table
+   */
+  public Deferred<List<LocatedTablet>> asyncGetTabletsLocations(
+      byte[] startKey, byte[] endKey, long deadline) throws Exception {
+    return client.locateTable(tableId, startKey, endKey, deadline);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/48a7b2f0/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
index abe77f1..b19753b 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduTable.java
@@ -172,28 +172,35 @@ public class TestKuduTable extends BaseKuduTest {
 
     List<LocatedTablet>tablets = table.getTabletsLocations(null, getKeyInBytes(9),
DEFAULT_SLEEP);
     assertEquals(10, tablets.size());
+    assertEquals(10, table.asyncGetTabletsLocations(null, getKeyInBytes(9), DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(0), getKeyInBytes(9), DEFAULT_SLEEP);
     assertEquals(10, tablets.size());
+    assertEquals(10, table.asyncGetTabletsLocations(getKeyInBytes(0), getKeyInBytes(9), DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(5), getKeyInBytes(9), DEFAULT_SLEEP);
     assertEquals(5, tablets.size());
+    assertEquals(5, table.asyncGetTabletsLocations(getKeyInBytes(5), getKeyInBytes(9), DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(5), getKeyInBytes(14), DEFAULT_SLEEP);
     assertEquals(10, tablets.size());
+    assertEquals(10, table.asyncGetTabletsLocations(getKeyInBytes(5), getKeyInBytes(14),
DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(5), getKeyInBytes(31), DEFAULT_SLEEP);
     assertEquals(26, tablets.size());
+    assertEquals(26, table.asyncGetTabletsLocations(getKeyInBytes(5), getKeyInBytes(31),
DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(5), null, DEFAULT_SLEEP);
     assertEquals(26, tablets.size());
+    assertEquals(26, table.asyncGetTabletsLocations(getKeyInBytes(5), null, DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(null, getKeyInBytes(10000), DEFAULT_SLEEP);
     assertEquals(31, tablets.size());
+    assertEquals(31, table.asyncGetTabletsLocations(null, getKeyInBytes(10000), DEFAULT_SLEEP).join().size());
 
     tablets = table.getTabletsLocations(getKeyInBytes(20), getKeyInBytes(10000), DEFAULT_SLEEP);
     assertEquals(11, tablets.size());
-
+    assertEquals(11, table.asyncGetTabletsLocations(getKeyInBytes(20), getKeyInBytes(10000),
DEFAULT_SLEEP).join().size());
 
     // Test listing tables.
     assertEquals(0, client.getTablesList(table1).join(DEFAULT_SLEEP).getTablesList().size());
@@ -232,6 +239,7 @@ public class TestKuduTable extends BaseKuduTest {
 
     List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
     assertEquals(splitsCount + 1, tablets.size());
+    assertEquals(splitsCount + 1, table.asyncGetTabletsLocations(DEFAULT_SLEEP).join().size());
     for (LocatedTablet tablet : tablets) {
       assertEquals(1, tablet.getReplicas().size());
     }


Mime
View raw message