kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [1/2] kudu git commit: [java client] Implement RPC tracing, part 1
Date Fri, 04 Nov 2016 22:33:53 GMT
Repository: kudu
Updated Branches:
  refs/heads/master ce80f4ebd -> c53d9694f


[java client] Implement RPC tracing, part 1

The first part of this work adds the tracing objects and records the traces. A second
patch will make this information available to users.

This patch uses a pretty simple method: it just appends container objects to a
per-RPC list. The traces are lightweight and don't try anything fancy. We also introduce
the concept of a "parent RPC": when, say, a Write RPC spawns a GetTableLocations RPC, the
Write is set as the parent of the GetTableLocations, so that the call to the master adds
traces to both RPCs.

This patch doesn't add a nice way to present the traces (such as JSON), but here's a simple
toString example:

RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973547, action=SEND_TO_SERVER, server=3926a6a73e994152be1336beb434154e},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548, action=RECEIVE_FROM_SERVER, server=3926a6a73e994152be1336beb434154e, callStatus=Network error: [Peer 3926a6a73e994152be1336beb434154e] Connection reset on [id: 0xc83743df]}
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548, action=SLEEP_THEN_RETRY, callStatus=Network error: [Peer 3926a6a73e994152be1336beb434154e] Connection reset on [id: 0xc83743df]},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973574, action=QUERY_MASTER},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973574, action=SEND_TO_SERVER, server=c0d4588690d241c69821ee773eebd185},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973576, action=RECEIVE_FROM_SERVER, server=c0d4588690d241c69821ee773eebd185, callStatus=OK},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579, action=PICKED_REPLICA},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579, action=SEND_TO_SERVER, server=0353a6d97d6c49f9a727bc1ee6c3393e},

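This patch also fixes up some paths where we weren't passing a timeout correctly to an
RPC that was created on behalf of another RPC (basically the paths where the parent RPC
has to be set). In the master lookup paths, the child RPC's timeout is now the time
remaining before the parent RPC's deadline.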

Change-Id: I69ef56acc071b9f80b34e38c1821df4096f54907
Reviewed-on: http://gerrit.cloudera.org:8080/4781
Reviewed-by: Dan Burkert <danburkert@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bdbee44e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bdbee44e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bdbee44e

Branch: refs/heads/master
Commit: bdbee44e029b2ada382c09ccb63a522b6de66186
Parents: ce80f4e
Author: Jean-Daniel Cryans <jdcryans@apache.org>
Authored: Fri Oct 21 13:10:31 2016 -0700
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Fri Nov 4 22:09:52 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  87 +++++++++-----
 .../apache/kudu/client/AsyncKuduScanner.java    |   2 +-
 .../main/java/org/apache/kudu/client/Batch.java |   1 +
 .../java/org/apache/kudu/client/KuduRpc.java    |  57 +++++++++
 .../org/apache/kudu/client/RpcTraceFrame.java   | 117 +++++++++++++++++++
 .../org/apache/kudu/client/TabletClient.java    |  40 ++++++-
 .../org/apache/kudu/client/BaseKuduTest.java    |   3 +-
 .../org/apache/kudu/client/TestRpcTraces.java   | 115 ++++++++++++++++++
 8 files changed, 387 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 06b9203..9274087 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -587,8 +587,8 @@ public class AsyncKuduClient implements AutoCloseable {
     RemoteTablet tablet = scanner.currentTablet();
     assert (tablet != null);
     KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
-    TabletClient client =
-        connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
+    String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
+    TabletClient client = connectionCache.getClient(uuid);
     Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -597,7 +597,9 @@ public class AsyncKuduClient implements AutoCloseable {
       // A null client means we either don't know about this tablet anymore (unlikely) or
we
       // couldn't find a leader (which could be triggered by a read timeout).
       // We'll first delay the RPC in case things take some time to settle down, then retry.
-      return delayedSendRpcToTablet(nextRequest, null);
+      Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid
+          + " will retry after a delay");
+      return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
     }
     client.sendRpc(nextRequest);
     return d;
@@ -685,6 +687,12 @@ public class AsyncKuduClient implements AutoCloseable {
       }
     }
 
+    request.addTrace(
+        new RpcTraceFrame.RpcTraceFrameBuilder(
+            request.method(),
+            RpcTraceFrame.Action.QUERY_MASTER)
+            .build());
+
     // We fall through to here in two cases:
     //
     // 1) This client has not yet discovered the tablet which is responsible for
@@ -702,7 +710,7 @@ public class AsyncKuduClient implements AutoCloseable {
     Callback<Deferred<R>, Master.GetTableLocationsResponsePB> cb = new RetryRpcCB<>(request);
     Callback<Deferred<R>, Exception> eb = new RetryRpcErrback<>(request);
     Deferred<Master.GetTableLocationsResponsePB> returnedD =
-        locateTablet(request.getTable(), partitionKey);
+        locateTablet(request.getTable(), partitionKey, request);
     return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
   }
 
@@ -712,7 +720,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * <p>
    * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback and
    * {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code Deferred}
-   * returned by {@link #locateTablet(KuduTable, byte[])}.
+   * returned by {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
    * @param <R> RPC's return type.
    * @param <D> Previous query's return type, which we don't use, but need to specify
in order to
    *           tie it all together.
@@ -739,7 +747,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * <p>
    * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback" and
    * {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
-   * {@link #locateTablet(KuduTable, byte[])}.
+   * {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
    * @see #delayedSendRpcToTablet(KuduRpc, KuduException)
    * @param <R> The type of the original RPC.
    */
@@ -818,10 +826,12 @@ public class AsyncKuduClient implements AutoCloseable {
             }
           }
         }
-        IsCreateTableDoneRequest rpc = new IsCreateTableDoneRequest(masterTable, tableId);
-        rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+        IsCreateTableDoneRequest isCreateTableDoneRequest =
+            new IsCreateTableDoneRequest(masterTable, tableId);
+        isCreateTableDoneRequest.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+        isCreateTableDoneRequest.setParentRpc(rpc);
         final Deferred<Master.IsCreateTableDoneResponsePB> d =
-            sendRpcToTablet(rpc).addCallback(new IsCreateTableDoneCB(tableId));
+            sendRpcToTablet(isCreateTableDoneRequest).addCallback(new IsCreateTableDoneCB(tableId));
         if (has_permit) {
           // The errback is needed here to release the lookup permit
           d.addCallbacks(new ReleaseMasterLookupPermit<Master.IsCreateTableDoneResponsePB>(),
@@ -958,13 +968,15 @@ public class AsyncKuduClient implements AutoCloseable {
    * Sends a getTableLocations RPC to the master to find the table's tablets.
    * @param table table to lookup
    * @param partitionKey can be null, if not we'll find the exact tablet that contains it
+   * @param parentRpc RPC that prompted a master lookup, can be null
    * @return Deferred to track the progress
    */
   private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable table,
-                                                                    byte[] partitionKey)
{
-    final boolean has_permit = acquireMasterLookupPermit();
+                                                                    byte[] partitionKey,
+                                                                    KuduRpc<?> parentRpc)
{
+    boolean hasPermit = acquireMasterLookupPermit();
     String tableId = table.getTableId();
-    if (!has_permit) {
+    if (!hasPermit) {
       // If we failed to acquire a permit, it's worth checking if someone
       // looked up the tablet we're interested in.  Every once in a while
       // this will save us a Master lookup.
@@ -974,22 +986,27 @@ public class AsyncKuduClient implements AutoCloseable {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
-    // Leave the end of the partition key range empty in order to pre-fetch tablet locations.
-    GetTableLocationsRequest rpc =
-        new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    final Deferred<Master.GetTableLocationsResponsePB> d;
 
     // If we know this is going to the master, check the master consensus
     // configuration (as specified by 'masterAddresses' field) to determine and
     // cache the current leader.
+    Deferred<Master.GetTableLocationsResponsePB> d;
     if (isMasterTable(tableId)) {
-      d = getMasterTableLocationsPB();
+      d = getMasterTableLocationsPB(parentRpc);
     } else {
+      // Leave the end of the partition key range empty in order to pre-fetch tablet locations.
+      GetTableLocationsRequest rpc =
+          new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
+      if (parentRpc != null) {
+        rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
+        rpc.setParentRpc(parentRpc);
+      } else {
+        rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+      }
       d = sendRpcToTablet(rpc);
     }
     d.addCallback(new MasterLookupCB(table, partitionKey));
-    if (has_permit) {
+    if (hasPermit) {
       d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
     }
     return d;
@@ -1000,7 +1017,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * fill a {@link Master.GetTabletLocationsResponsePB} object.
    * @return An initialized Deferred object to hold the response.
    */
-  Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
+  Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?>
parentRpc) {
     final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
     final GetMasterRegistrationReceived received =
         new GetMasterRegistrationReceived(masterAddresses, responseD);
@@ -1017,7 +1034,7 @@ public class AsyncKuduClient implements AutoCloseable {
         Status statusIOE = Status.IOError(message);
         d = Deferred.fromError(new NonRecoverableException(statusIOE));
       } else {
-        d = getMasterRegistration(clientForHostAndPort);
+        d = getMasterRegistration(clientForHostAndPort, parentRpc);
       }
       d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
     }
@@ -1088,7 +1105,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // When lookup completes, the tablet (or non-covered range) for the next
       // partition key will be located and added to the client's cache.
       final byte[] lookupKey = partitionKey;
-      return locateTablet(table, key).addCallbackDeferring(
+      return locateTablet(table, key, null).addCallbackDeferring(
           new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>()
{
             @Override
             public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB
resp) {
@@ -1156,7 +1173,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback
will
    * be immediately called.
    * @param rpc the RPC to retry later
-   * @param ex the reason why we need to retry, might be null
+   * @param ex the reason why we need to retry
    * @return a Deferred object to use if this method is called inline with the user's original
    * attempt to send the RPC. Can be ignored in any other context that doesn't need to return
a
    * Deferred back to the user.
@@ -1171,6 +1188,15 @@ public class AsyncKuduClient implements AutoCloseable {
         sendRpcToTablet(rpc);
       }
     }
+    assert (ex != null);
+    Status reasonForRetry = ex.getStatus();
+    rpc.addTrace(
+        new RpcTraceFrame.RpcTraceFrameBuilder(
+            rpc.method(),
+            RpcTraceFrame.Action.SLEEP_THEN_RETRY)
+            .callStatus(reasonForRetry)
+            .build());
+
     long sleepTime = getSleepTimeForRpc(rpc);
     if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
       // Don't let it retry.
@@ -1389,14 +1415,21 @@ public class AsyncKuduClient implements AutoCloseable {
   /**
    * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
    * for a replica.
-   * @param masterClient An initialized client for the master replica.
-   * @return A Deferred object for the master replica's current registration.
+   * @param masterClient an initialized client for the master replica
+   * @param parentRpc RPC that prompted a master lookup, can be null
+   * @return a Deferred object for the master replica's current registration
    */
-  Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient masterClient)
{
+  Deferred<GetMasterRegistrationResponse> getMasterRegistration(
+      TabletClient masterClient, KuduRpc<?> parentRpc) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
     GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    if (parentRpc != null) {
+      rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
+      rpc.setParentRpc(parentRpc);
+    } else {
+      rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    }
     Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
     rpc.attempt++;
     masterClient.sendRpc(rpc);

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 3864088..8171ac1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -789,7 +789,7 @@ public final class AsyncKuduScanner {
     public String toString() {
       return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
           + (tablet != null? ", tabletSlice=" + tablet.getTabletId() : "")
-          + ", attempt=" + attempt + ')';
+          + ", attempt=" + attempt + ", " + super.toString() + ")";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 8dacdd7..0f86a42 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -178,6 +178,7 @@ class Batch extends KuduRpc<BatchResponse> {
                       .add("operations", operations.size())
                       .add("tablet", tablet)
                       .add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
+                      .add("rpc", super.toString())
                       .toString();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 5ae3f56..03fcce1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -25,6 +25,7 @@
  */
 package org.apache.kudu.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -39,7 +40,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
@@ -61,12 +65,20 @@ import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 @InterfaceAudience.Private
 public abstract class KuduRpc<R> {
 
+  @VisibleForTesting
+  public static final int MAX_TRACES_SIZE = 100;
+
   // Service names.
   protected static final String MASTER_SERVICE_NAME = "kudu.master.MasterService";
   protected static final String TABLET_SERVER_SERVICE_NAME = "kudu.tserver.TabletServerService";
 
   private static final Logger LOG = LoggerFactory.getLogger(KuduRpc.class);
 
+  private final List<RpcTraceFrame> traces =
+      Collections.synchronizedList(new ArrayList<RpcTraceFrame>());
+
+  private KuduRpc<?> parentRpc;
+
   /**
    * Returns the partition key this RPC is for, or {@code null} if the RPC is
    * not tablet specific.
@@ -203,10 +215,46 @@ public abstract class KuduRpc<R> {
       sequenceId = RequestTracker.NO_SEQ_NO;
     }
     deadlineTracker.reset();
+    traces.clear();
+    parentRpc = null;
     d.callback(result);
   }
 
   /**
+   * Add the provided trace to this RPC's collection of traces. If this RPC has a parent
RPC, it
+   * will also receive that trace. If this RPC has reached the limit of traces it can track
then
+   * the trace will just be discarded.
+   * @param rpcTraceFrame trace to add
+   */
+  void addTrace(RpcTraceFrame rpcTraceFrame) {
+    if (parentRpc != null) {
+      parentRpc.addTrace(rpcTraceFrame);
+    }
+
+    if (traces.size() == MAX_TRACES_SIZE) {
+      // Add a last trace that indicates that we've reached the max size.
+      traces.add(
+          new RpcTraceFrame.RpcTraceFrameBuilder(
+              this.method(),
+              RpcTraceFrame.Action.TRACE_TRUNCATED)
+              .build());
+    } else if (traces.size() < MAX_TRACES_SIZE) {
+      traces.add(rpcTraceFrame);
+    }
+  }
+
+  /**
+   * Sets this RPC to receive traces from the provided parent RPC. An RPC can only have one
and
+   * only one parent RPC.
+   * @param parentRpc RPC that will also receive traces from this RPC
+   */
+  void setParentRpc(KuduRpc<?> parentRpc) {
+    assert (this.parentRpc == null);
+    assert (this.parentRpc != this);
+    this.parentRpc = parentRpc;
+  }
+
+  /**
    * Package private way of making an RPC complete by giving it its result.
    * If this RPC has no {@link Deferred} associated to it, nothing will
    * happen.  This may happen if the RPC was already called back.
@@ -267,6 +315,14 @@ public abstract class KuduRpc<R> {
     return ReplicaSelection.LEADER_ONLY;
   }
 
+  /**
+   * Get an immutable copy of the traces.
+   * @return list of traces
+   */
+  List<RpcTraceFrame> getImmutableTraces() {
+    return ImmutableList.copyOf(traces);
+  }
+
   void setSequenceId(long sequenceId) {
     assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
     this.sequenceId = sequenceId;
@@ -289,6 +345,7 @@ public abstract class KuduRpc<R> {
     // this method if DEBUG is enabled.
     if (LOG.isDebugEnabled()) {
       buf.append(", ").append(deferred);
+      buf.append(", ").append(traces);
     }
     buf.append(')');
     return buf.toString();

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
new file mode 100644
index 0000000..cae2b02
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.base.MoreObjects;
+import org.apache.kudu.annotations.InterfaceAudience;
+
+/**
+ * Container class for traces. Most of its properties can be null, when they aren't set via
the
+ * builder. The timestamp is set automatically.
+ */
+@InterfaceAudience.Private
+class RpcTraceFrame {
+  enum Action {
+    // Just before putting the RPC on the wire.
+    SEND_TO_SERVER,
+    // Just after parsing the response from the server.
+    RECEIVE_FROM_SERVER,
+    // Just before sleeping and then retrying.
+    SLEEP_THEN_RETRY,
+    // After having figured out that we don't know where the RPC is going,
+    // before querying the master.
+    QUERY_MASTER,
+    // Once the trace becomes too large, will be the last trace object in the list.
+    TRACE_TRUNCATED
+  }
+
+  private final String rpcMethod;
+  private final Action action;
+  private final ServerInfo serverInfo;
+  private final long timestampMs;
+  private final Status callStatus;
+
+  private RpcTraceFrame(String rpcMethod, Action action,
+                        ServerInfo serverInfo, Status callStatus) {
+    this.rpcMethod = rpcMethod;
+    this.action = action;
+    this.serverInfo = serverInfo;
+    this.callStatus = callStatus;
+    this.timestampMs = System.currentTimeMillis();
+  }
+
+  public String getRpcMethod() {
+    return rpcMethod;
+  }
+
+  Action getAction() {
+    return action;
+  }
+
+  ServerInfo getServer() {
+    return serverInfo;
+  }
+
+  long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public Status getStatus() {
+    return callStatus;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("rpcMethod", rpcMethod)
+        .add("timestampMs", timestampMs)
+        .add("action", action)
+        .add("serverInfo", serverInfo)
+        .add("callStatus", callStatus)
+        .toString();
+  }
+
+  /**
+   * Builder class for trace frames. The only required parameters are set in the constructor.
+   * Timestamp is set automatically.
+   */
+  static class RpcTraceFrameBuilder {
+    private final String rpcMethod;
+    private final Action action;
+    private ServerInfo serverInfo;
+    private Status callStatus;
+
+    RpcTraceFrameBuilder(String rpcMethod, Action action) {
+      this.rpcMethod = rpcMethod;
+      this.action = action;
+    }
+
+    public RpcTraceFrameBuilder serverInfo(ServerInfo serverInfo) {
+      this.serverInfo = serverInfo;
+      return this;
+    }
+
+    public RpcTraceFrameBuilder callStatus(Status callStatus) {
+      this.callStatus = callStatus;
+      return this;
+    }
+
+    public RpcTraceFrame build() {
+      return new RpcTraceFrame(rpcMethod, action, serverInfo, callStatus);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 1b6f5ac..813f02b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -153,6 +153,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   }
 
   <R> void sendRpc(KuduRpc<R> rpc) {
+    rpc.addTrace(
+        new RpcTraceFrame.RpcTraceFrameBuilder(
+            rpc.method(),
+            RpcTraceFrame.Action.SEND_TO_SERVER)
+            .serverInfo(serverInfo)
+            .build());
+
     if (!rpc.deadlineTracker.hasDeadline()) {
       LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
     }
@@ -420,6 +427,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       throw new NonRecoverableException(statusIllegalState);
     }
 
+    // Start building the trace, we'll finish it as we parse the response.
+    RpcTraceFrame.RpcTraceFrameBuilder traceBuilder =
+        new RpcTraceFrame.RpcTraceFrameBuilder(
+            rpc.method(),
+            RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+            .serverInfo(serverInfo);
+
     Pair<Object, Object> decoded = null;
     KuduException exception = null;
     Status retryableHeaderError = Status.OK();
@@ -463,6 +477,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
 
     // This check is specifically for the ERROR_SERVER_TOO_BUSY case above.
     if (!retryableHeaderError.ok()) {
+      rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
       kuduClient.handleRetryableError(rpc, new RecoverableException(retryableHeaderError));
       return null;
     }
@@ -473,7 +488,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     if (decoded != null) {
       if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
         Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) decoded.getSecond();
-        exception = dispatchTSErrorOrReturnException(rpc, error);
+        exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
         if (exception == null) {
           // It was taken care of.
           return null;
@@ -484,7 +499,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
 
       } else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
         Master.MasterErrorPB error = (Master.MasterErrorPB) decoded.getSecond();
-        exception = dispatchMasterErrorOrReturnException(rpc, error);
+        exception = dispatchMasterErrorOrReturnException(rpc, error, traceBuilder);
         if (exception == null) {
           // Exception was taken care of.
           return null;
@@ -500,11 +515,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         if (kuduClient.isStatisticsEnabled()) {
           rpc.updateStatistics(kuduClient.getStatistics(), decoded.getFirst());
         }
+        rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
         rpc.callback(decoded.getFirst());
       } else {
         if (kuduClient.isStatisticsEnabled()) {
           rpc.updateStatistics(kuduClient.getStatistics(), null);
         }
+        rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
         rpc.errback(exception);
       }
     } catch (Exception e) {
@@ -525,8 +542,9 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    * @param error the error the TS sent
    * @return an exception if we couldn't dispatch the error, or null
    */
-  private KuduException dispatchTSErrorOrReturnException(KuduRpc rpc,
-                                                         Tserver.TabletServerErrorPB error)
{
+  private KuduException dispatchTSErrorOrReturnException(
+      KuduRpc rpc, Tserver.TabletServerErrorPB error,
+      RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
     WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
     Status status = Status.fromTabletServerErrorPB(error);
     if (error.getCode() == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
@@ -541,6 +559,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     } else {
       return new NonRecoverableException(status);
     }
+    rpc.addTrace(traceBuilder.callStatus(status).build());
     return null;
   }
 
@@ -551,8 +570,8 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    * @param error the error the master sent
    * @return an exception if we couldn't dispatch the error, or null
    */
-  private KuduException dispatchMasterErrorOrReturnException(KuduRpc rpc,
-                                                             Master.MasterErrorPB error)
{
+  private KuduException dispatchMasterErrorOrReturnException(
+      KuduRpc rpc, Master.MasterErrorPB error, RpcTraceFrame.RpcTraceFrameBuilder traceBuilder)
{
     WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
     Status status = Status.fromMasterErrorPB(error);
     if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
@@ -571,6 +590,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     } else {
       return new NonRecoverableException(status);
     }
+    rpc.addTrace(traceBuilder.callStatus(status).build());
     return null;
   }
 
@@ -735,6 +755,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    */
   private void failOrRetryRpc(final KuduRpc<?> rpc,
                               final RecoverableException exception) {
+    rpc.addTrace(
+        new RpcTraceFrame.RpcTraceFrameBuilder(
+            rpc.method(),
+            RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+            .serverInfo(serverInfo)
+            .callStatus(exception.getStatus())
+            .build());
+
     RemoteTablet tablet = rpc.getTablet();
     // Note As of the time of writing (03/11/16), a null tablet doesn't make sense, if we
see a null
     // tablet it's because we didn't set it properly before calling sendRpc().

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 8989f50..d30762c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -380,7 +380,8 @@ public class BaseKuduTest {
     Stopwatch sw = Stopwatch.createStarted();
     int leaderPort = -1;
     while (leaderPort == -1 && sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
-      Deferred<Master.GetTableLocationsResponsePB> masterLocD = client.getMasterTableLocationsPB();
+      Deferred<Master.GetTableLocationsResponsePB> masterLocD =
+          client.getMasterTableLocationsPB(null);
       Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP);
       leaderPort = r.getTabletLocations(0)
           .getReplicas(0)

http://git-wip-us.apache.org/repos/asf/kudu/blob/bdbee44e/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
new file mode 100644
index 0000000..6e8842c
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRpcTraces.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestRpcTraces { // Tests the trace size limit and trace propagation to parent RPCs.
+
+  @Test
+  public void testLimit() { // Checks when the TRACE_TRUNCATED frame appears as traces are added.
+    PingRequest ping = PingRequest.makeMasterPingRequest();
+
+    ping.addTrace(getTrace()); // 1 trace added so far.
+    assertNotTruncated(ping);
+
+    for (int i = 0; i < KuduRpc.MAX_TRACES_SIZE - 2; i++) {
+      ping.addTrace(getTrace());
+    } // MAX_TRACES_SIZE - 1 traces added so far.
+    assertNotTruncated(ping);
+
+    ping.addTrace(getTrace()); // MAX_TRACES_SIZE traces added: still no truncation expected.
+    assertNotTruncated(ping);
+
+    ping.addTrace(getTrace()); // One past the limit: a trailing TRACE_TRUNCATED frame is expected.
+    assertTruncateIsLast(ping);
+
+    ping.addTrace(getTrace()); // Further adds are expected to leave the same truncated shape.
+    assertTruncateIsLast(ping);
+  }
+
+  @Test
+  public void testParentRpc() { // Checks that a trace added to an RPC is also seen on its ancestors.
+    PingRequest parent = PingRequest.makeMasterPingRequest();
+
+    PingRequest daughter = PingRequest.makeMasterPingRequest();
+    PingRequest son = PingRequest.makeMasterPingRequest();
+
+    PingRequest sonsDaughter = PingRequest.makeMasterPingRequest();
+
+    sonsDaughter.setParentRpc(son); // Hierarchy: parent <- {son <- sonsDaughter, daughter}.
+    son.setParentRpc(parent);
+    daughter.setParentRpc(parent);
+
+    // Son's daughter => son => parent: the trace must reach both ancestors, not the sibling.
+    RpcTraceFrame trace = getTrace();
+    sonsDaughter.addTrace(trace);
+    assertTrue(son.getImmutableTraces().get(0) == trace); // Identity check: same frame instance.
+    assertTrue(parent.getImmutableTraces().get(0) == trace);
+    assertTrue(daughter.getImmutableTraces().isEmpty());
+
+    // Son => parent: the son's own child must not receive it.
+    trace = getTrace();
+    son.addTrace(trace);
+    assertTrue(son.getImmutableTraces().get(1) == trace);
+    assertTrue(parent.getImmutableTraces().get(1) == trace);
+    assertTrue(daughter.getImmutableTraces().isEmpty());
+    assertEquals(1, sonsDaughter.getImmutableTraces().size());
+
+    // Daughter => parent: the son's branch is unaffected.
+    trace = getTrace();
+    daughter.addTrace(trace);
+    assertTrue(daughter.getImmutableTraces().get(0) == trace);
+    assertTrue(parent.getImmutableTraces().get(2) == trace);
+    assertEquals(2, son.getImmutableTraces().size());
+    assertEquals(1, sonsDaughter.getImmutableTraces().size());
+
+    // Parent alone: no descendant receives it.
+    trace = getTrace();
+    parent.addTrace(trace);
+    assertTrue(parent.getImmutableTraces().get(3) == trace);
+    assertEquals(1, daughter.getImmutableTraces().size());
+    assertEquals(2, son.getImmutableTraces().size());
+    assertEquals(1, sonsDaughter.getImmutableTraces().size());
+  }
+
+  private RpcTraceFrame getTrace() { // Builds a minimal frame with a placeholder method name.
+    return new RpcTraceFrame.RpcTraceFrameBuilder(
+        "trace",
+        RpcTraceFrame.Action.QUERY_MASTER) // Just a random action.
+        .build();
+  }
+
+  private void assertNotTruncated(KuduRpc<?> rpc) { // Fails if any frame is TRACE_TRUNCATED.
+    for (RpcTraceFrame trace : rpc.getImmutableTraces()) {
+      assertNotEquals(RpcTraceFrame.Action.TRACE_TRUNCATED, trace.getAction());
+    }
+  }
+
+  private void assertTruncateIsLast(KuduRpc<?> rpc) { // Expects the cap plus one trailing marker.
+    List<RpcTraceFrame> traces = rpc.getImmutableTraces();
+    assertEquals(KuduRpc.MAX_TRACES_SIZE + 1, traces.size());
+    for (int i = 0; i < traces.size() - 1; i++) { // Every frame but the last must be a regular one.
+      assertNotEquals(RpcTraceFrame.Action.TRACE_TRUNCATED, traces.get(i).getAction());
+    }
+    assertEquals(RpcTraceFrame.Action.TRACE_TRUNCATED, traces.get(traces.size() - 1).getAction());
+  }
+}


Mime
View raw message