kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: tablet copy: Refactor to avoid separate expiration map
Date Tue, 02 May 2017 20:54:19 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 50b84ae6b -> b2e664a37


tablet copy: Refactor to avoid separate expiration map

This simplifies the implementation a bit by removing extra data
structures and making it easier to understand.

Also add an assert on the session lock being taken and take the lock at
shutdown time.

Change-Id: Idcf4e57722e4e34e99064713a6e8d246d3ebf920
Reviewed-on: http://gerrit.cloudera.org:8080/6731
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b2e664a3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b2e664a3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b2e664a3

Branch: refs/heads/master
Commit: b2e664a3746279ca2ee08818bf613d8af758d3a9
Parents: 50b84ae
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Apr 24 10:47:04 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue May 2 20:54:07 2017 +0000

----------------------------------------------------------------------
 src/kudu/tserver/tablet_copy_service.cc | 41 +++++++++++++++++-----------
 src/kudu/tserver/tablet_copy_service.h  | 13 +++++----
 2 files changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b2e664a3/src/kudu/tserver/tablet_copy_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index c405470..37e686e 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -72,6 +72,10 @@ using tablet::TabletPeer;
 
 namespace tserver {
 
+static MonoTime GetNewExpireTime() {
+  return MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms);
+}
+
 TabletCopyServiceImpl::TabletCopyServiceImpl(
     ServerBase* server,
     TabletPeerLookupIf* tablet_peer_lookup)
@@ -115,19 +119,21 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
   scoped_refptr<TabletCopySourceSession> session;
   {
     MutexLock l(sessions_lock_);
-    if (!FindCopy(sessions_, session_id, &session)) {
+    const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
+    if (!session_entry) {
       LOG_WITH_PREFIX(INFO) << Substitute(
           "Beginning new tablet copy session on tablet $0 from peer $1"
           " at $2: session id = $3",
           tablet_id, requestor_uuid, context->requestor_string(), session_id);
       session.reset(new TabletCopySourceSession(tablet_peer, session_id,
-                                               requestor_uuid, fs_manager_));
+                                                requestor_uuid, fs_manager_));
       RPC_RETURN_NOT_OK(session->Init(),
                         TabletCopyErrorPB::UNKNOWN_ERROR,
                         Substitute("Error beginning tablet copy session for tablet $0", tablet_id),
                         context);
-      InsertOrDie(&sessions_, session_id, session);
+      InsertOrDie(&sessions_, session_id, { session, GetNewExpireTime() });
     } else {
+      session = session_entry->session;
       LOG_WITH_PREFIX(INFO) << Substitute(
           "Re-sending initialization info for existing tablet copy session on tablet $0"
           " from peer $1 at $2: session_id = $3",
@@ -250,11 +256,13 @@ void TabletCopyServiceImpl::Shutdown() {
   session_expiration_thread_->Join();
 
   // Destroy all tablet copy sessions.
-  vector<string> session_ids;
-  for (const MonoTimeMap::value_type& entry : session_expirations_) {
-    session_ids.push_back(entry.first);
-  }
-  for (const string& session_id : session_ids) {
+  MutexLock l(sessions_lock_);
+  auto iter = sessions_.cbegin();
+  while (iter != sessions_.cend()) {
+    const string& session_id = iter->first;
+    // Increment the iterator before erasing the corresponding session from the
+    // map in DoEndTabletCopySessionUnlocked().
+    ++iter; // Don't use until next iteration of the loop.
     LOG_WITH_PREFIX(INFO) << "Destroying tablet copy session " << session_id
                           << " due to service shutdown";
     TabletCopyErrorPB::Code app_error;
@@ -266,11 +274,13 @@ Status TabletCopyServiceImpl::FindSessionUnlocked(
         const string& session_id,
         TabletCopyErrorPB::Code* app_error,
         scoped_refptr<TabletCopySourceSession>* session) const {
-  if (!FindCopy(sessions_, session_id, session)) {
+  const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
+  if (!session_entry) {
     *app_error = TabletCopyErrorPB::NO_SESSION;
     return Status::NotFound(
         Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
   }
+  *session = session_entry->session;
   return Status::OK();
 }
 
@@ -307,14 +317,15 @@ Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
 }
 
 void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id)
{
-  MonoTime expiration(MonoTime::Now());
-  expiration.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms));
-  InsertOrUpdate(&session_expirations_, session_id, expiration);
+  SessionEntry* session_entry = FindOrNull(sessions_, session_id);
+  if (!session_entry) return;
+  session_entry->expires = GetNewExpireTime();
 }
 
 Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
         const std::string& session_id,
         TabletCopyErrorPB::Code* app_error) {
+  sessions_lock_.AssertAcquired();
   scoped_refptr<TabletCopySourceSession> session;
   RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
   // Remove the session from the map.
@@ -322,8 +333,6 @@ Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
   LOG_WITH_PREFIX(INFO) << "Ending tablet copy session " << session_id <<
" on tablet "
                         << session->tablet_id() << " with peer " <<
session->requestor_uuid();
   CHECK_EQ(1, sessions_.erase(session_id));
-  CHECK_EQ(1, session_expirations_.erase(session_id));
-
   return Status::OK();
 }
 
@@ -333,9 +342,9 @@ void TabletCopyServiceImpl::EndExpiredSessions() {
     const MonoTime now = MonoTime::Now();
 
     vector<string> expired_session_ids;
-    for (const MonoTimeMap::value_type& entry : session_expirations_) {
+    for (const auto& entry : sessions_) {
       const string& session_id = entry.first;
-      const MonoTime& expiration = entry.second;
+      const MonoTime& expiration = entry.second.expires;
       if (expiration < now) {
         expired_session_ids.push_back(session_id);
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b2e664a3/src/kudu/tserver/tablet_copy_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.h b/src/kudu/tserver/tablet_copy_service.h
index 8ece47d..a49772d 100644
--- a/src/kudu/tserver/tablet_copy_service.h
+++ b/src/kudu/tserver/tablet_copy_service.h
@@ -74,8 +74,12 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf {
   virtual void Shutdown() OVERRIDE;
 
  private:
-  typedef std::unordered_map<std::string, scoped_refptr<TabletCopySourceSession>
> SessionMap;
-  typedef std::unordered_map<std::string, MonoTime> MonoTimeMap;
+  struct SessionEntry {
+    scoped_refptr<TabletCopySourceSession> session;
+    MonoTime expires;
+  };
+
+  typedef std::unordered_map<std::string, SessionEntry> SessionMap;
 
   // Look up session in session map.
   Status FindSessionUnlocked(const std::string& session_id,
@@ -109,13 +113,12 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf {
   FsManager* fs_manager_;
   TabletPeerLookupIf* tablet_peer_lookup_;
 
-  // Protects sessions_ and session_expirations_ maps.
+  // Protects sessions_ map.
   mutable Mutex sessions_lock_;
   SessionMap sessions_;
-  MonoTimeMap session_expirations_;
 
   // Session expiration thread.
-  // TODO: this is a hack, replace with some kind of timer impl. See KUDU-286.
+  // TODO(mpercy): This is a hack, replace some kind of timer. See KUDU-286.
   CountDownLatch shutdown_latch_;
   scoped_refptr<Thread> session_expiration_thread_;
 };


Mime
View raw message