kudu-commits mailing list archives

From t...@apache.org
Subject [2/2] kudu git commit: KUDU-2065, KUDU-2011: Release sidecars on cancellation or timeout
Date Fri, 18 Aug 2017 02:39:11 GMT
KUDU-2065, KUDU-2011: Release sidecars on cancellation or timeout

Previously, when an outbound call timed out after transmission of
the request had begun but not finished, the RPC layer would continue
to send the entire payload (including any sidecars) to the remote
destination until the number of bytes promised in the header had
been sent. This is problematic for users of the RPC layer because
there is no well-defined point at which the sidecars are no longer
referenced by the RPC layer. The only model in which this would work
is for the caller to either transfer or share ownership of the
sidecars with the RPC layer. This has caused lifecycle issues with
row batches in Impala before (e.g. IMPALA-5093).

This change fixes the problem above by modifying the RPC protocol
to allow an RPC call to be cancelled mid-transmission. Specifically,
a new footer is added to all outbound call requests. It contains
a flag which, when true, indicates that the outbound call was
cancelled after it had started but before it finished. The server
inspects this flag and ignores inbound calls that have it set. The
footer enables the caller to relinquish references to the sidecars
early when an outbound call is cancelled or times out. Once the call
is cancelled or times out, the RPC layer sends dummy bytes in place
of the remaining sidecar data so that the promised number of bytes
is still sent. Since the server always ignores cancelled call
requests, it's okay to send arbitrary bytes.
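
The cancellation path described above can be sketched in a few lines.
This is a simplified illustration only, not the code in this change:
ByteView, PendingRequest, CancelMidTransmission and ServerShouldDispatch
are hypothetical names, and the footer is modelled as a single byte
rather than a varint-prefixed protobuf. It shows the two properties the
design relies on: repointing the sidecar slices at filler memory leaves
the total byte count unchanged, and the server drops any request whose
footer has the aborted flag set.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Non-owning view of bytes to be written to the socket (hypothetical type).
struct ByteView {
  const uint8_t* data;
  size_t size;
};

// Hypothetical, simplified request: header, body, caller-owned sidecars,
// and a one-byte stand-in for the footer's 'aborted' flag.
struct PendingRequest {
  std::string header;
  std::string body;
  std::vector<ByteView> sidecars;  // memory owned by the caller
  bool aborted = false;            // footer flag
};

// Number of bytes promised to the remote end (+1 for the toy footer).
size_t TotalBytes(const PendingRequest& req) {
  size_t total = req.header.size() + req.body.size() + 1;
  for (const ByteView& s : req.sidecars) total += s.size;
  return total;
}

// Repoint every sidecar view at 'filler' and set the aborted flag. Sizes are
// preserved, so the promised byte count is still honoured. Returns false,
// leaving 'req' untouched, if 'filler' is smaller than some sidecar.
bool CancelMidTransmission(PendingRequest* req,
                           const std::vector<uint8_t>& filler) {
  assert(req != nullptr);
  for (const ByteView& s : req->sidecars) {
    if (s.size > filler.size()) return false;
  }
  for (ByteView& s : req->sidecars) s.data = filler.data();
  req->aborted = true;
  return true;
}

// Server side: a request whose footer says 'aborted' is never dispatched.
bool ServerShouldDispatch(const PendingRequest& req) { return !req.aborted; }
```

After CancelMidTransmission() returns true, nothing in the request
refers to the caller's sidecar memory any more, so the caller is free
to release it immediately.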

To avoid breaking compatibility, a new RPC feature flag REQUEST_FOOTERS
is introduced in this change. During connection negotiation, the
client always includes this flag in its feature set. A server which
supports parsing footers includes REQUEST_FOOTERS in its feature set
if it sees that the client also supports it. A client only sends the
footer if the remote server has the RPC feature flag REQUEST_FOOTERS.
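
The negotiation rule above amounts to a small decision table, sketched
here under simplifying assumptions. The enum and function names are
hypothetical and do not match the real RpcFeatureFlag protobuf enum or
the negotiation code; only the rule itself comes from the description
above.

```cpp
#include <set>

// Hypothetical stand-in for the RPC feature flag enum.
enum class RpcFeature { kApplicationFeatureFlags, kRequestFooters };

// Features a server advertises back to a client. A footer-aware server
// echoes kRequestFooters only if the client advertised it first.
std::set<RpcFeature> ServerAdvertisedFeatures(
    const std::set<RpcFeature>& client_features, bool server_parses_footers) {
  std::set<RpcFeature> features = {RpcFeature::kApplicationFeatureFlags};
  if (server_parses_footers &&
      client_features.count(RpcFeature::kRequestFooters) > 0) {
    features.insert(RpcFeature::kRequestFooters);
  }
  return features;
}

// The client appends a footer only when the server advertised support.
bool ClientShouldSendFooter(const std::set<RpcFeature>& server_features) {
  return server_features.count(RpcFeature::kRequestFooters) > 0;
}
```

With this rule an older server never sees a footer it cannot parse, and
a newer server never expects a footer from a client that does not send
one.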

Change-Id: I5c8e294090279649082eebc4f6cfb6fe858bbbfc
Reviewed-on: http://gerrit.cloudera.org:8080/7599
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/351337ee
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/351337ee
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/351337ee

Branch: refs/heads/master
Commit: 351337ee2d92361e2c87699d1c87a49774a3ab4d
Parents: e32faf9
Author: Michael Ho <kwho@cloudera.com>
Authored: Sat Aug 5 17:23:31 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Aug 18 02:38:48 2017 +0000

----------------------------------------------------------------------
 docs/design-docs/rpc.md            |  41 ++++++--
 src/kudu/rpc/blocking_ops.cc       |   3 +-
 src/kudu/rpc/connection.cc         | 169 +++++++++++++++++++++-----------
 src/kudu/rpc/connection.h          |  49 ++++++++-
 src/kudu/rpc/constants.cc          |   6 +-
 src/kudu/rpc/inbound_call.cc       |   9 +-
 src/kudu/rpc/inbound_call.h        |  11 ++-
 src/kudu/rpc/outbound_call.cc      |  71 +++++++++++---
 src/kudu/rpc/outbound_call.h       |  63 ++++++++----
 src/kudu/rpc/reactor.cc            |   4 +-
 src/kudu/rpc/rpc-test-base.h       |  20 ++--
 src/kudu/rpc/rpc-test.cc           |  28 ++++--
 src/kudu/rpc/rpc_header.proto      |  14 +++
 src/kudu/rpc/rtest.proto           |   3 +-
 src/kudu/rpc/serialization.cc      |  52 ++++++++--
 src/kudu/rpc/serialization.h       |  15 +++
 src/kudu/rpc/server_negotiation.cc |   3 +
 src/kudu/rpc/transfer.cc           | 107 ++++++++++++++++----
 src/kudu/rpc/transfer.h            |  21 +++-
 19 files changed, 531 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/docs/design-docs/rpc.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/rpc.md b/docs/design-docs/rpc.md
index 474962d..6a168d4 100644
--- a/docs/design-docs/rpc.md
+++ b/docs/design-docs/rpc.md
@@ -271,6 +271,10 @@ a typical message looks like without sidecars:
 +------------------------------------------------+
 | Main message protobuf                          |
 +------------------------------------------------+
+| RPC Footer protobuf length (variable encoding) | --- Exists only in client requests.
++------------------------------------------------+
+| RPC Footer protobuf                            | --- Exists only in client requests.
++------------------------------------------------+
 ```
 
 In this case, the main message length is equal to the protobuf's byte size.
@@ -299,6 +303,10 @@ Here's what it looks like with the sidecars:
 +------------------------------------------------+ --- ...
 | ...                                            |
 +------------------------------------------------+
+| RPC Footer protobuf length (variable encoding) | --- Exists only in client requests.
++------------------------------------------------+
+| RPC Footer protobuf                            | --- Exists only in client requests.
++------------------------------------------------+
 ```
 
 When there are sidecars, the sidecar_offsets member in the header will be a
@@ -351,6 +359,11 @@ serialized as follows:
     - for typical RPC calls, this is the user-specified request or response protobuf
     - for RPC calls which caused an error, the response is an ErrorStatusPB
     - during negotiation, this is a NegotiatePB
+
+  footer: varint-prefixed footer protobuf
+    - the last part of an RPC message
+    - exists only in client->server messages if both client and server support the REQUEST_FOOTERS feature flag
+    - contains a flag which is set when the client->server message is cancelled mid-transmission.
 ```
 
 ### Example packet capture
@@ -358,19 +371,29 @@ serialized as follows:
 An example call (captured with strace on rpc-test.cc) follows:
 
 ```
-   "\x00\x00\x00\x17"   (total_size: 23 bytes to follow)
-   "\x09"  RequestHeader varint: 9 bytes
-    "\x08\x0a\x1a\x03\x41\x64\x64\x20\x01" (RequestHeader protobuf)
-      Decoded with protoc --decode=RequestHeader rpc_header.proto:
-      callId: 10
-      methodName: "Add"
-      requestParam: true
+   "\x00\x00\x00\x40"   (total_size: 64 bytes to follow)
+   "\x2f"  RequestHeader varint: 47 bytes
+    "\x18\x09\x32\x28\x0a\x21\x6b\x75\x64\x75\x2e\x72\x70\x63\x2e\x47\x65\x6e\x65"
+    "\x72\x69\x63\x43\x61\x6c\x63\x75\x6c\x61\x74\x6f\x72\x53\x65\x72\x76\x69\x63"
+    "\x65\x12\x03\x41\x64\x64\x50\x90\x4e" (RequestHeader protobuf)
+      Decoded with printf "\x18\x09..." | protoc --decode=kudu.rpc.RequestHeader rpc_header.proto:
+      call_id: 9
+      remote_method {
+        service_name: "kudu.rpc.GenericCalculatorService"
+        method_name: "Add"
+      }
+      timeout_millis: 10000
 
    "\x0c"  Request parameter varint: 12 bytes
-    "\x08\xd4\x90\x80\x91\x01\x10\xf8\xcf\xc4\xed\x04" Request parameter
-      Decoded with protoc --decode=kudu.rpc_test.AddRequestPB rpc/rtest.proto
+    "\x08\xd4\x90\x80\x91\x01\x10\xf8\xcf\xc4\xed\x04" (Request parameter protobuf)
+      Decoded with printf "\x08\xd4..." | protoc --decode=kudu.rpc_test.AddRequestPB rtest.proto
       x: 304089172
       y: 1303455736
+
+   "\x02"  RequestFooter varint: 2 bytes
+    "\x08\x00" (RequestFooter protobuf)
+      Decoded with printf "\x08\x00" | protoc --decode=kudu.rpc.RequestFooterPB rpc_header.proto
+      aborted: false
 ```
 
 ## Negotiation

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/blocking_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/blocking_ops.cc b/src/kudu/rpc/blocking_ops.cc
index 64ae2c0..21132a1 100644
--- a/src/kudu/rpc/blocking_ops.cc
+++ b/src/kudu/rpc/blocking_ops.cc
@@ -119,7 +119,8 @@ Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
   recv_buf->resize(payload_len + kMsgLengthPrefixLength);
   RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength,
                 payload_len, &recvd, deadline));
-  RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf));
+  RETURN_NOT_OK(
+      serialization::ParseMessage(Slice(*recv_buf), header, nullptr, param_buf));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index bc7446e..c532759 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -236,12 +236,45 @@ void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int rev
   conn->HandleOutboundCallTimeout(this);
 }
 
-void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+void Connection::CancelOutboundTransfer(CallAwaitingResponse *car, const Status &status) {
   DCHECK(reactor_thread_->IsCurrentThread());
   DCHECK(car->call);
+  const shared_ptr<OutboundCall>& call = car->call;
+  if (call->IsInTransmission()) {
+    DCHECK(call->call_id_assigned());
+    DCHECK(car->transfer != nullptr);
+    DCHECK(car->transfer->is_for_outbound_call());
+    if (call->IsOnOutboundQueue()) {
+      // If the transfer hasn't started yet, it should be safe to just call Abort().
+      car->transfer->Abort(status);
+    } else if (RemoteSupportsFooter()) {
+      car->transfer->Cancel(car->call, status);
+    } else {
+      // If the remote server doesn't support parsing footer, there is no way to safely
+      // cancel a mid-transmission transfer with sidecars. It's fatal to have non-empty
+      // sidecars attached to the call as it may result in use-after-free of the sidecars.
+      // Please see the header comment for details.
+      CHECK(!call->HasNonEmptySidecars());
+    }
+  }
+}
+
+void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  if (!car->call) {
+    // If cancellation callback is scheduled before the timeout callback is
+    // invoked, car->call may be nullptr.
+    return;
+  }
+
   // The timeout timer is stopped by the car destructor exiting Connection::HandleCallResponse()
   DCHECK(!car->call->IsFinished());
 
+  // Cancel or abort any outbound transfer for this call so references to the sidecars
+  // can be relinquished.
+  CancelOutboundTransfer(car, Status::Aborted("timed out"));
+
   // Mark the call object as failed.
   car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL
                                                : Phase::CONNECTION_NEGOTIATION);
@@ -264,7 +297,10 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
 void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
   CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, call->call_id());
   if (car != nullptr) {
-    // car->call may be NULL if the call has timed out already.
+    car->timeout_timer.stop();
+    // Cancel any outbound transfer for this call so references to the sidecars can be
+    // relinquished.
+    CancelOutboundTransfer(car, Status::Aborted("cancelled"));
     DCHECK(!car->call || car->call.get() == call.get());
     car->call.reset();
   }
@@ -328,8 +364,8 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   // yet assigned a call ID.
   DCHECK(!call->call_id_assigned());
 
-  // We shouldn't reach this point if 'call' was requested to be cancelled.
-  DCHECK(!call->cancellation_requested());
+  // We shouldn't reach this point if 'call' was cancelled.
+  DCHECK(!call->IsCancelled());
 
   // Assign the call ID.
   int32_t call_id = GetNextCallId();
@@ -393,9 +429,11 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   }
 
   TransferCallbacks *cb = new CallTransferCallbacks(call, this);
+  gscoped_ptr<OutboundTransfer>
+      transfer(OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb));
+  car->transfer = transfer.get();
   awaiting_response_[call_id] = car.release();
-  QueueOutbound(gscoped_ptr<OutboundTransfer>(
-      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
+  QueueOutbound(std::move(transfer));
 }
 
 // Callbacks for sending an RPC call response from the server.
@@ -412,7 +450,7 @@ struct ResponseTransferCallbacks : public TransferCallbacks {
   ~ResponseTransferCallbacks() {
     // Remove the call from the map.
     InboundCall *call_from_map = EraseKeyReturnValuePtr(
-      &conn_->calls_being_handled_, call_->call_id());
+        &conn_->calls_being_handled_, call_->call_id());
     DCHECK_EQ(call_from_map, call_.get());
   }
 
@@ -421,8 +459,8 @@ struct ResponseTransferCallbacks : public TransferCallbacks {
   }
 
   virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
-    LOG(WARNING) << "Connection torn down before " <<
-      call_->ToString() << " could send its response";
+    LOG(WARNING) << "Connection torn down before " << call_->ToString()
+                 << " could send its response: " << status.ToString();
     delete this;
   }
 
@@ -545,7 +583,11 @@ void Connection::HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer) {
 
   gscoped_ptr<InboundCall> call(new InboundCall(this));
   Status s = call->ParseFrom(std::move(transfer));
-  if (!s.ok()) {
+  if (PREDICT_FALSE(s.IsAborted())) {
+    VLOG(2) << "Incoming call was aborted due to: "<< s.ToString();
+    return;
+  }
+  if (PREDICT_FALSE(!s.ok())) {
     LOG(WARNING) << ToString() << ": received bad data: " << s.ToString();
     // TODO: shutdown? probably, since any future stuff on this socket will be
     // "unsynchronized"
@@ -570,7 +612,7 @@ void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
   CHECK_OK(resp->ParseFrom(std::move(transfer)));
 
   CallAwaitingResponse *car_ptr =
-    EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
+      EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
   if (PREDICT_FALSE(car_ptr == nullptr)) {
     LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which "
                  << "was not pending! Ignoring.";
@@ -593,6 +635,58 @@ void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
   MaybeInjectCancellation(car->call);
 }
 
+bool Connection::StartCallTransfer(OutboundTransfer *transfer) {
+  DCHECK(!transfer->TransferStarted());
+  CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
+
+  // If the call has already timed out or has already been cancelled, the 'call'
+  // field would be set to NULL. In that case, don't bother sending it.
+  if (!car->call) {
+    DCHECK(transfer->TransferAborted());
+    return false;
+  }
+
+  DCHECK(!transfer->TransferAborted());
+  // If this is the start of the transfer, then check if the server has the
+  // required RPC flags. We have to wait until just before the transfer in
+  // order to ensure that the negotiation has taken place, so that the flags
+  // are available.
+  DCHECK(negotiation_complete_);
+  const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features();
+  if (!includes(remote_features_.begin(), remote_features_.end(),
+                required_features.begin(), required_features.end())) {
+    Status s = Status::NotSupported("server does not support the required RPC features");
+    transfer->Abort(s);
+    car->call->SetFailed(s, Phase::REMOTE_CALL);
+    // Test cancellation when 'call_' is in 'FINISHED_ERROR' state.
+    MaybeInjectCancellation(car->call);
+    car->call.reset();
+    return false;
+  }
+
+  // The transfer is ready to be sent. Append a footer if the remote system supports it.
+  // It has to be done here because remote features cannot be determined until negotiation
+  // completes. We know for sure that negotiation has completed when we reach this point.
+  if (RemoteSupportsFooter()) {
+    transfer->AppendFooter(car->call);
+  }
+  car->call->SetSending();
+  // Test cancellation when 'call_' is in 'SENDING' state.
+  MaybeInjectCancellation(car->call);
+  return true;
+}
+
+void Connection::OutboundQueuePopFront() {
+  OutboundTransfer *transfer = &(outbound_transfers_.front());
+  // Remove all references to 'transfer' before deleting it.
+  if (transfer->is_for_outbound_call()) {
+    CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
+    car->transfer = nullptr;
+  }
+  outbound_transfers_.pop_front();
+  delete transfer;
+}
+
 void Connection::WriteHandler(ev::io &watcher, int revents) {
   DCHECK(reactor_thread_->IsCurrentThread());
 
@@ -603,7 +697,6 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
   }
   DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
 
-  OutboundTransfer *transfer;
   if (outbound_transfers_.empty()) {
     LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is "
       "nothing to write.";
@@ -612,45 +705,14 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
   }
 
   while (!outbound_transfers_.empty()) {
-    transfer = &(outbound_transfers_.front());
-
-    if (!transfer->TransferStarted()) {
-
-      if (transfer->is_for_outbound_call()) {
-        CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
-        if (!car->call) {
-          // If the call has already timed out or has already been cancelled, the 'call'
-          // field would be set to NULL. In that case, don't bother sending it.
-          outbound_transfers_.pop_front();
-          transfer->Abort(Status::Aborted("already timed out or cancelled"));
-          delete transfer;
-          continue;
-        }
-
-        // If this is the start of the transfer, then check if the server has the
-        // required RPC flags. We have to wait until just before the transfer in
-        // order to ensure that the negotiation has taken place, so that the flags
-        // are available.
-        const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features();
-        if (!includes(remote_features_.begin(), remote_features_.end(),
-                      required_features.begin(), required_features.end())) {
-          outbound_transfers_.pop_front();
-          Status s = Status::NotSupported("server does not support the required RPC features");
-          transfer->Abort(s);
-          car->call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
-                                                        : Phase::CONNECTION_NEGOTIATION);
-          // Test cancellation when 'call_' is in 'FINISHED_ERROR' state.
-          MaybeInjectCancellation(car->call);
-          car->call.reset();
-          delete transfer;
-          continue;
-        }
-
-        car->call->SetSending();
-
-        // Test cancellation when 'call_' is in 'SENDING' state.
-        MaybeInjectCancellation(car->call);
-      }
+    OutboundTransfer *transfer = &(outbound_transfers_.front());
+    // A transfer for an outbound call must go through StartCallTransfer() before
+    // transmission begins so that a footer can be appended if the remote supports it.
+    if (!transfer->TransferStarted() &&
+        transfer->is_for_outbound_call() &&
+        !StartCallTransfer(transfer)) {
+      OutboundQueuePopFront();
+      continue;
     }
 
     last_activity_time_ = reactor_thread_->cur_time();
@@ -660,14 +722,11 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
       reactor_thread_->DestroyConnection(this, status);
       return;
     }
-
     if (!transfer->TransferFinished()) {
       DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
       return;
     }
-
-    outbound_transfers_.pop_front();
-    delete transfer;
+    OutboundQueuePopFront();
   }
 
   // If we were able to write all of our outbound transfers,

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 7f16b7b..e782e0b 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -124,7 +124,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   void QueueResponseForCall(gscoped_ptr<InboundCall> call);
 
   // Cancel an outbound call by removing any reference to it by CallAwaitingResponse
-  // in 'awaiting_responses_'.
+  // in 'awaiting_response_'.
   void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
   // The address of the remote end of the connection.
@@ -195,6 +195,16 @@ class Connection : public RefCountedThreadSafe<Connection> {
     remote_features_ = std::move(remote_features);
   }
 
+  // Returns true if the remote end has feature flag "REQUEST_FOOTERS".
+  // Always returns false before negotiation completes.
+  bool RemoteSupportsFooter() const {
+    // Note the negotiation thread may be writing to 'remote_features_' before
+    // 'negotiation_complete_' is set to true. It's not safe to read 'remote_features_'
+    // from the reactor thread when 'negotiation_complete_' is false.
+    return negotiation_complete_ &&
+        ContainsKey(remote_features_, RpcFeatureFlag::REQUEST_FOOTERS);
+  }
+
   void set_remote_user(RemoteUser user) {
     DCHECK_EQ(direction_, SERVER);
     remote_user_ = std::move(user);
@@ -232,10 +242,14 @@ class Connection : public RefCountedThreadSafe<Connection> {
     // Notification from libev that the call has timed out.
     void HandleTimeout(ev::timer &watcher, int revents);
 
-    Connection *conn;
+    Connection *conn = nullptr;
     std::shared_ptr<OutboundCall> call;
     ev::timer timeout_timer;
 
+    // The pending outbound transfer for this call. Set when an outbound call is
+    // enqueued. Set back to NULL once the transfer has been completely sent.
+    OutboundTransfer* transfer = nullptr;
+
     // We time out RPC calls in two stages. This is set to the amount of timeout
     // remaining after the next timeout fires. See Connection::QueueOutboundCall().
     double remaining_timeout;
@@ -270,11 +284,42 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Set it to Failed.
   void HandleOutboundCallTimeout(CallAwaitingResponse *car);
 
+  // Perform some checks on 'transfer' for an outbound call before beginning
+  // transmission. The check could fail if:
+  // 1. the outbound call has already timed out or has been cancelled.
+  // 2. the remote server doesn't have all the required feature flags.
+  //
+  // If the checks pass, the outbound call is set to 'SENDING' state.
+  // Appends a footer to the outbound call if remote supports "REQUEST_FOOTERS".
+  //
+  // If the checks fail due to (1), the transfer is already aborted and callback
+  // has already been invoked. If it fails due to (2), the transfer is aborted
+  // and the outbound call's callback is called with the failure status.
+  //
+  // Returns true if all checks pass; returns false otherwise.
+  bool StartCallTransfer(OutboundTransfer *transfer);
+
+  // Cancel any outbound transfer associated with 'car' and abort it with 'status'.
+  // Called when an outbound call is cancelled or timed out. If the remote server
+  // doesn't support footer (e.g. older version of Kudu server), it's fatal for the
+  // call to have any non-empty sidecars. The reasoning is as follows:
+  //
+  // Up till the point footer is introduced, Kudu doesn't have any outbound call which
+  // uses client sidecars. If any future RPCs make use of client sidecars, the Kudu
+  // server code has to be updated to process the sidecars too, in which case the newer
+  // version of Kudu server should already be able to process footers.
+  void CancelOutboundTransfer(CallAwaitingResponse *car, const Status &status);
+
   // Queue a transfer for sending on this connection.
   // We will take ownership of the transfer.
   // This must be called from the reactor thread.
   void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer);
 
+  // Remove the front transfer from the outbound transfers queue. If the transfer is
+  // associated with an outbound call, also clears the transfer reference from the
+  // associated CallAwaitingResponse.
+  void OutboundQueuePopFront();
+
   // Internal test function for injecting cancellation request when 'call'
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
   void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/constants.cc b/src/kudu/rpc/constants.cc
index bcf9985..51945ca 100644
--- a/src/kudu/rpc/constants.cc
+++ b/src/kudu/rpc/constants.cc
@@ -31,8 +31,12 @@ const char* const kSaslProtoName = "kudu";
 //
 // NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both
 // sides based on the remote peer's address.
+//
+// NOTE: the REQUEST_FOOTERS is always set on the client side. The server side
+// which supports parsing footer sets REQUEST_FOOTERS if client side has it set.
 set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
-set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
+set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS,
+                                                        REQUEST_FOOTERS };
 
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 9350d21..11b7585 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -54,7 +54,14 @@ InboundCall::~InboundCall() {}
 Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this);
   TRACE_EVENT0("rpc", "InboundCall::ParseFrom");
-  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_));
+  bool has_footer = conn_->RemoteSupportsFooter();
+  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
+      has_footer ? &footer_ : nullptr, &serialized_request_));
+
+  // If the transfer was already aborted, return.
+  if (PREDICT_FALSE(has_footer && footer_.has_aborted() && footer_.aborted())) {
+    return Status::Aborted("request footer has aborted flag set");
+  }
 
   // Adopt the service/method info from the header as soon as it's available.
   if (PREDICT_FALSE(!header_.has_remote_method())) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 84e6745..83a750f 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -71,9 +71,9 @@ class InboundCall {
 
   // Parse an inbound call message.
   //
-  // This only deserializes the call header, populating the 'header_' and
-  // 'serialized_request_' member variables. The actual call parameter is
-  // not deserialized, as this may be CPU-expensive, and this is called
+  // This only deserializes the call header and footer, populating the 'header_',
+  // 'footer_' and 'serialized_request_' member variables. The actual call parameter
+  // is not deserialized, as this may be CPU-expensive, and this is called
   // from the reactor thread.
   Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
 
@@ -222,9 +222,12 @@ class InboundCall {
   // The connection on which this inbound call arrived.
   scoped_refptr<Connection> conn_;
 
-  // The header of the incoming call. Set by ParseFrom()
+  // The header of the incoming call. Set by ParseFrom().
   RequestHeader header_;
 
+  // The footer of the incoming call. Set by ParseFrom().
+  RequestFooterPB footer_;
+
   // The serialized bytes of the request param protobuf. Set by ParseFrom().
   // This references memory held by 'transfer_'.
   Slice serialized_request_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 46c9df1..5010270 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -26,6 +26,7 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/rpc/connection.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -78,8 +79,7 @@ OutboundCall::OutboundCall(const ConnectionId& conn_id,
       conn_id_(conn_id),
       callback_(std::move(callback)),
       controller_(DCHECK_NOTNULL(controller)),
-      response_(DCHECK_NOTNULL(response_storage)),
-      cancellation_requested_(false) {
+      response_(DCHECK_NOTNULL(response_storage)) {
   DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
            << " and RPC timeout: "
            << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
@@ -118,6 +118,7 @@ size_t OutboundCall::SerializeTo(TransferPayload* slices) {
   serialization::SerializeHeader(
       header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
+  // Header and Msg in addition to sidecars.
   size_t n_slices = 2 + sidecars_.size();
   DCHECK_LE(n_slices, slices->size());
   auto slice_iter = slices->begin();
@@ -130,6 +131,28 @@ size_t OutboundCall::SerializeTo(TransferPayload* slices) {
   return n_slices;
 }
 
+void OutboundCall::AppendFooter(Slice* slice) {
+  DCHECK(!footer_.has_aborted());
+  footer_.set_aborted(false);
+  serialization::SerializeFooter(footer_, &footer_buf_);
+  *slice = Slice(footer_buf_);
+  serialization::IncrementMsgLength(footer_buf_.size(), &header_buf_);
+}
+
+void OutboundCall::MarkPayloadCancelled(TransferPayload* slices,
+                                        const uint8_t* relocated_dst) {
+  DCHECK(footer_.has_aborted());
+  auto slice_iter = slices->begin() + 2;
+  for (auto& sidecar : sidecars_) {
+    *slice_iter++ = Slice(relocated_dst, sidecar->AsSlice().size());
+  }
+  size_t old_size = footer_buf_.size();
+  footer_.set_aborted(true);
+  serialization::SerializeFooter(footer_, &footer_buf_);
+  DCHECK_EQ(footer_buf_.size(), old_size);
+  *slice_iter++ = Slice(footer_buf_);
+}
+
 void OutboundCall::SetRequestPayload(const Message& req,
     vector<unique_ptr<RpcSidecar>>&& sidecars) {
   DCHECK_EQ(-1, sidecar_byte_size_);
@@ -220,7 +243,8 @@ void OutboundCall::set_state_unlocked(State new_state) {
       DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
       break;
     case CANCELLED:
-      DCHECK(state_ == READY || state_ == ON_OUTBOUND_QUEUE || state_ == SENT);
+      DCHECK(state_ == READY || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING ||
+             state_ == SENT);
       break;
     case FINISHED_SUCCESS:
       DCHECK_EQ(state_, SENT);
@@ -233,17 +257,24 @@ void OutboundCall::set_state_unlocked(State new_state) {
   state_ = new_state;
 }
 
-void OutboundCall::Cancel() {
-  cancellation_requested_ = true;
+void OutboundCall::Cancel(const Connection* conn) {
   // No lock needed as it's called from reactor thread
   switch (state_) {
+    case SENDING:
+      // 'conn' can be NULL if the connection was somehow shut down due to errors.
+      // If the remote server doesn't support parsing footer and the call is being sent
+      // already, ignore the cancellation request as if it were completed already.
+      if (PREDICT_FALSE(conn == nullptr || !conn->RemoteSupportsFooter())) {
+        break;
+      }
+      // fall-through if remote supports parsing footer.
+      FALLTHROUGH_INTENDED;
     case READY:
     case ON_OUTBOUND_QUEUE:
     case SENT: {
       SetCancelled();
       break;
     }
-    case SENDING:
     case NEGOTIATION_TIMED_OUT:
     case TIMED_OUT:
     case CANCELLED:
@@ -318,21 +349,19 @@ void OutboundCall::SetSending() {
 void OutboundCall::SetSent() {
   set_state(SENT);
 
-  // This method is called in the reactor thread, so free the header buf,
-  // which was also allocated from this thread. tcmalloc's thread caching
+  // This method is called in the reactor thread, so free the header and footer buf,
+  // which were also allocated from this thread. tcmalloc's thread caching
   // behavior is a lot more efficient if memory is freed from the same thread
   // which allocated it -- this lets it keep to thread-local operations instead
   // of taking a mutex to put memory back on the global freelist.
-  delete [] header_buf_.release();
+  header_buf_.clear();
+  header_buf_.shrink_to_fit();
+  footer_buf_.clear();
+  footer_buf_.shrink_to_fit();
 
   // request_buf_ is also done being used here, but since it was allocated by
   // the caller thread, we would rather let that thread free it whenever it
   // deletes the RpcController.
-
-  // If cancellation was requested, it's now a good time to do the actual cancellation.
-  if (cancellation_requested()) {
-    SetCancelled();
-  }
 }
 
 void OutboundCall::SetFailed(const Status &status,
@@ -416,6 +445,16 @@ bool OutboundCall::IsNegotiationError() const {
   }
 }
 
+bool OutboundCall::IsOnOutboundQueue() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return state_ == ON_OUTBOUND_QUEUE;
+}
+
+bool OutboundCall::IsInTransmission() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return state_ == ON_OUTBOUND_QUEUE || state_ == SENDING;
+}
+
 bool OutboundCall::IsFinished() const {
   std::lock_guard<simple_spinlock> l(lock_);
   switch (state_) {
@@ -502,8 +541,8 @@ Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
 
 Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   CHECK(!parsed_);
-  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
-                                            &serialized_response_));
+  RETURN_NOT_OK(serialization::ParseMessage(
+      transfer->data(), &header_, nullptr, &serialized_response_));
 
   // Use information from header to extract the payload slices.
   RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 0ab1553..77e62d6 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -100,17 +100,32 @@ class OutboundCall {
     header_.set_call_id(call_id);
   }
 
-  // Serialize the call for the wire. Requires that SetRequestPayload()
-  // is called first. This is called from the Reactor thread.
-  // Returns the number of slices in the serialized call.
+  // Serialize the call for the wire. Requires that SetRequestPayload() is called first.
+  // This is called from the Reactor thread. The serialized payload is stored as slices
+  // in "slices". Returns the number of slices in the serialized call.
   size_t SerializeTo(TransferPayload* slices);
 
+  // Called right before the outbound call begins transmission to add a footer to the
+  // message. This function sets aborted flag in 'footer' to false, serializes it to PB
+  // and stores it in 'footer_buf_' and updates 'slice' to point to it. The total message
+  // length (stored in the first 4 bytes of "header_buf_") is incremented to account for
+  // the footer.
+  // REQUIRES: must be called from the reactor thread.
+  void AppendFooter(Slice* slice);
+
+  // Called when the call is cancelled. Updates all sidecar slices to point to
+  // 'relocated_dst' while keeping their sizes unchanged. Also update the footer
+  // to indicate that the call has been aborted. Can only be called if AppendFooter()
+  // was called before.
+  // REQUIRES: must be called from the reactor thread.
+  void MarkPayloadCancelled(TransferPayload* slices, const uint8_t* relocated_dst);
+
   // Mark in the call that cancellation has been requested. If the call hasn't yet
   // started sending or has finished sending the RPC request but is waiting for a
-  // response, cancel the RPC right away. Otherwise, wait until the RPC has finished
-  // sending before cancelling it. If the call is finished, it's a no-op.
+  // response, cancel the RPC right away. If the RPC is being sent, cancel it only
+  // if the remote supports "REQUEST_FOOTERS". If the call is finished, it's a no-op.
   // REQUIRES: must be called from the reactor thread.
-  void Cancel();
+  void Cancel(const Connection* conn);
 
   // Callback after the call has been put on the outbound connection queue.
   void SetQueued();
@@ -133,15 +148,27 @@ class OutboundCall {
   // Mark the call as timed out. This also triggers the callback to notify
   // the caller.
   void SetTimedOut(Phase phase);
+
   bool IsTimedOut() const;
 
   bool IsNegotiationError() const;
 
   bool IsCancelled() const;
 
+  bool IsOnOutboundQueue() const;
+
+  // True if the call is scheduled to be sent or in the process of being sent.
+  // There is an entry for the call in the OutboundTransfer queue.
+  bool IsInTransmission() const;
+
   // Is the call finished?
   bool IsFinished() const;
 
+  // True if the outbound call has any non-empty sidecars attached to it.
+  bool HasNonEmptySidecars() const {
+    return sidecar_byte_size_ > 0;
+  }
+
   // Fill in the call response.
   void SetResponse(gscoped_ptr<CallResponse> resp);
 
@@ -173,12 +200,6 @@ class OutboundCall {
     return header_.call_id();
   }
 
-  // Returns true if cancellation has been requested. Must be called from
-  // reactor thread.
-  bool cancellation_requested() const {
-    return cancellation_requested_;
-  }
-
   // Test function which returns true if a cancellation request should be injected
   // at the current state.
   bool ShouldInjectCancellation() const {
@@ -228,6 +249,11 @@ class OutboundCall {
   // This will only be non-NULL if status().IsRemoteError().
   const ErrorStatusPB* error_pb() const;
 
+  // Call the user-provided callback. Note that entries in 'sidecars_' are cleared
+  // prior to invoking the callback so the client can assume that the call doesn't
+  // hold references to outbound sidecars.
+  void CallCallback();
+
   // Lock for state_ status_, error_pb_ fields, since they
   // may be mutated by the reactor thread while the client thread
   // reads them.
@@ -236,16 +262,15 @@ class OutboundCall {
   Status status_;
   gscoped_ptr<ErrorStatusPB> error_pb_;
 
-  // Call the user-provided callback. Note that entries in 'sidecars_' are cleared
-  // prior to invoking the callback so the client can assume that the call doesn't
-  // hold references to outbound sidecars.
-  void CallCallback();
-
   // The RPC header.
   // Parts of this (eg the call ID) are only assigned once this call has been
   // passed to the reactor thread and assigned a connection.
   RequestHeader header_;
 
+  // The RPC footer.
+  // This is sent last as part of an RPC request.
+  RequestFooterPB footer_;
+
   // The remote method being called.
   RemoteMethod remote_method_;
 
@@ -262,6 +287,7 @@ class OutboundCall {
   // Buffers for storing segments of the wire-format request.
   faststring header_buf_;
   faststring request_buf_;
+  faststring footer_buf_;
 
   // Once a response has been received for this call, contains that response.
   // Otherwise NULL.
@@ -273,9 +299,6 @@ class OutboundCall {
   // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
   int64_t sidecar_byte_size_ = -1;
 
-  // True if cancellation was requested on this call.
-  bool cancellation_requested_;
-
   DISALLOW_COPY_AND_ASSIGN(OutboundCall);
 };
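
The cancellation rule described in the Cancel() comment above can be sketched as a
small decision function. This is only an illustration under stated assumptions: the
names CallStateSketch and CancelTakesEffect are hypothetical, the enum lists a
subset of the states visible in the patch, and the real method also updates state
and invokes callbacks, which the sketch leaves out.

```cpp
#include <cassert>

// Hypothetical subset of the OutboundCall states that appear in the patch.
enum class CallStateSketch {
  READY,
  ON_OUTBOUND_QUEUE,
  SENDING,
  SENT,
  TIMED_OUT,
  CANCELLED,
  FINISHED_SUCCESS
};

// Returns true if a cancellation request would move the call to CANCELLED.
// 'has_conn' models a non-null connection pointer and 'remote_supports_footer'
// models the negotiated REQUEST_FOOTERS feature; both are assumptions of
// this sketch, not part of the patch's API.
inline bool CancelTakesEffect(CallStateSketch state,
                              bool has_conn,
                              bool remote_supports_footer) {
  switch (state) {
    case CallStateSketch::SENDING:
      // Mid-transmission: only cancellable when the peer can parse the footer.
      return has_conn && remote_supports_footer;
    case CallStateSketch::READY:
    case CallStateSketch::ON_OUTBOUND_QUEUE:
    case CallStateSketch::SENT:
      return true;
    default:
      // Already timed out, cancelled or finished: nothing to do.
      return false;
  }
}
```

For example, CancelTakesEffect(CallStateSketch::SENDING, true, false) is false,
which corresponds to ignoring the request when the remote lacks REQUEST_FOOTERS.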
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index b3b7ea2..5dc2f0f 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -299,7 +299,7 @@ void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
                      &conn)) {
     conn->CancelOutboundCall(call);
   }
-  call->Cancel();
+  call->Cancel(conn.get());
 }
 
 //
@@ -504,7 +504,7 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   ThreadPool* negotiation_pool =
       reactor()->messenger()->negotiation_pool(conn->direction());
   RETURN_NOT_OK(negotiation_pool->SubmitClosure(
-        Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
+      Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index f9cba05..38a9bda 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -225,15 +225,19 @@ class GenericCalculatorService : public ServiceIf {
     LOG(INFO) << "got call: " << pb_util::SecureShortDebugString(req);
     SleepFor(MonoDelta::FromMicroseconds(req.sleep_micros()));
 
-    uint32 pattern = req.pattern();
-    uint32 num_repetitions = req.num_repetitions();
-    Slice sidecar;
-    CHECK_OK(incoming->GetInboundSidecar(req.sidecar_idx(), &sidecar));
-    CHECK_EQ(sidecar.size(), sizeof(uint32) * num_repetitions);
-    const uint32_t *data = reinterpret_cast<const uint32_t*>(sidecar.data());
-    for (int i = 0; i < num_repetitions; ++i) CHECK_EQ(data[i], pattern);
+    uint32_t pattern = req.pattern();
+    for (int i = 0; i < req.sidecar_idx_size(); ++i) {
+      Slice sidecar;
+      CHECK_OK(incoming->GetInboundSidecar(req.sidecar_idx(i), &sidecar));
+      CHECK_EQ(sidecar.size() % sizeof(uint32_t), 0);
+      const uint32_t *data = reinterpret_cast<const uint32_t*>(sidecar.data());
+      int num_repetitions = sidecar.size() / sizeof(uint32_t);
+      for (int j = 0; j < num_repetitions; ++j) {
+        CHECK_EQ(data[j], pattern);
+      }
+    }
 
-    SleepResponsePB resp;
+    SleepWithSidecarResponsePB resp;
     incoming->RespondSuccess(resp);
   }
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 3989558..313b469 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -961,7 +961,7 @@ TEST_P(TestRpc, TestCancellation) {
   client_messenger->Shutdown();
 }
 
-#define TEST_PAYLOAD_SIZE  (1 << 23)
+#define TEST_PAYLOAD_SIZE  (1 << 26)
 #define TEST_SLEEP_TIME_MS (500)
 
 static void SleepCallback(uint8_t* payload, CountDownLatch* latch) {
@@ -991,30 +991,38 @@ TEST_P(TestRpc, TestCancellationAsync) {
   // Used to generate sleep time between invoking RPC and requesting cancellation.
   Random rand(SeedRandom());
 
-  for (int i = 0; i < 10; ++i) {
+  // Set the max allowed payload size to 1GB.
+  FLAGS_rpc_max_message_size = 1 << 30;
+
+  for (int i = 0; i < 100; ++i) {
     SleepWithSidecarRequestPB req;
     SleepWithSidecarResponsePB resp;
 
     // Initialize the payload with non-zero pattern.
-    memset(payload.get(), 0xff, TEST_PAYLOAD_SIZE);
     req.set_sleep_micros(TEST_SLEEP_TIME_MS);
     req.set_pattern(0xffffffff);
-    req.set_num_repetitions(TEST_PAYLOAD_SIZE / sizeof(uint32_t));
+    memset(payload.get(), 0xff, TEST_PAYLOAD_SIZE);
 
-    int idx;
-    Slice s(payload.get(), TEST_PAYLOAD_SIZE);
-    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(s), &idx));
-    req.set_sidecar_idx(idx);
+    uint32 num_sidecars = rand.Uniform32(TransferLimits::kMaxSidecars);
+    for (int j = 0; j < num_sidecars; ++j) {
+      uint32 num_repetitions = rand.Uniform32(TEST_PAYLOAD_SIZE) / sizeof(uint32_t);
+      Slice s(payload.get(), num_repetitions * sizeof(uint32_t));
+      int idx;
+      CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(s), &idx));
+      req.add_sidecar_idx(idx);
+    }
 
     CountDownLatch latch(1);
+    controller.set_timeout(MonoDelta::FromMicroseconds(rand.Uniform64(i * 500L + 1)));
     p.AsyncRequest(GenericCalculatorService::kSleepWithSidecarMethodName,
                    req, &resp, &controller,
                    boost::bind(SleepCallback, payload.get(), &latch));
     // Sleep for a while before cancelling the RPC.
-    if (i > 0) SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(i * 30)));
+    SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(i * 500L + 1)));
     controller.Cancel();
     latch.Wait();
-    ASSERT_TRUE(controller.status().IsAborted() || controller.status().ok());
+    const Status &status = controller.status();
+    ASSERT_TRUE(status.IsTimedOut() || status.IsAborted() || status.ok());
     controller.Reset();
   }
   client_messenger->Shutdown();

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 1d55b6a..a9f7396 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -91,6 +91,10 @@ enum RpcFeatureFlag {
   // This is currently used for loopback connections only, so that compute
   // frameworks which schedule for locality don't pay encryption overhead.
   TLS_AUTHENTICATION_ONLY = 3;
+
+  // The RPC system is required to support parsing a footer in an incoming request.
+  // See RequestFooterPB below.
+  REQUEST_FOOTERS = 4;
 };
 
 // An authentication type. This is modeled as a oneof in case any of these
@@ -265,6 +269,16 @@ message RequestHeader {
   repeated uint32 sidecar_offsets = 16;
 }
 
+message RequestFooterPB {
+  // This flag indicates whether the incoming request has been aborted by the client
+  // and the request should be ignored.
+  optional bool aborted = 1 [ default = false ];
+
+  // Note that the footer is designed to be modified after the initial serialization
+  // and it will be re-serialized after modification. To avoid unexpected change in
+  // the total message size, use only fixed-size fields in the footer.
+}
+
 message ResponseHeader {
   required int32 call_id = 1;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index d212cef..257ca29 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -58,8 +58,7 @@ message SleepResponsePB {
 message SleepWithSidecarRequestPB {
   required uint32 sleep_micros = 1;
   required uint32 pattern = 2;
-  required uint32 num_repetitions = 3;
-  required uint32 sidecar_idx = 4;
+  repeated uint32 sidecar_idx = 3;
 }
 
 message SleepWithSidecarResponsePB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/serialization.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc
index dbb0fc5..bf24948 100644
--- a/src/kudu/rpc/serialization.cc
+++ b/src/kudu/rpc/serialization.cc
@@ -48,7 +48,7 @@ enum {
 };
 
 void SerializeMessage(const MessageLite& message, faststring* param_buf,
-                        int additional_size, bool use_cached_size) {
+                      int additional_size, bool use_cached_size) {
   int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
   DCHECK_EQ(message.ByteSize(), pb_size);
   int recorded_size = pb_size + additional_size;
@@ -66,7 +66,7 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf,
   uint8_t* dst = param_buf->data();
   dst = CodedOutputStream::WriteVarint32ToArray(recorded_size, dst);
   dst = message.SerializeWithCachedSizesToArray(dst);
-  CHECK_EQ(dst, param_buf->data() + size_with_delim);
+  DCHECK_EQ(dst, param_buf->data() + size_with_delim);
 }
 
 void SerializeHeader(const MessageLite& header,
@@ -96,11 +96,32 @@ void SerializeHeader(const MessageLite& header,
   dst = header.SerializeWithCachedSizesToArray(dst);
 
   // We should have used the whole buffer we allocated.
-  CHECK_EQ(dst, header_buf->data() + header_tot_len);
+  DCHECK_EQ(dst, header_buf->data() + header_tot_len);
+}
+
+void SerializeFooter(const MessageLite& footer,
+                     faststring* footer_buf) {
+  size_t footer_pb_len = footer.ByteSize();
+  size_t footer_tot_len = footer_pb_len + CodedOutputStream::VarintSize32(footer_pb_len);
+
+  footer_buf->resize(footer_tot_len);
+  uint8_t* dst = footer_buf->data();
+
+  dst = CodedOutputStream::WriteVarint32ToArray(footer_pb_len, dst);
+  dst = footer.SerializeWithCachedSizesToArray(dst);
+
+  DCHECK_EQ(dst, footer_buf->data() + footer_tot_len);
+}
+
+void IncrementMsgLength(size_t inc_len, faststring* header_buf) {
+  uint8_t* dst = header_buf->data();
+  uint32_t tot_len = NetworkByteOrder::Load32(dst);
+  NetworkByteOrder::Store32(dst, tot_len + inc_len);
 }
 
 Status ParseMessage(const Slice& buf,
                     MessageLite* parsed_header,
+                    MessageLite* parsed_footer,
                     Slice* parsed_main_message) {
 
   // First grab the total length
@@ -136,20 +157,37 @@ Status ParseMessage(const Slice& buf,
                               KUDU_REDACT(buf.ToDebugString()));
   }
 
+  int main_msg_offset = in.CurrentPosition();
+  DCHECK_GT(main_msg_offset, 0);
   if (PREDICT_FALSE(!in.Skip(main_msg_len))) {
     return Status::Corruption(
         StringPrintf("Invalid packet: data too short, expected %d byte main_msg", main_msg_len),
         KUDU_REDACT(buf.ToDebugString()));
   }
 
+  if (parsed_footer != nullptr) {
+    uint32_t footer_len = 0;
+    if (PREDICT_FALSE(!in.ReadVarint32(&footer_len))) {
+      return Status::Corruption("Invalid packet: missing footer delimiter",
+                                KUDU_REDACT(buf.ToDebugString()));
+    }
+
+    l = in.PushLimit(footer_len);
+    if (PREDICT_FALSE(!parsed_footer->ParseFromCodedStream(&in))) {
+      return Status::Corruption("Invalid packet: footer too short",
+                                KUDU_REDACT(buf.ToDebugString()));
+    }
+    in.PopLimit(l);
+  }
+
+  *parsed_main_message = Slice(buf.data() + main_msg_offset, main_msg_len);
+
   if (PREDICT_FALSE(in.BytesUntilLimit() > 0)) {
     return Status::Corruption(
-      StringPrintf("Invalid packet: %d extra bytes at end of packet", in.BytesUntilLimit()),
-      KUDU_REDACT(buf.ToDebugString()));
+        StringPrintf("Invalid packet: %d extra bytes at end of packet",
+                     in.BytesUntilLimit()), KUDU_REDACT(buf.ToDebugString()));
   }
 
-  *parsed_main_message = Slice(buf.data() + buf.size() - main_msg_len,
-                              main_msg_len);
   return Status::OK();
 }
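
To make the wire layout handled by SerializeFooter() and IncrementMsgLength() easier
to follow, here is a minimal standalone sketch. It is not the patch's code: it
assumes std::vector in place of faststring, hand-encodes the single bool field
instead of calling protobuf, and all names (FooterSketch, AppendVarint32,
SerializeFooterSketch, IncrementMsgLengthSketch) are hypothetical. It illustrates
why re-serializing the footer with the flag flipped should not change its size, as
long as the field is always written explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for RequestFooterPB: one bool that is always written.
struct FooterSketch {
  bool aborted = false;
};

// Append 'value' as a protobuf-style base-128 varint.
inline void AppendVarint32(uint32_t value, std::vector<uint8_t>* out) {
  while (value >= 0x80) {
    out->push_back(static_cast<uint8_t>(value | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<uint8_t>(value));
}

// Varint length delimiter followed by the body (field 1, wire type varint).
inline std::vector<uint8_t> SerializeFooterSketch(const FooterSketch& footer) {
  const std::vector<uint8_t> body = {
      0x08, static_cast<uint8_t>(footer.aborted ? 1 : 0)};
  std::vector<uint8_t> buf;
  AppendVarint32(static_cast<uint32_t>(body.size()), &buf);
  buf.insert(buf.end(), body.begin(), body.end());
  return buf;
}

// Add 'inc_len' to the big-endian 32-bit total length in the first 4 bytes.
inline void IncrementMsgLengthSketch(size_t inc_len,
                                     std::vector<uint8_t>* header_buf) {
  assert(header_buf->size() >= 4);
  uint8_t* p = header_buf->data();
  uint32_t tot = (uint32_t{p[0]} << 24) | (uint32_t{p[1]} << 16) |
                 (uint32_t{p[2]} << 8) | uint32_t{p[3]};
  tot += static_cast<uint32_t>(inc_len);
  p[0] = static_cast<uint8_t>(tot >> 24);
  p[1] = static_cast<uint8_t>(tot >> 16);
  p[2] = static_cast<uint8_t>(tot >> 8);
  p[3] = static_cast<uint8_t>(tot);
}
```

In this sketch the footer occupies three bytes whether 'aborted' is false or true,
which is the property the DCHECK_EQ on the footer size in MarkPayloadCancelled()
relies on.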
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/serialization.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/serialization.h b/src/kudu/rpc/serialization.h
index 26df3a7..f61f973 100644
--- a/src/kudu/rpc/serialization.h
+++ b/src/kudu/rpc/serialization.h
@@ -65,13 +65,28 @@ void SerializeHeader(const google::protobuf::MessageLite& header,
                      size_t param_len,
                      faststring* header_buf);
 
+// Serialize the request footer into a buffer which is allocated by this function.
+// In: 'footer' Protobuf footer to serialize,
+// Out: 'footer_buf' faststring to be populated with the serialized bytes.
+void SerializeFooter(const google::protobuf::MessageLite& footer,
+                     faststring* footer_buf);
+
+// Increment the total message length in a header after it was serialized initially.
+// In: 'inc_len' bytes to increment the total message length by.
+//     'header_buf' is the serialized PB buffer returned from SerializeHeader().
+//      Assumes that the first 4 bytes encode the total msg length in big endian order.
+void IncrementMsgLength(size_t inc_len,
+                        faststring* header_buf);
+
 // Deserialize the request.
 // In: data buffer Slice.
 // Out: parsed_header PB initialized,
+//      parsed_footer PB initialized if it's not NULL,
 //      parsed_main_message pointing to offset in original buffer containing
 //      the main payload.
 Status ParseMessage(const Slice& buf,
                     google::protobuf::MessageLite* parsed_header,
+                    google::protobuf::MessageLite* parsed_footer,
                     Slice* parsed_main_message);
 
 // Serialize the RPC connection header (magic number + flags).

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 741310e..6e0ccf8 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -499,6 +499,9 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
       server_features_.insert(TLS_AUTHENTICATION_ONLY);
     }
   }
+  if (ContainsKey(client_features_, REQUEST_FOOTERS)) {
+    server_features_.insert(REQUEST_FOOTERS);
+  }
 
   for (RpcFeatureFlag feature : server_features_) {
     response.add_supported_features(feature);

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index d660869..6446d5e 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -30,6 +30,7 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/memory/memory.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 
@@ -53,10 +54,19 @@ namespace kudu {
 namespace rpc {
 
 using std::ostringstream;
-using std::set;
+using std::shared_ptr;
 using std::string;
 using strings::Substitute;
 
+// The outbound transfer for an outbound call can be aborted after it has started already.
+// The system has to continue sending up to the number of bytes specified in the header or the
+// receiving end will never make progress, leading to a resource leak. In order to release
+// the sidecars early, we call MarkPayloadCancelled() on the outbound call to relocate the
+// sidecars to point to 'g_dummy_buffer'. SendBuffer() below recognizes slices pointing to
+// this dummy buffer and handles them differently.
+#define DUMMY_BUFFER_SIZE (1 << 20)
+uint8_t g_dummy_buffer[DUMMY_BUFFER_SIZE];
+
 #define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \
   if (PREDICT_FALSE(!status.ok())) {                            \
     if (Socket::IsTemporarySocketError(status.posix_code())) {  \
@@ -65,6 +75,13 @@ using strings::Substitute;
     return status;                                              \
   }
 
+// Initialize the dummy buffer with a known pattern for easier debugging.
+__attribute__((constructor))
+static void InitializeDummyBuffer() {
+  OverwriteWithPattern(reinterpret_cast<char*>(g_dummy_buffer),
+                       DUMMY_BUFFER_SIZE, "ABORTED-TRANSFER");
+}
+
 TransferCallbacks::~TransferCallbacks()
 {}
 
@@ -152,23 +169,30 @@ OutboundTransfer::OutboundTransfer(int32_t call_id,
                                    const TransferPayload &payload,
                                    size_t n_payload_slices,
                                    TransferCallbacks *callbacks)
-  : cur_slice_idx_(0),
+  : n_payload_slices_(n_payload_slices),
+    cur_slice_idx_(0),
     cur_offset_in_slice_(0),
     callbacks_(callbacks),
     call_id_(call_id),
     aborted_(false) {
-
-  n_payload_slices_ = n_payload_slices;
-  CHECK_LE(n_payload_slices_, payload_slices_.size());
+  // We should leave the last entry for the footer.
+  CHECK_LE(n_payload_slices_, payload_slices_.size() - 1);
   for (int i = 0; i < n_payload_slices; i++) {
     payload_slices_[i] = payload[i];
   }
 }
 
+void OutboundTransfer::AppendFooter(const shared_ptr<OutboundCall> &call) {
+  DCHECK(!TransferStarted());
+  DCHECK(is_for_outbound_call());
+  DCHECK_LE(n_payload_slices_, payload_slices_.size() - 1);
+  call->AppendFooter(&payload_slices_[n_payload_slices_++]);
+}
+
 OutboundTransfer::~OutboundTransfer() {
   if (!TransferFinished() && !aborted_) {
     callbacks_->NotifyTransferAborted(
-      Status::RuntimeError("RPC transfer destroyed before it finished sending"));
+        Status::RuntimeError("RPC transfer destroyed before it finished sending"));
   }
 }
 
@@ -179,20 +203,57 @@ void OutboundTransfer::Abort(const Status &status) {
   aborted_ = true;
 }
 
+void OutboundTransfer::Cancel(const shared_ptr<OutboundCall> &call,
+                              const Status &status) {
+  // Only transfers for outbound calls support cancellation.
+  DCHECK(is_for_outbound_call());
+  // If transfer has finished already, it's a no-op.
+  if (TransferFinished()) {
+    return;
+  }
+  // If transfer hasn't started yet, simply abort it.
+  if (!TransferStarted()) {
+    Abort(status);
+    return;
+  }
+  // Transfer has started already. Update the payload to give up the sidecars
+  // if they haven't all been sent yet. We only reach this point if remote also
+  // supports cancellation so the last slice is always the footer. Don't cancel
+  // the transfer if only the footer is left.
+  if (cur_slice_idx_ < n_payload_slices_ - 1) {
+    call->MarkPayloadCancelled(&payload_slices_, g_dummy_buffer);
+    status_ = status;
+  }
+}
+
+#define IO_VEC_SIZE (16)
+
 Status OutboundTransfer::SendBuffer(Socket &socket) {
-  CHECK_LT(cur_slice_idx_, n_payload_slices_);
-
-  int n_iovecs = n_payload_slices_ - cur_slice_idx_;
-  struct iovec iovec[n_iovecs];
-  {
-    int offset_in_slice = cur_offset_in_slice_;
-    for (int i = 0; i < n_iovecs; i++) {
-      Slice &slice = payload_slices_[cur_slice_idx_ + i];
-      iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
-      iovec[i].iov_len = slice.size() - offset_in_slice;
-
-      offset_in_slice = 0;
+  DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+
+  int idx = cur_slice_idx_;
+  int offset_in_slice = cur_offset_in_slice_;
+  struct iovec iovec[IO_VEC_SIZE];
+  int n_iovecs;
+  for (n_iovecs = 0; n_iovecs < IO_VEC_SIZE && idx < n_payload_slices_; ++n_iovecs) {
+    Slice &slice = payload_slices_[idx];
+    uint8_t* ptr = slice.mutable_data();
+    if (PREDICT_TRUE(ptr != g_dummy_buffer)) {
+      iovec[n_iovecs].iov_base = ptr + offset_in_slice;
+      iovec[n_iovecs].iov_len = slice.size() - offset_in_slice;
+    } else {
+      iovec[n_iovecs].iov_base = g_dummy_buffer;
+      iovec[n_iovecs].iov_len = slice.size() - offset_in_slice;
+      // The dummy buffer is of limited size. Split a slice into multiple slices
+      // if it's larger than DUMMY_BUFFER_SIZE.
+      if (iovec[n_iovecs].iov_len > DUMMY_BUFFER_SIZE) {
+        iovec[n_iovecs].iov_len = DUMMY_BUFFER_SIZE;
+        offset_in_slice += DUMMY_BUFFER_SIZE;
+        continue;
+      }
     }
+    offset_in_slice = 0;
+    ++idx;
   }
 
   int32_t written;
@@ -218,7 +279,11 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
   }
 
   if (cur_slice_idx_ == n_payload_slices_) {
-    callbacks_->NotifyTransferFinished();
+    if (PREDICT_FALSE(!status_.ok())) {
+      callbacks_->NotifyTransferAborted(status_);
+    } else {
+      callbacks_->NotifyTransferFinished();
+    }
     DCHECK_EQ(0, cur_offset_in_slice_);
   } else {
     DCHECK_LT(cur_slice_idx_, n_payload_slices_);
@@ -240,6 +305,10 @@ bool OutboundTransfer::TransferFinished() const {
   return false;
 }
 
+bool OutboundTransfer::TransferAborted() const {
+  return aborted_;
+}
+
 string OutboundTransfer::HexDump() const {
   if (KUDU_SHOULD_REDACT()) {
     return kRedactionMessage;

http://git-wip-us.apache.org/repos/asf/kudu/blob/351337ee/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 2a2b726..d8e3a59 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -45,13 +45,14 @@ class Socket;
 namespace rpc {
 
 class Messenger;
+class OutboundCall;
 struct TransferCallbacks;
 
 class TransferLimits {
  public:
   enum {
     kMaxSidecars = 10,
-    kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg)
+    kMaxPayloadSlices = kMaxSidecars + 3 // (header + msg + footer)
   };
 
   DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
@@ -140,6 +141,17 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // This triggers TransferCallbacks::NotifyTransferAborted.
   void Abort(const Status &status);
 
+  // Cancel the current transfer. If the transfer hasn't started yet, call Abort()
+  // with 'status'. If the transfer has started already, relinquish the sidecars
+  // associated with 'call' and send the remaining bytes as dummy bytes. When the
+  // transfer eventually finishes, NotifyTransferAborted() will be called with 'status'.
+  void Cancel(const std::shared_ptr<OutboundCall> &call, const Status &status);
+
+  // Called right before the outbound transfer is sent for the first time to append
+  // a footer to the entire message. Only valid if the transfer is for an outbound
+  // call and the remote server has RPC feature flag "REQUEST_FOOTERS".
+  void AppendFooter(const std::shared_ptr<OutboundCall> &call);
+
   // send from our buffers into the sock
   Status SendBuffer(Socket &socket);
 
@@ -149,6 +161,9 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // Return true if the entire transfer has been sent.
   bool TransferFinished() const;
 
+  // Return true if the transfer has been aborted before completion.
+  bool TransferAborted() const;
+
   // Return the total number of bytes to be sent (including those already sent)
   int32_t TotalLength() const;
 
@@ -187,8 +202,12 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // In the case of call responses, kInvalidCallId
   int32_t call_id_;
 
+  // Set to true when the outbound transfer is aborted (e.g. time out, cancellation).
   bool aborted_;
 
+  // Status passed to NotifyTransferAborted() when the transfer is aborted.
+  Status status_;
+
   DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
 };
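
The "send the remaining bytes as dummy bytes" behaviour described for Cancel() can
be illustrated with a standalone sketch of how slices might be turned into I/O
vector entries. It is a simplification under stated assumptions: SliceSketch,
VecSketch, BuildVecs, kDummySize and kMaxVecs are hypothetical names, the dummy
buffer is deliberately tiny (the patch uses 1 << 20 bytes), and no socket I/O is
performed.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr size_t kDummySize = 8;   // tiny on purpose; stands in for the 1MB buffer
constexpr size_t kMaxVecs = 16;    // upper bound on entries built per call
static uint8_t g_dummy[kDummySize];

struct SliceSketch { const uint8_t* data; size_t size; };
struct VecSketch { const uint8_t* base; size_t len; };

// Build up to kMaxVecs entries starting at slice 'idx', 'offset' bytes in.
// A slice pointing at g_dummy is emitted as repeated chunks of at most
// kDummySize bytes, always starting at the beginning of the dummy buffer.
inline std::vector<VecSketch> BuildVecs(const std::vector<SliceSketch>& slices,
                                        size_t idx, size_t offset) {
  std::vector<VecSketch> vecs;
  while (vecs.size() < kMaxVecs && idx < slices.size()) {
    const SliceSketch& s = slices[idx];
    const size_t remaining = s.size - offset;
    const bool is_dummy = (s.data == g_dummy);
    if (is_dummy && remaining > kDummySize) {
      // Too large for one pass over the dummy buffer: emit a full chunk
      // and stay on the same slice.
      vecs.push_back({g_dummy, kDummySize});
      offset += kDummySize;
      continue;
    }
    vecs.push_back({is_dummy ? g_dummy : s.data + offset, remaining});
    offset = 0;
    ++idx;
  }
  return vecs;
}
```

With a 4-byte header slice, a 20-byte cancelled sidecar and a 3-byte footer, this
sketch yields five entries of lengths 4, 8, 8, 4 and 3, so the byte count promised
in the header is preserved even though the sidecar memory is no longer read.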
 

