kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [4/4] kudu git commit: Fix bug in incorrect response rebuilding on tablet bootstrap
Date Fri, 05 May 2017 20:49:27 GMT
Fix bug in incorrect response rebuilding on tablet bootstrap

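This fixes the bug described in https://gerrit.cloudera.org/#/c/5417/
and re-enables the test that was disabled in that patch. Previously, when
every operation in a write was skipped on replay (because it had already
been flushed or had originally failed), bootstrap rewrote each result in
the new commit message as simply "flushed", discarding any original
per-row failure status; a later bootstrap could then no longer rebuild
the error response for that write.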

Along the way, this also cleans up the tablet bootstrap code.

I ran raft_consensus-itest's TestInsertDuplicateKeysWithCrashyNodes
on dist-test for 5000 iterations with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 5000 -- bin/raft_consensus-itest \
--gtest_filter=*DuplicateKeysWithCrashyNodes

Prior to this patch, the test failed in 62 of 5000 runs:
http://dist-test.cloudera.org//job?job_id=david.alves.1493915326.763

After this patch, the test passes in all 5000 runs:
http://dist-test.cloudera.org//job?job_id=david.alves.1493914745.27867

Change-Id: I1219ed5f7835e93cd7f3b128cedd650fc3384482
Reviewed-on: http://gerrit.cloudera.org:8080/5489
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4df9f596
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4df9f596
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4df9f596

Branch: refs/heads/master
Commit: 4df9f59698dbc4808dfe055dba99b1f8745ede95
Parents: 30d9e72
Author: David Alves <dralves@apache.org>
Authored: Tue Dec 13 00:59:17 2016 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri May 5 20:48:33 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/row_op.cc              |   8 +-
 src/kudu/tablet/row_op.h               |   9 +-
 src/kudu/tablet/tablet.proto           |   5 +-
 src/kudu/tablet/tablet_bootstrap.cc    | 214 ++++++++++++++--------------
 src/kudu/tserver/tablet_server-test.cc |   2 +-
 5 files changed, 123 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4df9f596/src/kudu/tablet/row_op.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index 50d4fc3..c1e9533 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -52,10 +52,10 @@ string RowOp::ToString(const Schema& schema) const {
   return decoded_op.ToString(schema);
 }
 
-void RowOp::SetAlreadyFlushed() {
-  DCHECK(!result) << SecureDebugString(*result);
-  result.reset(new OperationResultPB());
-  result->set_flushed(true);
+void RowOp::SetSkippedResult(const OperationResultPB& result) {
+  DCHECK(!this->result) << SecureDebugString(*this->result);
+  DCHECK(result.skip_on_replay());
+  this->result.reset(new OperationResultPB(result));
 }
 
 } // namespace tablet

http://git-wip-us.apache.org/repos/asf/kudu/blob/4df9f596/src/kudu/tablet/row_op.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 904ba70..f10bc9b 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -34,15 +34,18 @@ namespace tablet {
 struct RowOp {
  public:
   explicit RowOp(DecodedRowOperation decoded_op);
+  RowOp();
   ~RowOp();
 
   // Functions to set the result of the mutation.
-  // Only one of the following three functions must be called,
-  // at most once.
+  // Only one of the following four functions must be called, at most once.
   void SetFailed(const Status& s);
   void SetInsertSucceeded(int mrs_id);
   void SetMutateSucceeded(gscoped_ptr<OperationResultPB> result);
-  void SetAlreadyFlushed();
+  // Sets the result of a skipped operation on bootstrap.
+  // TODO(dralves) Currently this performs a copy. Might be avoided with some refactoring.
+  // see TODO(dralves) in TabletBoostrap::ApplyOperations().
+  void SetSkippedResult(const OperationResultPB& result);
 
   // In the case that this operation is being replayed from the WAL
   // during tablet bootstrap, we may need to look at the original result

http://git-wip-us.apache.org/repos/asf/kudu/blob/4df9f596/src/kudu/tablet/tablet.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.proto b/src/kudu/tablet/tablet.proto
index 975c3be..6850ac1 100644
--- a/src/kudu/tablet/tablet.proto
+++ b/src/kudu/tablet/tablet.proto
@@ -39,8 +39,9 @@ message MemStoreTargetPB {
 // Stores the result of an Insert or Mutate.
 message OperationResultPB {
 
-  // set on replay if this operation was already flushed.
-  optional bool flushed = 1 [ default = false ];
+  // set on replay to reflect that this operation was already flushed or had previously
+  // failed and should not be applied again.
+  optional bool skip_on_replay = 1 [ default = false ];
 
   // set if this particular operation failed
   optional kudu.AppStatusPB failed_status = 2;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4df9f596/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 47775b9..c9150d2 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -248,32 +248,46 @@ class TabletBootstrap {
   Status PlayNoOpRequest(ReplicateMsg* replicate_msg,
                          const CommitMsg& commit_msg);
 
-  // Plays operations, skipping those that have already been flushed,
-  // as indicated in the 'already_flushed' vector.
+  // Plays operations, skipping those that have already been flushed or have previously failed.
+  // See ApplyRowOperations() for more details on how the decision of whether an operation
+  // is applied or skipped is made.
   Status PlayRowOperations(WriteTransactionState* tx_state,
-                           const SchemaPB& schema_pb,
-                           const RowOperationsPB& ops_pb,
-                           const TxResultPB& result,
-                           const vector<bool>& already_flushed);
+                           const TxResultPB& orig_result,
+                           TxResultPB* new_result);
 
-  // Determine which of the operations from 'result' correspond to already-flushed stores.
+  // Determine which of the operations from 'orig_result' must be skipped.
   // At the same time this builds the WriteResponsePB that we'll store on the ResultTracker.
-  Status DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
-                                             vector<bool>* flushed_by_op,
-                                             WriteResponsePB* response);
-
-  // Pass through all of the decoded operations in tx_state. For
-  // each op:
+  // 'new_result' store the results of the operations that were skipped, 'response' stores
+  // any error that might have previously happened so that we can send them back to clients,
+  // if needed.
+  // Finally 'all_skipped' indicates whether all of the original operations were skipped.
+  Status DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
+                                             TxResultPB* new_result,
+                                             WriteResponsePB* response,
+                                             bool* all_skipped);
+
+  // Pass through all of the decoded operations in tx_state. For each op:
   // - if it was previously failed, mark as failed
   // - if it previously succeeded but was flushed, skip it.
   // - otherwise, re-apply to the tablet being bootstrapped.
   Status ApplyOperations(WriteTransactionState* tx_state,
-                         const TxResultPB& orig_result);
+                         const TxResultPB& orig_result,
+                         TxResultPB* new_result);
+
+  enum OpAction {
+    // The operation was never applied or was applied to an unflushed memory store and thus
+    // needs to be applied again.
+    NEEDS_REPLAY,
+    // The operation was already applied to a memory store that was flushed.
+    SKIP_PREVIOUSLY_FLUSHED,
+    // The operation was never applied due to an error.
+    SKIP_PREVIOUSLY_FAILED
+  };
 
-  // Filter a row  operation, setting '*already_flushed' to indicate if
-  // it was already flushed.
+  // Filter a row operation, setting 'action' to indicate what needs to be done
+  // to the operation, i.e. whether it must applied or skipped.
   Status FilterOperation(const OperationResultPB& op_result,
-                         bool* already_flushed);
+                         OpAction* action);
 
   enum ActiveStores {
     // The OperationResultPBs in the commit message do not reference any stores.
@@ -1242,22 +1256,38 @@ Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg)
{
   return log_->Append(&commit_entry);
 }
 
-Status TabletBootstrap::DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
-                                                            vector<bool>* flushed_by_op,
-                                                            WriteResponsePB* response) {
-  int num_ops = result.ops_size();
-  flushed_by_op->resize(num_ops);
+Status TabletBootstrap::DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
+                                                            TxResultPB* new_result,
+                                                            WriteResponsePB* response,
+                                                            bool* all_skipped) {
+  int num_ops = orig_result.ops_size();
+  new_result->mutable_ops()->Reserve(num_ops);
+  *all_skipped = true;
 
   for (int i = 0; i < num_ops; i++) {
-    const auto& orig_op_result = result.ops(i);
-    if (orig_op_result.has_failed_status() && response) {
-      WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
-      error->set_row_index(i);
-      error->mutable_error()->CopyFrom(orig_op_result.failed_status());
+    const auto& orig_op_result = orig_result.ops(i);
+    OpAction action;
+    RETURN_NOT_OK(FilterOperation(orig_op_result, &action));
+    *all_skipped &= action != NEEDS_REPLAY;
+
+    if (action != NEEDS_REPLAY) {
+      new_result->mutable_ops(i)->set_skip_on_replay(true);
+    }
+
+    if (action == SKIP_PREVIOUSLY_FAILED) {
+      if (response) {
+        WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
+        error->set_row_index(i);
+        error->mutable_error()->CopyFrom(orig_op_result.failed_status());
+      }
+      // If the op is already flushed we won't be applying it.
+      DCHECK(orig_op_result.has_failed_status());
+      new_result->mutable_ops(i)->mutable_failed_status()->CopyFrom(orig_op_result.failed_status());
     }
-    bool f;
-    RETURN_NOT_OK(FilterOperation(orig_op_result, &f));
-    (*flushed_by_op)[i] = f;
+  }
+
+  if (*all_skipped) {
+    stats_.ops_ignored++;
   }
   return Status::OK();
 }
@@ -1267,8 +1297,8 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
   // Prepare the commit entry for the rewritten log.
   LogEntryPB commit_entry;
   commit_entry.set_type(log::COMMIT);
-  CommitMsg* commit = commit_entry.mutable_commit();
-  commit->CopyFrom(commit_msg);
+  CommitMsg* new_commit = commit_entry.mutable_commit();
+  new_commit->CopyFrom(commit_msg);
 
   // Set up the new transaction.
   // Even if we're going to ignore the transaction, it's important to
@@ -1313,45 +1343,30 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
   // storage and don't need to be re-applied. We can do this even before
   // we decode any row operations, so we can short-circuit that decoding
   // in the case that the entire op has been already flushed.
-  vector<bool> already_flushed;
-  RETURN_NOT_OK(DetermineFlushedOpsAndBuildResponse(commit_msg.result(),
-                                                    &already_flushed,
-                                                    response.get()));
+  TxResultPB* new_result = new_commit->mutable_result();
+  bool all_flushed;
+  RETURN_NOT_OK(DetermineSkippedOpsAndBuildResponse(commit_msg.result(),
+                                                    new_result,
+                                                    response.get(),
+                                                    &all_flushed));
 
   if (tracking_results && state == ResultTracker::NEW) {
     result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get());
   }
 
   Status play_status;
-  bool all_already_flushed = std::all_of(already_flushed.begin(),
-                                         already_flushed.end(),
-                                         [](bool f) { return f; });
-  if (all_already_flushed) {
-    stats_.ops_ignored++;
-    for (auto& op : *commit->mutable_result()->mutable_ops()) {
-      op.Clear();
-      op.set_flushed(true);
-    }
-  } else {
-    if (write->has_row_operations()) {
-      // TODO(todd): get rid of redundant params below - they can be gotten from the Request
-      // Rather than RETURN_NOT_OK() here, we need to just save the status and do the
-      // RETURN_NOT_OK() down below the Commit() call below. Even though it seems wrong
-      // to commit the transaction when in fact it failed to apply, we would throw a CHECK
-      // failure if we attempted to 'Abort()' after entering the applying stage. Allowing
it to
-      // Commit isn't problematic because we don't expose the results anyway, and the bad
-      // Status returned below will cause us to fail the entire tablet bootstrap anyway.
-      play_status = PlayRowOperations(&tx_state,
-                                      write->schema(),
-                                      write->row_operations(),
-                                      commit_msg.result(),
-                                      already_flushed);
-    }
+  if (!all_flushed && write->has_row_operations()) {
+    // Rather than RETURN_NOT_OK() here, we need to just save the status and do the
+    // RETURN_NOT_OK() down below the Commit() call below. Even though it seems wrong
+    // to commit the transaction when in fact it failed to apply, we would throw a CHECK
+    // failure if we attempted to 'Abort()' after entering the applying stage. Allowing it
to
+    // Commit isn't problematic because we don't expose the results anyway, and the bad
+    // Status returned below will cause us to fail the entire tablet bootstrap anyway.
+    play_status = PlayRowOperations(&tx_state, commit_msg.result(), new_result);
 
     if (play_status.ok()) {
-      // Replace the original commit message's result with the new one from
-      // the replayed operation.
-      tx_state.ReleaseTxResultPB(commit->mutable_result());
+      // Replace the original commit message's result with the new one from the replayed
operation.
+      tx_state.ReleaseTxResultPB(new_commit->mutable_result());
     }
   }
 
@@ -1424,43 +1439,33 @@ Status TabletBootstrap::PlayNoOpRequest(ReplicateMsg* replicate_msg,
const Commi
 }
 
 Status TabletBootstrap::PlayRowOperations(WriteTransactionState* tx_state,
-                                          const SchemaPB& schema_pb,
-                                          const RowOperationsPB& ops_pb,
-                                          const TxResultPB& result,
-                                          const vector<bool>& already_flushed)
{
+                                          const TxResultPB& orig_result,
+                                          TxResultPB* new_result) {
   Schema inserts_schema;
-  RETURN_NOT_OK_PREPEND(SchemaFromPB(schema_pb, &inserts_schema),
+  RETURN_NOT_OK_PREPEND(SchemaFromPB(tx_state->request()->schema(), &inserts_schema),
                         "Couldn't decode client schema");
 
   RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, tx_state),
                         Substitute("Could not decode row operations: $0",
-                                   SecureShortDebugString(ops_pb)));
-  DCHECK_EQ(tx_state->row_ops().size(), already_flushed.size());
-
-  // Propagate the 'already_flushed' information into the decoded operations.
-  // This signals to ApplyOperations() below that it doesn't need to actually
-  // apply these ops again.
-  for (int i = 0; i < already_flushed.size(); i++) {
-    if (already_flushed[i]) {
-      tx_state->row_ops()[i]->SetAlreadyFlushed();
-    }
-  }
+                                   SecureDebugString(tx_state->request()->row_operations())));
 
   // Run AcquireRowLocks, Apply, etc!
   RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(tx_state),
                         "Failed to acquire row locks");
 
-  RETURN_NOT_OK(ApplyOperations(tx_state, result));
+  RETURN_NOT_OK(ApplyOperations(tx_state, orig_result, new_result));
 
   return Status::OK();
 }
 
 Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
-                                        const TxResultPB& orig_result) {
+                                        const TxResultPB& orig_result,
+                                        TxResultPB* new_result) {
+  DCHECK_EQ(tx_state->row_ops().size(), orig_result.ops_size());
+  DCHECK_EQ(tx_state->row_ops().size(), new_result->ops_size());
   int32_t op_idx = 0;
   for (RowOp* op : tx_state->row_ops()) {
-    const OperationResultPB& orig_op_result = orig_result.ops(op_idx++);
-
+    int32_t curr_op_idx = op_idx++;
     // Increment the seen/ignored stats.
     switch (op->decoded_op.type) {
       case RowOperationsPB::INSERT:
@@ -1485,26 +1490,19 @@ Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
         break;
     }
 
-    // If the op is already flushed, no need to replay it.
-    if (op->has_result()) {
-      DCHECK(op->result->flushed());
+    const OperationResultPB& new_op_result = new_result->ops(curr_op_idx);
+    // If the op is already flushed or had previously failed, no need to replay it.
+    // TODO(dralves) this back and forth is weird. We're first setting the flushed/failed
+    // status on the rewritten message's commit entry. Then we pass it here to
+    // set the status on the op, then we set it back on the commit entry with
+    // ReleaseTxResultPB(). This could be simplified if we build the RowOps on
+    // demand and just created DecodedRowOperation/RowOp for the replayed stuff.
+    if (new_op_result.skip_on_replay()) {
+      op->SetSkippedResult(new_op_result);
       continue;
     }
 
-    op->set_original_result_from_log(&orig_op_result);
-
-    // check if the operation failed in the original transaction
-    if (PREDICT_FALSE(orig_op_result.has_failed_status())) {
-      Status status = StatusFromPB(orig_op_result.failed_status());
-      if (VLOG_IS_ON(1)) {
-        VLOG_WITH_PREFIX(1) << "Skipping operation that originally resulted in error.
OpId: "
-                            << SecureDebugString(tx_state->op_id()) << " op
index: "
-                            << op_idx - 1 << " original error: "
-                            << status.ToString();
-      }
-      op->SetFailed(status);
-      continue;
-    }
+    op->set_original_result_from_log(&orig_result.ops(curr_op_idx));
 
     // Actually apply it.
     ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
@@ -1527,10 +1525,16 @@ Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
 }
 
 Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,
-                                        bool* already_flushed) {
+                                        OpAction* action) {
+
   // If the operation failed or was skipped, originally, no need to re-apply it.
-  if (op_result.has_failed_status() || op_result.flushed()) {
-    *already_flushed = true;
+  if (op_result.has_failed_status()) {
+    *action = SKIP_PREVIOUSLY_FAILED;
+    return Status::OK();
+  }
+
+  if (op_result.skip_on_replay()) {
+    *action = SKIP_PREVIOUSLY_FLUSHED;
     return Status::OK();
   }
 
@@ -1554,7 +1558,7 @@ Status TabletBootstrap::FilterOperation(const OperationResultPB&
op_result,
 
   if (num_active_stores == 0) {
     // The mutation was fully flushed.
-    *already_flushed = true;
+    *action = SKIP_PREVIOUSLY_FLUSHED;
     return Status::OK();
   }
 
@@ -1568,7 +1572,7 @@ Status TabletBootstrap::FilterOperation(const OperationResultPB&
op_result,
                               SecureShortDebugString(op_result));
   }
 
-  *already_flushed = false;
+  *action = NEEDS_REPLAY;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4df9f596/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 43f01a6..4aa6417 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -951,7 +951,7 @@ TEST_F(TabletServerTest, TestKUDU_1341) {
   ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
 }
 
-TEST_F(TabletServerTest, DISABLED_TestExactlyOnceForErrorsAcrossRestart) {
+TEST_F(TabletServerTest, TestExactlyOnceForErrorsAcrossRestart) {
   WriteRequestPB req;
   WriteResponsePB resp;
   RpcController rpc;


Mime
View raw message