kudu-commits mailing list archives

From mpe...@apache.org
Subject [1/2] kudu git commit: consensus: Improve contract for API to fetch last-logged op id
Date Tue, 22 Aug 2017 00:27:21 GMT
Repository: kudu
Updated Branches:
  refs/heads/master fa2757763 -> a7d589635


consensus: Improve contract for API to fetch last-logged op id

It's important to distinguish between the case where the last-logged op
is known and the case where we don't know what it is, or even whether we
have ever appended anything to the local WAL. This applies both to the
TABLET_DATA_READY case, where this information is stored in the WAL, and
to the TABLET_DATA_TOMBSTONED case, where it is stored in the superblock.

Cases where we are unable to determine the last-logged OpId from the WAL
when a replica is in TABLET_DATA_READY state:

* Early in the tablet replica lifecycle (before Raft is started).
* When a replica encounters an error during initialization.

Cases where we are unable to determine the last-logged OpId from the
TabletMetadata when a replica is in TABLET_DATA_TOMBSTONED state:

* If the replica was tombstoned while in a failed state.
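To make the contract concrete, here is a minimal sketch of a lookup that reports "unknown" instead of returning a sentinel. It assumes a simplified `OpId` struct in place of the `consensus::OpId` protobuf and uses C++17 `std::optional` in place of `boost::optional`; `TabletDataState`, `ReplicaSnapshot`, and `LastLoggedOpId()` are hypothetical names, not Kudu APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical subset of the tablet data states discussed above.
enum class TabletDataState { kReady, kTombstoned };

// Hypothetical snapshot of what a replica knows about its own log.
struct ReplicaSnapshot {
  TabletDataState data_state;
  // Known only once Raft has started; empty early in the lifecycle or
  // after an initialization error.
  std::optional<OpId> wal_last_logged;
  // Loaded from the superblock; empty if the replica was tombstoned while
  // in a failed state.
  std::optional<OpId> tombstone_last_logged;
};

// Returns the last-logged OpId when it is known. An empty result means
// "unknown"; it never stands in for "nothing was ever logged".
std::optional<OpId> LastLoggedOpId(const ReplicaSnapshot& r) {
  switch (r.data_state) {
    case TabletDataState::kReady:
      return r.wal_last_logged;
    case TabletDataState::kTombstoned:
      return r.tombstone_last_logged;
  }
  assert(false && "unhandled TabletDataState");
  return std::nullopt;
}
```

The point of the optional return type is that callers are forced to handle the "unknown" case explicitly rather than silently treating (0,0) as a real position in the log.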

Included in this patch are the following API improvements:

1. Delete Log::GetLatestEntryOpId(). Previously, this method returned
   something other than MinimumOpId() only if a log entry had been
   appended during the object's lifetime. It is replaced by
   RaftConsensus::GetLastOpId(RECEIVED_OPID), which delegates to
   PeerMessageQueue::GetLastOpIdInLog().
2. Merge PeerMessageQueue::Init() into the PeerMessageQueue constructor.
   This removes one lifecycle state (kQueueConstructed) and guarantees
   that, once the queue is constructed, we can always get a valid
   last-logged opid from it (see #1).
3. Make TabletMetadata::tombstone_last_logged_opid() return a
   boost::optional<OpId>, so that we clearly differentiate between when
   we know the last-logged opid and when we don't. We also treat a stored
   MinimumOpId() as boost::none at superblock load time, since previous
   versions of Kudu may have written (0,0) into the TabletMetadata
   'tombstone_last_logged_opid' field.
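The load-time rule in item 3 can be sketched as a small normalization step. This is a hypothetical helper, not the actual `TabletMetadata` code: `SuperblockField` models a protobuf field with a presence bit, `kMinimumOpId` stands in for `MinimumOpId()`, and `std::optional` stands in for `boost::optional`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

// Stand-in for consensus::MinimumOpId(), i.e. (0, 0).
constexpr OpId kMinimumOpId{0, 0};

bool IsMinimumOpId(const OpId& id) {
  return id.term == kMinimumOpId.term && id.index == kMinimumOpId.index;
}

// Hypothetical model of an optional protobuf field: presence bit plus value.
struct SuperblockField {
  bool has_value;
  OpId value;
};

// Converts the persisted 'tombstone_last_logged_opid' field into the
// in-memory representation. A missing field and a stored (0, 0) both map to
// std::nullopt, because older versions may have written (0, 0) when they did
// not actually know the last-logged OpId.
std::optional<OpId> LoadTombstoneLastLoggedOpId(const SuperblockField& field) {
  if (!field.has_value || IsMinimumOpId(field.value)) {
    return std::nullopt;
  }
  assert(field.value.term >= 0 && field.value.index >= 0);  // DCHECK stand-in.
  return field.value;
}
```

Normalizing once at load time keeps the ambiguity confined to the deserialization boundary; everything downstream only ever sees "known" or "unknown".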

Change-Id: Ia4e4501a61cd40fdee0dc918b77675a0bc2515e7
Reviewed-on: http://gerrit.cloudera.org:8080/7717
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dc65abba
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dc65abba
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dc65abba

Branch: refs/heads/master
Commit: dc65abbab208120288a8c26b1099f45c488b865e
Parents: fa27577
Author: Mike Percy <mpercy@apache.org>
Authored: Tue Aug 15 19:58:39 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Aug 22 00:26:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  18 +--
 src/kudu/consensus/consensus_peers-test.cc      |  17 +--
 src/kudu/consensus/consensus_queue-test.cc      |  50 ++++-----
 src/kudu/consensus/consensus_queue.cc           |  41 +++----
 src/kudu/consensus/consensus_queue.h            |  17 ++-
 src/kudu/consensus/log.cc                       |  21 ----
 src/kudu/consensus/log.h                        |  12 --
 src/kudu/consensus/mt-log-test.cc               |   4 +-
 src/kudu/consensus/raft_consensus.cc            | 109 ++++++++++---------
 src/kudu/consensus/raft_consensus.h             |  14 +--
 .../consensus/raft_consensus_quorum-test.cc     |  11 +-
 src/kudu/tablet/tablet_metadata.cc              |  26 +++--
 src/kudu/tablet/tablet_metadata.h               |   7 +-
 src/kudu/tablet/tablet_replica-test.cc          |  24 ++--
 src/kudu/tools/kudu-tool-test.cc                |  16 +--
 src/kudu/tserver/tablet_copy_client.cc          |  33 ++++--
 src/kudu/tserver/tablet_copy_source_session.cc  |   8 +-
 src/kudu/tserver/tablet_service.cc              |  13 ++-
 src/kudu/tserver/ts_tablet_manager.cc           |  47 ++++----
 19 files changed, 245 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index b554dcd..ac4408b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -47,14 +47,16 @@
 #define TOKENPASTE2(x, y) TOKENPASTE(x, y)
 
 #define ASSERT_OPID_EQ(left, right) \
-  OpId TOKENPASTE2(_left, __LINE__) = (left); \
-  OpId TOKENPASTE2(_right, __LINE__) = (right); \
-  if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right,__LINE__))) { \
-    FAIL() << "Expected: " \
-           << pb_util::SecureShortDebugString(TOKENPASTE2(_right,__LINE__)) << "\n" \
-           << "Value: " \
-           << pb_util::SecureShortDebugString(TOKENPASTE2(_left,__LINE__)) << "\n"; \
-  }
+  do { \
+    const OpId& TOKENPASTE2(_left, __LINE__) = (left); \
+    const OpId& TOKENPASTE2(_right, __LINE__) = (right); \
+    if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right, __LINE__))) { \
+      FAIL() << "Expected: " \
+            << pb_util::SecureShortDebugString(TOKENPASTE2(_left, __LINE__)) << "\n" \
+            << "Value: " \
+            << pb_util::SecureShortDebugString(TOKENPASTE2(_right, __LINE__)) << "\n"; \
+    } \
+  } while (false)
 
 namespace kudu {
 namespace consensus {
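
The rewritten `ASSERT_OPID_EQ` wraps its body in `do { ... } while (false)` and binds its arguments to `const OpId&`. With the previous three-statement expansion, an unbraced `if (cond) ASSERT_OPID_EQ(a, b); else ...` would not compile: only the first declaration would be governed by the `if`, and the `;` after the closing brace would leave the `else` without a matching `if`. The standalone sketch below shows the wrapped form without depending on gtest; the failure counter replacing `FAIL()` and the name `EXPECT_OPID_EQ_SKETCH` are hypothetical. As in the patched macro, the first argument is reported as the expected value.

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

inline bool OpIdEquals(const OpId& a, const OpId& b) {
  return a.term == b.term && a.index == b.index;
}

// Hypothetical failure counter standing in for gtest's FAIL().
static int g_opid_failures = 0;

// The do { ... } while (false) wrapper makes the expansion a single statement
// that requires a trailing semicolon. Binding each argument to a const
// reference evaluates it exactly once without copying; a temporary has its
// lifetime extended to the end of the block. The long local names reduce the
// risk of colliding with (and self-initializing from) the caller's arguments.
#define EXPECT_OPID_EQ_SKETCH(expected, actual)                        \
  do {                                                                 \
    const OpId& opid_sketch_expected_ = (expected);                    \
    const OpId& opid_sketch_actual_ = (actual);                        \
    if (!OpIdEquals(opid_sketch_expected_, opid_sketch_actual_)) {     \
      ++g_opid_failures;                                               \
      std::cerr << "Expected: " << opid_sketch_expected_.term << "."   \
                << opid_sketch_expected_.index << "\n"                 \
                << "Value: " << opid_sketch_actual_.term << "."        \
                << opid_sketch_actual_.index << "\n";                  \
    }                                                                  \
  } while (false)
```

Usage: `if (flag) EXPECT_OPID_EQ_SKETCH(a, a); else EXPECT_OPID_EQ_SKETCH(a, b);` compiles and each branch runs exactly one comparison.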

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index aac7346..8641e75 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -103,7 +103,9 @@ class ConsensusPeersTest : public KuduTest {
         time_manager,
         FakeRaftPeerPB(kLeaderUuid),
         kTabletId,
-        raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+        raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+        MinimumOpId(),
+        MinimumOpId()));
 
     MessengerBuilder bld("test");
     ASSERT_OK(bld.Build(&messenger_));
@@ -133,13 +135,6 @@ class ConsensusPeersTest : public KuduTest {
     return proxy_ptr;
   }
 
-  void CheckLastLogEntry(int term, int index) {
-    OpId id;
-    log_->GetLatestEntryOpId(&id);
-    ASSERT_EQ(id.term(), term);
-    ASSERT_EQ(id.index(), index);
-  }
-
   void CheckLastRemoteEntry(DelayablePeerProxy<NoOpTestPeerProxy>* proxy, int term, int index) {
     OpId id;
     id.CopyFrom(proxy->proxy()->last_received());
@@ -179,7 +174,6 @@ class ConsensusPeersTest : public KuduTest {
 TEST_F(ConsensusPeersTest, TestRemotePeer) {
   // We use a majority size of 2 since we make one fake remote peer
   // in addition to our real local log.
-  message_queue_->Init(MinimumOpId(), MinimumOpId());
   message_queue_->SetLeaderMode(kMinimumOpIdIndex,
                                 kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
@@ -203,7 +197,6 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
 }
 
 TEST_F(ConsensusPeersTest, TestRemotePeers) {
-  message_queue_->Init(MinimumOpId(), MinimumOpId());
   message_queue_->SetLeaderMode(kMinimumOpIdIndex,
                                 kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
@@ -233,7 +226,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   // of remote-peer1 and the local log.
   WaitForCommitIndex(first.index());
 
-  CheckLastLogEntry(first.term(), first.index());
+  ASSERT_OPID_EQ(first, message_queue_->GetLastOpIdInLog());
   CheckLastRemoteEntry(remote_peer1_proxy, first.term(), first.index());
 
   remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
@@ -262,7 +255,6 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
 // Regression test for KUDU-699: even if a peer isn't making progress,
 // and thus always has data pending, we should be able to close the peer.
 TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
-  message_queue_->Init(MinimumOpId(), MinimumOpId());
   message_queue_->SetLeaderMode(kMinimumOpIdIndex,
                                 kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
@@ -301,7 +293,6 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
 }
 
 TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
-  message_queue_->Init(MinimumOpId(), MinimumOpId());
   message_queue_->SetLeaderMode(kMinimumOpIdIndex,
                                 kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 8d6f30b..8722dde 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -96,19 +96,22 @@ class ConsensusQueueTest : public KuduTest {
     ASSERT_OK(clock_->Init());
 
     ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
-    CloseAndReopenQueue();
+    CloseAndReopenQueue(MinimumOpId(), MinimumOpId());
   }
 
-  void CloseAndReopenQueue() {
+  void CloseAndReopenQueue(const OpId& replicated_opid, const OpId& committed_opid) {
     scoped_refptr<clock::Clock> clock(new clock::HybridClock());
     ASSERT_OK(clock->Init());
     scoped_refptr<TimeManager> time_manager(new TimeManager(clock, Timestamp::kMin));
-    queue_.reset(new PeerMessageQueue(metric_entity_,
-                                      log_.get(),
-                                      time_manager,
-                                      FakeRaftPeerPB(kLeaderUuid),
-                                      kTestTablet,
-                                      raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+    queue_.reset(new PeerMessageQueue(
+        metric_entity_,
+        log_.get(),
+        time_manager,
+        FakeRaftPeerPB(kLeaderUuid),
+        kTestTablet,
+        raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+        replicated_opid,
+        committed_opid));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -229,7 +232,6 @@ class ConsensusQueueTest : public KuduTest {
 // with several messages and then starts to track a peer whose watermark
 // falls in the middle of the current messages in the queue.
 TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
@@ -269,7 +271,6 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
 // Tests that the peers gets the messages pages, with the size of a page
 // being 'consensus_max_batch_size_bytes'
 TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
 
   // helper to estimate request size so that we can set the max batch size appropriately
@@ -338,7 +339,6 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
 }
 
 TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
@@ -405,7 +405,6 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 }
 
 TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
   // Track 4 additional peers (in addition to the local peer)
   queue_->TrackPeer("peer-1");
@@ -488,7 +487,8 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
 
   OpId opid = MakeOpId(1, 1);
 
-  for (int i = 1; i <= 100; i++) {
+  const int kOpsToAppend = 100;
+  for (int i = 1; i <= kOpsToAppend; i++) {
     ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
     // Roll the log every 10 ops
     if (i % 10 == 0) {
@@ -497,16 +497,15 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
   }
   ASSERT_OK(log_->WaitUntilAllFlushed());
 
-  OpId leader_last_op;
-  log_->GetLatestEntryOpId(&leader_last_op);
+  ASSERT_OPID_EQ(MakeOpId(1, kOpsToAppend + 1), opid);
+  OpId last_logged_opid = MakeOpId(opid.term(), opid.index() - 1);
 
   // Now reset the queue so that we can pass a new committed index,
   // the last operation in the log.
-  CloseAndReopenQueue();
+  CloseAndReopenQueue(last_logged_opid, last_logged_opid);
 
-  queue_->Init(leader_last_op, leader_last_op);
-  queue_->SetLeaderMode(leader_last_op.index(),
-                        leader_last_op.term(),
+  queue_->SetLeaderMode(last_logged_opid.index(),
+                        last_logged_opid.term(),
                         BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
@@ -570,14 +569,12 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
     }
   }
 
+  OpId last_in_log = MakeOpId(opid.term(), opid.index() - 1);
+  int64_t committed_index = 15;
 
   // Now reset the queue so that we can pass a new committed index (15).
-  CloseAndReopenQueue();
+  CloseAndReopenQueue(last_in_log, MakeOpId(2, committed_index));
 
-  OpId last_in_log;
-  log_->GetLatestEntryOpId(&last_in_log);
-  int64_t committed_index = 15;
-  queue_->Init(last_in_log, MakeOpId(2, committed_index));
   queue_->SetLeaderMode(committed_index,
                         last_in_log.term(),
                         BuildRaftConfigPBForTests(3));
@@ -657,7 +654,6 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
 // operations, which would cause a check failure on the write immediately
 // following the overwriting write.
 TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetNonLeaderMode();
   // Append a bunch of messages and update as if they were also appeneded to the leader.
   queue_->UpdateLastIndexAppendedToLeader(10);
@@ -725,7 +721,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   FLAGS_consensus_max_batch_size_bytes = 1024 * 10;
 
   const int kInitialCommittedIndex = 30;
-  queue_->Init(MakeOpId(72, 30), MakeOpId(82, 30));
+  CloseAndReopenQueue(MakeOpId(72, 30), MakeOpId(82, 30));
   queue_->SetLeaderMode(kInitialCommittedIndex, 76, BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
@@ -819,7 +815,6 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
 
 // Test that Tablet Copy is triggered when a "tablet not found" error occurs.
 TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
@@ -859,7 +854,6 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
 }
 
 TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
-  queue_->Init(MinimumOpId(), MinimumOpId());
   queue_->SetNonLeaderMode();
 
   // Emulate a follower sending a request to replicate 10 messages.
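
In `TestQueueLoadsOperationsForPeer`, the new assertion `ASSERT_OPID_EQ(MakeOpId(1, kOpsToAppend + 1), opid)` implies that `log::AppendNoOpToLogSync()` leaves its out-parameter pointing at the next OpId, which is why the test derives the last-logged OpId as `opid.index() - 1`. The sketch below reproduces only that bookkeeping; `AppendNoOpSketch` and `AppendAndGetLastLogged` are hypothetical stand-ins that do no I/O.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical stand-in for log::AppendNoOpToLogSync(): records an op at
// *opid, then advances *opid to the next index in the same term.
void AppendNoOpSketch(std::vector<OpId>* log, OpId* opid) {
  log->push_back(*opid);
  opid->index++;
}

// Appends 'num_ops' no-ops starting at (term, 1) and returns the OpId of the
// last entry actually written, derived the same way as in the test.
OpId AppendAndGetLastLogged(std::vector<OpId>* log, int64_t term, int num_ops) {
  OpId opid{term, 1};
  for (int i = 1; i <= num_ops; i++) {
    AppendNoOpSketch(log, &opid);
  }
  // 'opid' now names the next slot, one past the last written entry.
  assert(opid.index == num_ops + 1);
  return OpId{opid.term, opid.index - 1};
}
```

With 100 appends starting at (1, 1), the out-parameter ends at (1, 101) and the last-logged OpId is (1, 100), which is the value the test passes to `CloseAndReopenQueue()`.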

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b22b539..4445137 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -114,40 +114,38 @@ PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_ent
 }
 #undef INSTANTIATE_METRIC
 
-PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
-                                   const scoped_refptr<log::Log>& log,
+PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
+                                   scoped_refptr<log::Log> log,
                                    scoped_refptr<TimeManager> time_manager,
-                                   const RaftPeerPB& local_peer_pb,
-                                   const string& tablet_id,
-                                   unique_ptr<ThreadPoolToken> raft_pool_observers_token)
+                                   RaftPeerPB local_peer_pb,
+                                   string tablet_id,
+                                   unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+                                   OpId last_locally_replicated,
+                                   const OpId& last_locally_committed)
     : raft_pool_observers_token_(std::move(raft_pool_observers_token)),
-      local_peer_pb_(local_peer_pb),
-      tablet_id_(tablet_id),
-      log_cache_(metric_entity, log, local_peer_pb.permanent_uuid(), tablet_id),
+      local_peer_pb_(std::move(local_peer_pb)),
+      tablet_id_(std::move(tablet_id)),
+      log_cache_(metric_entity, log, local_peer_pb_.permanent_uuid(), tablet_id_),
       metrics_(metric_entity),
       time_manager_(std::move(time_manager)) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
+  DCHECK(last_locally_replicated.IsInitialized());
+  DCHECK(last_locally_committed.IsInitialized());
   queue_state_.current_term = 0;
   queue_state_.first_index_in_current_term = boost::none;
   queue_state_.committed_index = 0;
   queue_state_.all_replicated_index = 0;
   queue_state_.majority_replicated_index = 0;
   queue_state_.last_idx_appended_to_leader = 0;
-  queue_state_.state = kQueueConstructed;
   queue_state_.mode = NON_LEADER;
   queue_state_.majority_size_ = -1;
-}
-
-void PeerMessageQueue::Init(const OpId& last_locally_replicated,
-                            const OpId& last_locally_committed) {
-  std::lock_guard<simple_spinlock> lock(queue_lock_);
-  CHECK_EQ(queue_state_.state, kQueueConstructed);
-  log_cache_.Init(last_locally_replicated);
-  queue_state_.last_appended = last_locally_replicated;
-  queue_state_.state = kQueueOpen;
+  queue_state_.last_appended = std::move(last_locally_replicated);
   queue_state_.committed_index = last_locally_committed.index();
-  TrackPeerUnlocked(local_peer_pb_.permanent_uuid());
+  queue_state_.state = kQueueOpen;
+  // TODO(mpercy): Merge LogCache::Init() with its constructor.
+  log_cache_.Init(queue_state_.last_appended);
+  TrackPeer(local_peer_pb_.permanent_uuid());
 }
 
 void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
@@ -322,6 +320,7 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                                  last_id,
                                                  log_append_callback)));
   lock.lock();
+  DCHECK(last_id.IsInitialized());
   queue_state_.last_appended = last_id;
   UpdateMetricsUnlocked();
 
@@ -337,6 +336,7 @@ void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
                               index));
   {
     std::unique_lock<simple_spinlock> lock(queue_lock_);
+    DCHECK(op.IsInitialized());
     queue_state_.last_appended = op;
   }
   log_cache_.TruncateOpsAfter(op.index());
@@ -344,11 +344,13 @@ void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
 
 OpId PeerMessageQueue::GetLastOpIdInLog() const {
   std::unique_lock<simple_spinlock> lock(queue_lock_);
+  DCHECK(queue_state_.last_appended.IsInitialized());
   return queue_state_.last_appended;
 }
 
 OpId PeerMessageQueue::GetNextOpId() const {
   std::unique_lock<simple_spinlock> lock(queue_lock_);
+  DCHECK(queue_state_.last_appended.IsInitialized());
   return MakeOpId(queue_state_.current_term,
                   queue_state_.last_appended.index() + 1);
 }
@@ -632,7 +634,6 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
   Mode mode_copy;
   {
     std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
-    DCHECK_NE(kQueueConstructed, queue_state_.state);
 
     TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
     if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
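
The `PeerMessageQueue` change follows a common C++ pattern: values the object cannot function without become constructor arguments, so no half-initialized `kQueueConstructed` state exists, and movable "sink" arguments are taken by value and `std::move`d into members. The hypothetical `QueueSketch` class below illustrates both; it is not the Kudu class. Like the patched initializer list, which passes `local_peer_pb_` and `tablet_id_` (not the moved-from parameters) to `log_cache_`, any member initialized after a move must read from the member, which relies on those members being declared earlier.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical queue whose "last appended" invariant holds from the moment
// it is constructed: there is no separate Init() step to forget.
class QueueSketch {
 public:
  // 'peer_uuid' and 'tablet_id' are sink arguments: taken by value and moved.
  QueueSketch(std::string peer_uuid, std::string tablet_id,
              OpId last_locally_replicated, const OpId& last_locally_committed)
      : peer_uuid_(std::move(peer_uuid)),
        tablet_id_(std::move(tablet_id)),
        // Reads the members, not the moved-from parameters; this is only
        // valid because peer_uuid_ and tablet_id_ are declared above log_prefix_.
        log_prefix_("T " + tablet_id_ + " P " + peer_uuid_ + ": "),
        last_appended_(last_locally_replicated),
        committed_index_(last_locally_committed.index) {
    assert(last_appended_.index >= 0);  // DCHECK stand-in.
  }

  // Always valid: no "constructed but not initialized" state exists.
  OpId GetLastOpIdInLog() const { return last_appended_; }
  int64_t committed_index() const { return committed_index_; }
  const std::string& log_prefix() const { return log_prefix_; }

 private:
  std::string peer_uuid_;
  std::string tablet_id_;
  std::string log_prefix_;
  OpId last_appended_;
  int64_t committed_index_;
};
```

Removing the separate `Init()` also removes the need for checks such as the deleted `DCHECK_NE(kQueueConstructed, queue_state_.state)` in `ResponseFromPeer()`.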

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index dfb3712..976dd1d 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -139,16 +139,14 @@ class PeerMessageQueue {
     int64_t last_seen_term_;
   };
 
-  PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
-                   const scoped_refptr<log::Log>& log,
+  PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
+                   scoped_refptr<log::Log> log,
                    scoped_refptr<TimeManager> time_manager,
-                   const RaftPeerPB& local_peer_pb,
-                   const std::string& tablet_id,
-                   std::unique_ptr<ThreadPoolToken> raft_pool_observers_token);
-
-  // Initialize the queue.
-  void Init(const OpId& last_locally_replicated,
-            const OpId& last_locally_committed);
+                   RaftPeerPB local_peer_pb,
+                   std::string tablet_id,
+                   std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+                   OpId last_locally_replicated,
+                   const OpId& last_locally_committed);
 
   // Changes the queue to leader mode, meaning it tracks majority replicated
   // operations and notifies observers when those change.
@@ -324,7 +322,6 @@ class PeerMessageQueue {
   };
 
   enum State {
-    kQueueConstructed,
     kQueueOpen,
     kQueueClosed
   };

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 1df37d9..7100630 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -637,18 +637,6 @@ Status Log::DoAppend(LogEntryBatch* entry_batch) {
     return Status::OK();
   }
 
-  // We keep track of the last-written OpId here.
-  // This is needed to initialize Consensus on startup.
-  if (entry_batch->type_ == REPLICATE) {
-    // TODO Probably remove the code below as it looks suspicious: TabletReplica uses this
-    // as 'safe' anchor as it believes it in the log, when it actually isn't, i.e. this
-    // is not the last durable operation. Either move this to TabletReplica (since we're
-    // using in flights anyway no need to scan for ids here) or actually delay doing this
-    // until fsync() has been done. See KUDU-527.
-    std::lock_guard<rw_spinlock> write_lock(last_entry_op_id_lock_);
-    last_entry_op_id_.CopyFrom(entry_batch->MaxReplicateOpId());
-  }
-
   // if the size of this entry overflows the current segment, get a new one
   if (allocation_state() == kAllocationNotStarted) {
     if ((active_segment_->Size() + entry_batch_bytes + 4) > max_segment_size_) {
@@ -828,15 +816,6 @@ Status Log::WaitUntilAllFlushed() {
   return s.Wait();
 }
 
-void Log::GetLatestEntryOpId(consensus::OpId* op_id) const {
-  shared_lock<rw_spinlock> l(last_entry_op_id_lock_);
-  if (last_entry_op_id_.IsInitialized()) {
-    DCHECK_NOTNULL(op_id)->CopyFrom(last_entry_op_id_);
-  } else {
-    *op_id = consensus::MinimumOpId();
-  }
-}
-
 Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
   CHECK_GE(retention_indexes.for_durability, 0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 9f4d77c..c71dbbd 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -169,10 +169,6 @@ class Log : public RefCountedThreadSafe<Log> {
     return tablet_id_;
   }
 
-  // Gets the last-used OpId written to the log.
-  // If no entry has ever been written to the log, returns (0, 0)
-  void GetLatestEntryOpId(consensus::OpId* op_id) const;
-
   // Runs the garbage collector on the set of previous segments. Segments that
   // only refer to in-mem state that has been flushed are candidates for
   // garbage collection.
@@ -369,14 +365,6 @@ class Log : public RefCountedThreadSafe<Log> {
   // of the operation in the log.
   scoped_refptr<LogIndex> log_index_;
 
-  // Lock to protect last_entry_op_id_, which is constantly written but
-  // read occasionally by things like consensus and log GC.
-  mutable rw_spinlock last_entry_op_id_lock_;
-
-  // The last known OpId for a REPLICATE message appended to this log
-  // (any segment). NOTE: this op is not necessarily durable.
-  consensus::OpId last_entry_op_id_;
-
   // A footer being prepared for the current segment.
   // When the segment is closed, it will be written.
   LogSegmentFooterPB footer_builder_;
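
The commit message notes that the deleted `Log::GetLatestEntryOpId()` only returned something other than `MinimumOpId()` if an entry had been appended during the object's lifetime. The hypothetical sketch below reproduces that failure mode: the tracked value lives only in memory, so a fresh instance opened over an existing log reports (0, 0) even though entries exist. `LogSketch` and the shared `Segments` vector are illustrative stand-ins for the `Log` class and its on-disk segments.

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Simplified stand-in for the consensus::OpId protobuf.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical durable storage shared across log instances (models the WAL
// segments on disk).
using Segments = std::vector<OpId>;

class LogSketch {
 public:
  explicit LogSketch(std::shared_ptr<Segments> segments)
      : segments_(std::move(segments)) {}

  void Append(const OpId& id) {
    segments_->push_back(id);
    last_entry_op_id_ = id;  // Only updated for appends made by this instance.
    has_last_entry_ = true;
  }

  // Mirrors the removed behavior: falls back to (0, 0) when this instance
  // has not appended anything, regardless of what is already in storage.
  OpId GetLatestEntryOpId() const {
    return has_last_entry_ ? last_entry_op_id_ : OpId{0, 0};
  }

 private:
  std::shared_ptr<Segments> segments_;
  OpId last_entry_op_id_{0, 0};
  bool has_last_entry_ = false;
};
```

Asking the consensus queue instead, which is seeded from the bootstrap info (`info.last_id`) when Raft starts, avoids conflating "nothing appended by this object" with "nothing in the log".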

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/mt-log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index b5502e6..6f4ea43 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -184,11 +184,9 @@ class MultiThreadedLogTest : public LogTestBase {
     for (int i = 0; i < FLAGS_num_reader_threads; i++) {
       reader_threads.emplace_back([&]() {
           std::map<int64_t, int64_t> map;
-          OpId opid;
           while (!stop_reader) {
-            log_->GetLatestEntryOpId(&opid);
             log_->GetReplaySizeMap(&map);
-            IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread)));
+            log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread));
           }
         });
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2c900a5..c082b89 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -149,6 +149,7 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tserver::TabletServerErrorPB;
 using std::string;
+using std::unique_ptr;
 using strings::Substitute;
 
 namespace  {
@@ -220,17 +221,24 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             ReplicaTransactionFactory* txn_factory,
                             scoped_refptr<MetricEntity> metric_entity,
                             Callback<void(const std::string& reason)> mark_dirty_clbk) {
+  DCHECK(metric_entity);
+
   peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
   log_ = DCHECK_NOTNULL(std::move(log));
   time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
   txn_factory_ = DCHECK_NOTNULL(txn_factory);
   mark_dirty_clbk_ = std::move(mark_dirty_clbk);
-  DCHECK(metric_entity);
 
   term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, cmeta_->current_term());
   follower_memory_pressure_rejections_ =
       metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);
 
+  // A single Raft thread pool token is shared between RaftConsensus and
+  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
+  // raw pointer to the token, to emphasize that RaftConsensus is responsible
+  // for destroying the token.
+  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+
   // The message queue that keeps track of which operations need to be replicated
   // where.
   //
@@ -240,30 +248,27 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
   //
   // TODO(adar): the token is SERIAL to match the previous single-thread
   // observer pool behavior, but CONCURRENT may be safe here.
-  queue_.reset(new PeerMessageQueue(std::move(metric_entity),
-                                    log_,
-                                    time_manager_,
-                                    local_peer_pb_,
-                                    options_.tablet_id,
-                                    raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
-
-  // A single Raft thread pool token is shared between RaftConsensus and
-  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
-  // raw pointer to the token, to emphasize that RaftConsensus is responsible
-  // for destroying the token.
-  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+  unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
+      std::move(metric_entity),
+      log_,
+      time_manager_,
+      local_peer_pb_,
+      options_.tablet_id,
+      raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+      info.last_id,
+      info.last_committed_id));
 
   // A manager for the set of peers that actually send the operations both remotely
   // and to the local wal.
-  peer_manager_.reset(new PeerManager(options_.tablet_id,
-                                      peer_uuid(),
-                                      peer_proxy_factory_.get(),
-                                      queue_.get(),
-                                      raft_pool_token_.get(),
-                                      log_));
-
-  pending_.reset(new PendingRounds(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid()),
-                                   time_manager_));
+  unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
+                                                       peer_uuid(),
+                                                       peer_proxy_factory_.get(),
+                                                       queue.get(),
+                                                       raft_pool_token_.get(),
+                                                       log_));
+
+  unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(), time_manager_));
+
   failure_monitor_.reset(new RandomizedFailureMonitor(GetRandomSeed32(),
                                                       GetFailureMonitorCheckMeanMs(),
                                                       GetFailureMonitorCheckStddevMs()));
@@ -281,7 +286,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
   // That happens separately via the helper functions
   // EnsureFailureDetector(Enabled/Disabled)Unlocked();
   RETURN_NOT_OK(failure_monitor_->MonitorFailureDetector(options_.tablet_id,
-                                                        failure_detector_));
+                                                         failure_detector_));
 
   {
     ThreadRestrictions::AssertWaitAllowed();
@@ -289,7 +294,12 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
                                    << State_Name(state_);
 
+    queue_ = std::move(queue);
+    peer_manager_ = std::move(peer_manager);
+    pending_ = std::move(pending);
+
     ClearLeaderUnlocked();
+    RETURN_NOT_OK(EnsureFailureDetectorEnabled());
 
     // Our last persisted term can be higher than the last persisted operation
     // (i.e. if we called an election) but reverse should never happen.
@@ -302,8 +312,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
           GetCurrentTermUnlocked()));
     }
 
-    state_ = kRunning;
-
+    // Append any uncommitted replicate messages found during log replay to the queue.
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                    << info.orphaned_replicates.size()
                                    << " pending transactions. Active config: "
@@ -313,18 +322,10 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
     }
 
+    // Set the initial committed opid for the PendingRounds only after
+    // appending any uncommitted replicate messages to the queue.
     pending_->SetInitialCommittedOpId(info.last_committed_id);
 
-    queue_->Init(info.last_id, info.last_committed_id);
-  }
-
-  {
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
-    RETURN_NOT_OK(CheckRunningUnlocked());
-
-    RETURN_NOT_OK(EnsureFailureDetectorEnabled());
-
     // If this is the first term expire the FD immediately so that we have a fast first
     // election, otherwise we just let the timer expire normally.
     if (GetCurrentTermUnlocked() == 0) {
@@ -344,6 +345,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 
     // Now assume "follower" duties.
     RETURN_NOT_OK(BecomeReplicaUnlocked());
+
+    state_ = kRunning;
   }
 
   if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
@@ -650,6 +653,7 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
 
 Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
   DCHECK(lock_.is_locked());
+  DCHECK(pending_);
 
   // If we are adding a pending config change, we need to propagate it to the
   // metadata.
@@ -1509,7 +1513,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 
   // Candidate must have last-logged OpId at least as large as our own to get
   // our vote.
-  OpId local_last_logged_opid = GetLatestOpIdFromLog();
+  OpId local_last_logged_opid = queue_->GetLastOpIdInLog();
   bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
                                 local_last_logged_opid);
 
@@ -1826,12 +1830,6 @@ void RaftConsensus::Shutdown() {
   shutdown_.Store(true, kMemOrderRelease);
 }
 
-OpId RaftConsensus::GetLatestOpIdFromLog() {
-  OpId id;
-  log_->GetLatestEntryOpId(&id);
-  return id;
-}
-
 Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
   DCHECK(lock_.is_locked());
   OperationType op_type = msg->get()->op_type();
@@ -2241,18 +2239,27 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
   }
 }
 
-Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
+boost::optional<OpId> RaftConsensus::GetLastOpId(OpIdType type) {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  if (type == RECEIVED_OPID) {
-    *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
-  } else if (type == COMMITTED_OPID) {
-    id->set_term(pending_->GetTermWithLastCommittedOp());
-    id->set_index(pending_->GetCommittedIndex());
-  } else {
-    return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
+  return GetLastOpIdUnlocked(type);
+}
+
+boost::optional<OpId> RaftConsensus::GetLastOpIdUnlocked(OpIdType type) {
+  // Return early if this method is called on an instance of RaftConsensus that
+  // has not yet been started, failed during Init(), or failed during Start().
+  if (!queue_ || !pending_) return boost::none;
+
+  switch (type) {
+    case RECEIVED_OPID:
+      return queue_->GetLastOpIdInLog();
+    case COMMITTED_OPID:
+      return MakeOpId(pending_->GetTermWithLastCommittedOp(),
+                      pending_->GetCommittedIndex());
+    default:
+      LOG(DFATAL) << LogPrefixUnlocked() << "Invalid OpIdType " << type;
+      return boost::none;
   }
-  return Status::OK();
 }
 
 log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {

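The raft_consensus.cc hunks above replace a Status-plus-out-parameter accessor with one that returns an optional. That accessor reports "unknown" whenever the queue or pending rounds have not been constructed. The sketch below is a simplified, hypothetical illustration of that contract, not Kudu code. It assumes C++17 std::optional in place of boost::optional. The names OpId, QueueSketch and ConsensusSketch are stand-ins invented for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <optional>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

enum OpIdType { RECEIVED_OPID, COMMITTED_OPID };

// Hypothetical stand-in for PeerMessageQueue. Because the last-logged opid is
// supplied at construction, any existing queue can always report it.
class QueueSketch {
 public:
  explicit QueueSketch(OpId last_in_log) : last_in_log_(last_in_log) {}
  OpId GetLastOpIdInLog() const { return last_in_log_; }

 private:
  OpId last_in_log_;
};

// Hypothetical stand-in for RaftConsensus.
class ConsensusSketch {
 public:
  // The queue and committed opid only exist after a successful Start().
  void Start(OpId last_logged, OpId last_committed) {
    assert(!queue_ && "Start() called twice");
    queue_ = std::make_unique<QueueSketch>(last_logged);
    committed_ = last_committed;
  }

  // Reports "unknown" as std::nullopt instead of a sentinel value.
  std::optional<OpId> GetLastOpId(OpIdType type) const {
    if (!queue_ || !committed_) return std::nullopt;
    switch (type) {
      case RECEIVED_OPID:
        return queue_->GetLastOpIdInLog();
      case COMMITTED_OPID:
        return committed_;
    }
    return std::nullopt;  // Unreachable for valid OpIdType values.
  }

 private:
  std::unique_ptr<QueueSketch> queue_;
  std::optional<OpId> committed_;
};
```

With this shape, the caller must handle the unknown case explicitly. A sentinel such as MinimumOpId() could be silently compared as if it were a real opid.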
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 4ecb25e..5d761e0 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -262,8 +262,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Returns the last OpId (either received or committed, depending on the
   // 'type' argument) that the Consensus implementation knows about.
-  // Primarily used for testing purposes.
-  Status GetLastOpId(OpIdType type, OpId* id);
+  // Returns boost::none if RaftConsensus was not properly initialized.
+  boost::optional<OpId> GetLastOpId(OpIdType type);
 
   // Returns the current Raft role of this instance.
   RaftPeerPB::Role role() const;
@@ -454,9 +454,6 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // and also truncate the LogCache accordingly.
   void TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index);
 
-  // Returns the most recent OpId written to the Log.
-  OpId GetLatestOpIdFromLog();
-
   // Begin a replica transaction. If the type of message in 'msg' is not a type
   // that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
   Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
@@ -675,6 +672,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   const ConsensusOptions& GetOptions() const;
 
+  // See GetLastOpId().
+  boost::optional<OpId> GetLastOpIdUnlocked(OpIdType type);
+
   std::string LogPrefix() const;
   std::string LogPrefixUnlocked() const;
 
@@ -724,10 +724,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // this factory to start it.
   ReplicaTransactionFactory* txn_factory_;
 
-  gscoped_ptr<PeerManager> peer_manager_;
+  std::unique_ptr<PeerManager> peer_manager_;
 
   // The queue of messages that must be sent to peers.
-  gscoped_ptr<PeerMessageQueue> queue_;
+  std::unique_ptr<PeerMessageQueue> queue_;
 
   // The currently pending rounds that have not yet been committed by
   // consensus. Protected by 'lock_'.

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 0790993..5439cd8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -339,11 +339,14 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     int backoff_exp = 0;
     const int kMaxBackoffExp = 8;
-    OpId committed;
+    OpId committed = MinimumOpId();
     while (true) {
-      if (peer->GetLastOpId(COMMITTED_OPID, &committed).ok() &&
-          committed.index() >= to_wait_for) {
-        return;
+      boost::optional<OpId> opt_committed = peer->GetLastOpId(COMMITTED_OPID);
+      if (opt_committed) {
+        committed = *opt_committed;
+        if (committed.index() >= to_wait_for) {
+          return;
+        }
       }
       if (MonoTime::Now() > (start + timeout)) {
         break;

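The quorum test hunk above now treats an unknown committed opid as "not there yet" while it polls with exponential backoff. The rest of the loop is cut off in this message. The sketch below is a hypothetical, self-contained version of such a polling helper. It assumes the backoff is measured in milliseconds (1, 2, 4, ... ms, capped at 2^8 ms). It also assumes that std::function and std::optional stand in for the real peer and boost::optional types.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <thread>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

// Polls 'get_committed' until it reports an index >= 'to_wait_for' or the
// timeout elapses. A std::nullopt result (consensus not started yet) is
// treated as "keep waiting" rather than as index 0.
bool WaitForCommitIndex(const std::function<std::optional<OpId>()>& get_committed,
                        int64_t to_wait_for,
                        std::chrono::milliseconds timeout) {
  assert(timeout.count() >= 0);
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  int backoff_exp = 0;
  const int kMaxBackoffExp = 8;
  while (true) {
    std::optional<OpId> committed = get_committed();
    if (committed && committed->index >= to_wait_for) return true;
    if (std::chrono::steady_clock::now() > deadline) return false;
    // Assumed unit: milliseconds, doubling up to 2^kMaxBackoffExp ms.
    std::this_thread::sleep_for(std::chrono::milliseconds(1 << backoff_exp));
    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
  }
}
```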
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_metadata.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 93502e8..4d9a197 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -183,6 +183,7 @@ vector<BlockId> TabletMetadata::CollectBlockIds() {
 
 Status TabletMetadata::DeleteTabletData(TabletDataState delete_type,
                                         const boost::optional<OpId>& last_logged_opid) {
+  DCHECK(!last_logged_opid || last_logged_opid->IsInitialized());
   CHECK(delete_type == TABLET_DATA_DELETED ||
         delete_type == TABLET_DATA_TOMBSTONED ||
         delete_type == TABLET_DATA_COPYING)
@@ -202,9 +203,7 @@ Status TabletMetadata::DeleteTabletData(TabletDataState delete_type,
     }
     rowsets_.clear();
     tablet_data_state_ = delete_type;
-    if (last_logged_opid) {
-      tombstone_last_logged_opid_ = *last_logged_opid;
-    }
+    tombstone_last_logged_opid_ = last_logged_opid;
   }
 
   // Keep a copy of the old data dir group in case of flush failure.
@@ -278,7 +277,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
       table_name_(std::move(table_name)),
       partition_schema_(std::move(partition_schema)),
       tablet_data_state_(tablet_data_state),
-      tombstone_last_logged_opid_(MinimumOpId()),
       num_flush_pins_(0),
       needs_flush_(false),
       pre_flush_callback_(Bind(DoNothingStatusClosure)) {
@@ -297,7 +295,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
       fs_manager_(fs_manager),
       next_rowset_idx_(0),
       schema_(nullptr),
-      tombstone_last_logged_opid_(MinimumOpId()),
       num_flush_pins_(0),
       needs_flush_(false),
       pre_flush_callback_(Bind(DoNothingStatusClosure)) {}
@@ -403,10 +400,15 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
           fs::DataDirManager::DirDistributionMode::ACROSS_ALL_DIRS));
     }
 
-    if (superblock.has_tombstone_last_logged_opid()) {
+    // Note: Previous versions of Kudu used MinimumOpId() as a "null" value on
+    // disk for the last-logged opid, so we special-case it at load time and
+    // consider it equal to "not present".
+    if (superblock.has_tombstone_last_logged_opid() &&
+        superblock.tombstone_last_logged_opid().IsInitialized() &&
+        !OpIdEquals(MinimumOpId(), superblock.tombstone_last_logged_opid())) {
       tombstone_last_logged_opid_ = superblock.tombstone_last_logged_opid();
     } else {
-      tombstone_last_logged_opid_ = MinimumOpId();
+      tombstone_last_logged_opid_ = boost::none;
     }
   }
 
@@ -587,6 +589,11 @@ Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) {
   return Status::OK();
 }
 
+boost::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() const {
+  std::lock_guard<LockType> l(data_lock_);
+  return tombstone_last_logged_opid_;
+}
+
 Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const {
   string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
   RETURN_NOT_OK_PREPEND(
@@ -623,8 +630,9 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
                         "Couldn't serialize schema into superblock");
 
   pb.set_tablet_data_state(tablet_data_state_);
-  if (!OpIdEquals(tombstone_last_logged_opid_, MinimumOpId())) {
-    *pb.mutable_tombstone_last_logged_opid() = tombstone_last_logged_opid_;
+  if (tombstone_last_logged_opid_ &&
+      !OpIdEquals(MinimumOpId(), *tombstone_last_logged_opid_)) {
+    *pb.mutable_tombstone_last_logged_opid() = *tombstone_last_logged_opid_;
   }
 
   for (const BlockId& block_id : orphaned_blocks_) {

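The tablet_metadata.cc hunks above handle an on-disk compatibility detail. Older versions wrote MinimumOpId() to mean "no last-logged opid", so the patch maps that value to "not present" when loading. It also refuses to write it when saving. The sketch below is a hypothetical illustration of that round trip. It assumes MinimumOpId() is {term 0, index 0} and models the optional protobuf field with std::optional. It omits the IsInitialized() check that the real LoadFromSuperBlock() also performs.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

// Assumed to match MinimumOpId(): term 0, index 0.
constexpr OpId kMinimumOpId{0, 0};

bool OpIdEquals(const OpId& a, const OpId& b) {
  return a.term == b.term && a.index == b.index;
}

// Hypothetical stand-in for the optional superblock protobuf field.
struct SuperBlockSketch {
  std::optional<OpId> tombstone_last_logged_opid;
};

// Load: an absent field and the legacy MinimumOpId() value both mean "unknown".
std::optional<OpId> LoadTombstoneOpId(const SuperBlockSketch& sb) {
  if (sb.tombstone_last_logged_opid &&
      !OpIdEquals(kMinimumOpId, *sb.tombstone_last_logged_opid)) {
    return sb.tombstone_last_logged_opid;
  }
  return std::nullopt;
}

// Store: write the field only when the opid is known and not the sentinel.
void StoreTombstoneOpId(const std::optional<OpId>& opid, SuperBlockSketch* sb) {
  assert(sb != nullptr);
  sb->tombstone_last_logged_opid.reset();
  if (opid && !OpIdEquals(kMinimumOpId, *opid)) {
    sb->tombstone_last_logged_opid = *opid;
  }
}
```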
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index afac374..ca524b7 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -224,9 +224,10 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
 
   void SetLastDurableMrsIdForTests(int64_t mrs_id) { last_durable_mrs_id_ = mrs_id; }
 
-  void SetPreFlushCallback(StatusClosure callback) { pre_flush_callback_ = callback; }
+  void SetPreFlushCallback(StatusClosure callback) { pre_flush_callback_ = std::move(callback); }
 
-  consensus::OpId tombstone_last_logged_opid() const { return tombstone_last_logged_opid_; }
+  // Return the last-logged opid of a tombstoned tablet, if known.
+  boost::optional<consensus::OpId> tombstone_last_logged_opid() const;
 
   // Loads the currently-flushed superblock from disk into the given protobuf.
   Status ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const;
@@ -353,7 +354,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
 
   // Record of the last opid logged by the tablet before it was last
   // tombstoned. Has no meaning for non-tombstoned tablets.
-  consensus::OpId tombstone_last_logged_opid_;
+  boost::optional<consensus::OpId> tombstone_last_logged_opid_;
 
   // If this counter is > 0 then Flush() will not write any data to
   // disk.

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 7ebb64e..1f7de80 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -85,7 +85,9 @@ using consensus::ConsensusBootstrapInfo;
 using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::OpId;
+using consensus::RECEIVED_OPID;
 using consensus::RaftConfigPB;
+using consensus::RaftConsensus;
 using consensus::RaftPeerPB;
 using log::Log;
 using log::LogOptions;
@@ -276,11 +278,11 @@ class TabletReplicaTest : public KuduTabletTest {
   // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
   void AssertLogAnchorEarlierThanLogLatest() {
     log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
-    OpId last_log_opid;
-    tablet_replica_->log_->GetLatestEntryOpId(&last_log_opid);
-    CHECK_LT(retention.for_durability, last_log_opid.index())
+    boost::optional<OpId> last_log_opid = tablet_replica_->consensus()->GetLastOpId(RECEIVED_OPID);
+    ASSERT_NE(boost::none, last_log_opid);
+    ASSERT_LT(retention.for_durability, last_log_opid->index())
       << "Expected valid log anchor, got earliest opid: " << retention.for_durability
-      << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
+      << " (expected any value earlier than last log id: " << SecureShortDebugString(*last_log_opid)
       << ")";
   }
 
@@ -343,7 +345,7 @@ TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
   ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
   ASSERT_EQ(4, segments.size());
 
-  AssertLogAnchorEarlierThanLogLatest();
+  NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
   ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
 
   // Ensure nothing gets deleted.
@@ -371,6 +373,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
   ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
 
   Log* log = tablet_replica_->log_.get();
+  shared_ptr<RaftConsensus> consensus = tablet_replica_->shared_consensus();
   int32_t num_gced;
 
   AssertNoLogAnchors();
@@ -394,10 +397,9 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
   ASSERT_EQ(2, segments.size());
   AssertNoLogAnchors();
 
-  OpId id;
-  log->GetLatestEntryOpId(&id);
-  LOG(INFO) << "Before: " << SecureShortDebugString(id);
-
+  boost::optional<OpId> id = consensus->GetLastOpId(consensus::RECEIVED_OPID);
+  ASSERT_NE(boost::none, id);
+  LOG(INFO) << "Before: " << *id;
 
   // We currently have no anchors and the last operation in the log is 0.3
   // Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking
@@ -409,7 +411,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
 
   // Execute a mutation.
   ASSERT_OK(ExecuteDeletesAndRollLogs(2));
-  AssertLogAnchorEarlierThanLogLatest();
+  NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
   ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
   ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
   ASSERT_EQ(4, segments.size());
@@ -529,7 +531,7 @@ TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
   ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
   ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
 
-  AssertLogAnchorEarlierThanLogLatest();
+  NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
 
   // Try to GC(), nothing should be deleted due to the in-flight transaction.
   retention = tablet_replica_->GetRetentionIndexes();

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 661a5ce..0051f91 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -123,6 +123,7 @@ using cfile::StringDataGenerator;
 using cfile::WriterOptions;
 using client::sp::shared_ptr;
 using consensus::OpId;
+using consensus::RECEIVED_OPID;
 using consensus::ReplicateRefPtr;
 using consensus::ReplicateMsg;
 using fs::FsReport;
@@ -1552,15 +1553,14 @@ TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) {
   // so that we can compare the size of the data on disk before and
   // after the deletion of local_replica to verify that the size-on-disk
   // is reduced after the tool operation.
-  OpId last_logged_opid;
-  last_logged_opid.Clear();
+  boost::optional<OpId> last_logged_opid;
   string tablet_id;
   {
     vector<scoped_refptr<TabletReplica>> tablet_replicas;
     ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas);
     ASSERT_EQ(1, tablet_replicas.size());
     tablet_id = tablet_replicas[0]->tablet_id();
-    tablet_replicas[0]->log()->GetLatestEntryOpId(&last_logged_opid);
+    last_logged_opid = tablet_replicas[0]->shared_consensus()->GetLastOpId(RECEIVED_OPID);
     Tablet* tablet = tablet_replicas[0]->tablet();
     ASSERT_OK(tablet->Flush());
   }
@@ -1594,10 +1594,12 @@ TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) {
     ASSERT_EQ(tablet_id, tablet_replicas[0]->tablet_id());
     ASSERT_EQ(TabletDataState::TABLET_DATA_TOMBSTONED,
               tablet_replicas[0]->tablet_metadata()->tablet_data_state());
-    OpId tombstoned_opid = tablet_replicas[0]->tablet_metadata()->tombstone_last_logged_opid();
-    ASSERT_TRUE(tombstoned_opid.IsInitialized());
-    ASSERT_EQ(last_logged_opid.term(), tombstoned_opid.term());
-    ASSERT_EQ(last_logged_opid.index(), tombstoned_opid.index());
+    boost::optional<OpId> tombstoned_opid =
+        tablet_replicas[0]->tablet_metadata()->tombstone_last_logged_opid();
+    ASSERT_NE(boost::none, tombstoned_opid);
+    ASSERT_NE(boost::none, last_logged_opid);
+    ASSERT_EQ(last_logged_opid->term(), tombstoned_opid->term());
+    ASSERT_EQ(last_logged_opid->index(), tombstoned_opid->index());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 672840e..a75750c 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -89,6 +89,7 @@ namespace tserver {
 
 using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
+using consensus::OpId;
 using env_util::CopyFile;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
@@ -141,17 +142,28 @@ Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>&
                                            data_state));
   }
 
-  replace_tombstoned_tablet_ = true;
-  meta_ = meta;
-
-  int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
-  if (last_logged_term > caller_term) {
+  boost::optional<OpId> last_logged_opid = meta->tombstone_last_logged_opid();
+  if (!last_logged_opid) {
+    // There are certain cases where we can end up with a tombstoned replica
+    // that does not store its last-logged opid. One such case is when there is
+    // WAL corruption at startup time, resulting in a replica being evicted and
+    // deleted. In such a case, it is not possible to determine the last-logged
+    // opid. Another such case (at the time of writing) is initialization
+    // failure due to any number of problems, resulting in the replica going
+    // into an error state. If the replica is tombstoned while in an error
+    // state, the last-logged opid will not be stored. See KUDU-2106.
+    LOG_WITH_PREFIX(INFO) << "overwriting existing tombstoned replica "
+                             "with an unknown last-logged opid";
+  } else if (last_logged_opid->term() > caller_term) {
     return Status::InvalidArgument(
         Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
                    "for tablet $1 has higher term $2. Refusing tablet copy from leader",
-                   caller_term, tablet_id_, last_logged_term));
+                   caller_term, tablet_id_, last_logged_opid->term()));
   }
 
+  replace_tombstoned_tablet_ = true;
+  meta_ = meta;
+
   // Load the old consensus metadata, if it exists.
   scoped_refptr<ConsensusMetadata> cmeta;
   Status s = cmeta_manager_->Load(tablet_id_, &cmeta);
@@ -243,15 +255,16 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
     // misconfiguration causes us to attempt to copy from an out-of-date
     // source peer, even after passing the term check from the caller in
     // SetTabletToReplace().
-    int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
-    if (last_logged_term > remote_cstate_->current_term()) {
+
+    boost::optional<OpId> last_logged_opid = meta_->tombstone_last_logged_opid();
+    if (last_logged_opid && last_logged_opid->term() > remote_cstate_->current_term()) {
       return Status::InvalidArgument(
           Substitute("Tablet $0: source peer has term $1 but "
                      "tombstoned replica has last-logged opid with higher term $2. "
                      "Refusing tablet copy from source peer $3",
                      tablet_id_,
                      remote_cstate_->current_term(),
-                     last_logged_term,
+                     last_logged_opid->term(),
                      copy_peer_uuid));
     }
 
@@ -644,7 +657,7 @@ Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
 }
 
 string TabletCopyClient::LogPrefix() {
-  return Substitute("T $0 P $1: Tablet Copy client: ",
+  return Substitute("T $0 P $1: tablet copy: ",
                     tablet_id_, fs_manager_->uuid());
 }
 

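In the tablet_copy_client.cc hunks above, the term check against the leader now runs only when the tombstoned replica's last-logged opid is known. When it is unknown (see KUDU-2106 in the comment), the copy is allowed and a log message is emitted. The following is a minimal, hypothetical sketch of that decision. It uses a made-up CheckResult type in place of kudu::Status and std::optional in place of boost::optional.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical result type standing in for kudu::Status.
struct CheckResult {
  bool ok;
  std::string message;
};

// Decides whether a tombstoned replica may be overwritten by a copy from a
// leader at 'leader_term'. An unknown last-logged opid cannot be compared,
// so the overwrite is allowed, mirroring SetTabletToReplace() in the patch.
CheckResult CheckTombstoneReplaceable(const std::optional<OpId>& last_logged,
                                      int64_t leader_term) {
  assert(leader_term >= 0);
  if (!last_logged) {
    return {true, "unknown last-logged opid; allowing overwrite"};
  }
  if (last_logged->term > leader_term) {
    return {false, "tombstoned replica has a higher term than the leader"};
  }
  return {true, ""};
}
```

Accepting the unknown case trades some safety for liveness. Without it, a replica tombstoned while in a failed state could never be repaired by tablet copy.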
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 8c57cf0..bf6a313 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -114,8 +114,10 @@ Status TabletCopySourceSession::Init() {
   }
 
   // Get the latest opid in the log at this point in time so we can re-anchor.
-  OpId last_logged_opid;
-  CHECK_NOTNULL(tablet_replica_->log())->GetLatestEntryOpId(&last_logged_opid);
+  // TODO(mpercy): Do we need special handling for boost::none case?
+  boost::optional<OpId> last_logged_opid =
+      tablet_replica_->consensus()->GetLastOpId(consensus::RECEIVED_OPID);
+  if (!last_logged_opid) last_logged_opid = MinimumOpId();
 
   // Get the current segments from the log, including the active segment.
   // The Log doesn't add the active segment to the log reader's list until
@@ -153,7 +155,7 @@ Status TabletCopySourceSession::Init() {
   // leader's log when tablet copy is slow. The remote controls when
   // this anchor is released by ending the tablet copy session.
   RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->UpdateRegistration(
-      last_logged_opid.index(), anchor_owner_token, &log_anchor_));
+      last_logged_opid->index(), anchor_owner_token, &log_anchor_));
 
   LOG(INFO) << Substitute(
       "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",

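The tablet_copy_source_session.cc hunk above keeps the previous behavior for an unknown opid. It substitutes MinimumOpId() before registering the log anchor, and the TODO notes that this may deserve special handling. Assuming log GC never removes entries at or above the lowest registered anchor, anchoring at the minimum index is the conservative choice because it retains the whole log. The helper below is a hypothetical sketch of that fallback using std::optional::value_or. It also assumes MinimumOpId() is {0, 0}.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

// Assumed to match MinimumOpId(): term 0, index 0.
constexpr OpId kMinimumOpId{0, 0};

// Picks the log index at which a copy session would anchor the log.
// An unknown last-logged opid falls back to the minimum opid, placing the
// anchor at the lowest possible index.
int64_t ChooseAnchorIndex(const std::optional<OpId>& last_logged) {
  OpId anchor = last_logged.value_or(kMinimumOpId);
  assert(anchor.index >= kMinimumOpId.index);
  return anchor.index;
}
```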
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index f4cb01d..9f0bfe2 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -143,6 +143,9 @@ using kudu::consensus::GetNodeInstanceRequestPB;
 using kudu::consensus::GetNodeInstanceResponsePB;
 using kudu::consensus::LeaderStepDownRequestPB;
 using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::OpId;
+using kudu::consensus::UnsafeChangeConfigRequestPB;
+using kudu::consensus::UnsafeChangeConfigResponsePB;
 using kudu::consensus::RaftConsensus;
 using kudu::consensus::RunLeaderElectionRequestPB;
 using kudu::consensus::RunLeaderElectionResponsePB;
@@ -1070,13 +1073,15 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
                        resp, context);
     return;
   }
-  Status s = consensus->GetLastOpId(req->opid_type(), resp->mutable_opid());
-  if (PREDICT_FALSE(!s.ok())) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         TabletServerErrorPB::UNKNOWN_ERROR,
+  boost::optional<OpId> opid = consensus->GetLastOpId(req->opid_type());
+  if (!opid) {
+    SetupErrorAndRespond(resp->mutable_error(),
+                         Status::IllegalState("Cannot fetch last OpId in WAL"),
+                         TabletServerErrorPB::TABLET_NOT_RUNNING,
                          context);
     return;
   }
+  *resp->mutable_opid() = *opid;
   context->RespondSuccess();
 }
 

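In the tablet_service.cc hunk above, the GetLastOpId RPC no longer fills in whatever opid the consensus object happened to hold. If the optional is empty, the RPC responds with a TABLET_NOT_RUNNING error. Otherwise it copies the opid into the response. The sketch below is a hypothetical model of that mapping. The LastOpIdResponse struct and ErrorCode enum are invented stand-ins for GetLastOpIdResponsePB and TabletServerErrorPB.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical, simplified stand-in for consensus::OpId.
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical error codes loosely mirroring TabletServerErrorPB.
enum class ErrorCode { NONE, TABLET_NOT_RUNNING };

// Hypothetical stand-in for GetLastOpIdResponsePB.
struct LastOpIdResponse {
  ErrorCode error = ErrorCode::NONE;
  std::string error_message;
  std::optional<OpId> opid;
};

LastOpIdResponse BuildLastOpIdResponse(const std::optional<OpId>& opid) {
  LastOpIdResponse resp;
  if (!opid) {
    // An unknown opid is reported as an error, never as a zero-valued opid.
    resp.error = ErrorCode::TABLET_NOT_RUNNING;
    resp.error_message = "Cannot fetch last OpId in WAL";
    return resp;
  }
  resp.opid = *opid;
  assert(resp.error == ErrorCode::NONE);
  return resp;
}
```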
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 7972380..47b8102 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -126,7 +126,10 @@ class Tablet;
 using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::OpId;
+using consensus::OpIdToString;
+using consensus::RECEIVED_OPID;
 using consensus::RaftConfigPB;
+using consensus::RaftConsensus;
 using consensus::StartTabletCopyRequestPB;
 using consensus::kMinimumTerm;
 using fs::DataDirManager;
@@ -470,23 +473,29 @@ void TSTabletManager::RunTabletCopy(
         LOG(FATAL) << LogPrefix(tablet_id) << "Tablet Copy: "
                    << "Found tablet in TABLET_DATA_COPYING state during StartTabletCopy()";
       case TABLET_DATA_TOMBSTONED: {
-        int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
-        CALLBACK_RETURN_NOT_OK_WITH_ERROR(
-            CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term),
-            TabletServerErrorPB::INVALID_CONFIG);
+        boost::optional<OpId> last_logged_opid = meta->tombstone_last_logged_opid();
+        if (last_logged_opid) {
+          CALLBACK_RETURN_NOT_OK_WITH_ERROR(CheckLeaderTermNotLower(tablet_id, leader_term,
+                                                                    last_logged_opid->term()),
+                                            TabletServerErrorPB::INVALID_CONFIG);
+        }
         break;
       }
       case TABLET_DATA_READY: {
-        Log* log = old_replica->log();
-        if (!log) {
+        shared_ptr<RaftConsensus> consensus = old_replica->shared_consensus();
+        if (!consensus) {
           CALLBACK_AND_RETURN(
-              Status::IllegalState("Log unavailable. Tablet is not running", tablet_id));
+              Status::IllegalState("consensus unavailable: tablet not running", tablet_id));
         }
-        OpId last_logged_opid;
-        log->GetLatestEntryOpId(&last_logged_opid);
-        int64_t last_logged_term = last_logged_opid.term();
+        boost::optional<OpId> opt_last_logged_opid = consensus->GetLastOpId(RECEIVED_OPID);
+        if (!opt_last_logged_opid) {
+          CALLBACK_AND_RETURN(
+              Status::IllegalState("cannot determine last-logged opid: tablet not running",
+                                   tablet_id));
+        }
+        CHECK(opt_last_logged_opid);
         CALLBACK_RETURN_NOT_OK_WITH_ERROR(
-            CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term),
+            CheckLeaderTermNotLower(tablet_id, leader_term, opt_last_logged_opid->term()),
             TabletServerErrorPB::INVALID_CONFIG);
 
         // Tombstone the tablet and store the last-logged OpId.
@@ -503,7 +512,8 @@ void TSTabletManager::RunTabletCopy(
         // will simply tablet copy this replica again. We could try to
         // check again after calling Shutdown(), and if the check fails, try to
         // reopen the tablet. For now, we live with the (unlikely) race.
-        Status s = DeleteTabletData(meta, cmeta_manager_, TABLET_DATA_TOMBSTONED, last_logged_opid);
+        Status s = DeleteTabletData(meta, cmeta_manager_, TABLET_DATA_TOMBSTONED,
+                                    opt_last_logged_opid);
         if (PREDICT_FALSE(!s.ok())) {
           CALLBACK_AND_RETURN(
               s.CloneAndPrepend(Substitute("Unable to delete on-disk data from tablet $0",
@@ -636,8 +646,8 @@ Status TSTabletManager::DeleteTablet(
   // it's tricky to fix. We could try checking again after the shutdown and
   // restarting the tablet if the local replica committed a higher config
   // change op during that time, or potentially something else more invasive.
+  shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
   if (cas_config_opid_index_less_or_equal && !tablet_deleted) {
-    shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
     if (!consensus) {
       *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
       return Status::IllegalState("Raft Consensus not available. Tablet shutting down");
@@ -655,10 +665,9 @@ Status TSTabletManager::DeleteTablet(
   replica->Shutdown();
 
   boost::optional<OpId> opt_last_logged_opid;
-  if (replica->log()) {
-    OpId last_logged_opid;
-    replica->log()->GetLatestEntryOpId(&last_logged_opid);
-    opt_last_logged_opid = last_logged_opid;
+  if (consensus) {
+    opt_last_logged_opid = consensus->GetLastOpId(RECEIVED_OPID);
+    DCHECK(!opt_last_logged_opid || opt_last_logged_opid->IsInitialized());
   }
 
   Status s = DeleteTabletData(replica->tablet_metadata(), cmeta_manager_, delete_type,
@@ -1063,8 +1072,8 @@ Status TSTabletManager::DeleteTabletData(
   // that was previously in the metadata.
   RETURN_NOT_OK(meta->DeleteTabletData(delete_type, last_logged_opid));
   LOG(INFO) << LogPrefix(tablet_id, meta->fs_manager())
-            << "Tablet deleted. Last logged OpId: "
-            << meta->tombstone_last_logged_opid();
+            << "tablet deleted: last-logged OpId: "
+            << (last_logged_opid ? OpIdToString(*last_logged_opid) : "(unknown)");
   MAYBE_FAULT(FLAGS_fault_crash_after_blocks_deleted);
 
   RETURN_NOT_OK(Log::DeleteOnDiskData(meta->fs_manager(), meta->tablet_id()));

