kudu-commits mailing list archives

From a...@apache.org
Subject [1/2] kudu git commit: thread: improve performance of starting threads
Date Tue, 17 Oct 2017 16:39:12 GMT
Repository: kudu
Updated Branches:
  refs/heads/master fe23710c6 -> d76220245


thread: improve performance of starting threads

This improves the performance of starting a new thread using
kudu::Thread::StartThread(). Previously, the process of starting the
thread was a little bit more complicated than necessary, involving a
sequence of the child waiting for the parent to do something, then the
parent waiting for the child to do something. Now, neither one has to
wait for the other to proceed.

The one wrinkle here is that for the parent to know the child's tid, the
parent does have to wait for the child to publish it. However, usage of the
'tid()' function is relatively rare, so we can defer that waiting until
someone accesses it, and then only wait in the odd case that the child
hasn't yet published.
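
As a rough illustration of the deferred wait described above, here is a
minimal standalone sketch that uses std::atomic and std::thread rather than
Kudu's own primitives. The class name LazyTidThread and the caller-supplied
'fake_tid' are hypothetical stand-ins (the real code obtains the id from the
OS inside SuperviseThread()); only the sentinel value and the split between a
fast path and a slow path are meant to mirror this patch.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical stand-in for kudu::Thread; not the real class.
class LazyTidThread {
 public:
  // Sentinel meaning "started, but the child has not published its id yet".
  enum : int64_t { kParentWaitingTid = -2 };

  // Starts the child and returns without waiting for it to publish its id.
  // 'fake_tid' is an assumption of this sketch: it stands in for the value
  // the OS would report to the child thread.
  LazyTidThread(int64_t fake_tid, std::function<void()> body)
      : tid_(kParentWaitingTid),
        thread_([this, fake_tid, body = std::move(body)] {
          // Published ids must not collide with the negative sentinel.
          assert(fake_tid >= 0);
          // Child: publish the id first, then run the user function.
          tid_.store(fake_tid, std::memory_order_release);
          body();
        }) {}

  ~LazyTidThread() { Join(); }

  // Fast path: a single acquire load. Only callers that arrive before the
  // child has published fall through to the spinning slow path.
  int64_t tid() const {
    int64_t t = tid_.load(std::memory_order_acquire);
    if (t != kParentWaitingTid) return t;
    return WaitForTid();
  }

  void Join() {
    if (thread_.joinable()) thread_.join();
  }

 private:
  // Slow path: yield until the child publishes its id.
  int64_t WaitForTid() const {
    while (true) {
      int64_t t = tid_.load(std::memory_order_acquire);
      if (t != kParentWaitingTid) return t;
      std::this_thread::yield();
    }
  }

  // Declared before thread_ so it is initialized before the child starts.
  std::atomic<int64_t> tid_;
  std::thread thread_;
};
```

In this sketch, constructing a LazyTidThread never blocks on the child; a
later call to tid() returns immediately in the common case and spins only if
the child has not yet stored its id.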

This patch includes a very simple benchmark which starts 1000 threads
and then checks the tid for each.

Timings based on the 'Time spent' log output:
---------------------------------------------
Before:
      wall              user             sys
 Min.   :0.06500   Min.   :0.0080   Min.   :0.0160
 1st Qu.:0.06975   1st Qu.:0.0160   1st Qu.:0.0200
 Median :0.07100   Median :0.0200   Median :0.0240
 Mean   :0.07080   Mean   :0.0198   Mean   :0.0234
 3rd Qu.:0.07200   3rd Qu.:0.0240   3rd Qu.:0.0280
 Max.   :0.08200   Max.   :0.0280   Max.   :0.0320

After:
      wall              user             sys
 Min.   :0.01800   Min.   :0.0040   Min.   :0.000
 1st Qu.:0.02575   1st Qu.:0.0080   1st Qu.:0.012
 Median :0.02700   Median :0.0120   Median :0.016
 Mean   :0.02680   Mean   :0.0126   Mean   :0.016
 3rd Qu.:0.02925   3rd Qu.:0.0160   3rd Qu.:0.020
 Max.   :0.03200   Max.   :0.0280   Max.   :0.028

perf-stat results
------------------
Before:
 Performance counter stats for 'build/latest/bin/thread-test --gtest_filter=*Benchmark* --gtest_repeat=10' (10 runs):

       1095.617237      task-clock (msec)         #    0.863 CPUs utilized            ( +-  0.54% )
            29,552      context-switches          #    0.027 M/sec                    ( +-  0.75% )
               354      cpu-migrations            #    0.323 K/sec                    ( +-  4.55% )
            20,848      page-faults               #    0.019 M/sec                    ( +-  0.11% )
     2,424,700,945      cycles                    #    2.213 GHz                      ( +-  0.35% )
     1,369,266,675      instructions              #    0.56  insn per cycle           ( +-  0.12% )
       286,851,480      branches                  #  261.817 M/sec                    ( +-  0.11% )
         5,926,839      branch-misses             #    2.07% of all branches          ( +-  0.39% )

       1.269172342 seconds time elapsed                                          ( +-  0.33% )

After:
 Performance counter stats for 'build/latest/bin/thread-test --gtest_filter=*Benchmark* --gtest_repeat=10' (10 runs):

        790.618424      task-clock (msec)         #    1.677 CPUs utilized            ( +-  0.77% )
            17,178      context-switches          #    0.022 M/sec                    ( +-  0.91% )
             3,869      cpu-migrations            #    0.005 M/sec                    ( +-  0.76% )
            20,921      page-faults               #    0.026 M/sec                    ( +-  0.05% )
     2,149,909,940      cycles                    #    2.719 GHz                      ( +-  0.41% )
     1,149,982,930      instructions              #    0.53  insn per cycle           ( +-  0.09% )
       239,776,764      branches                  #  303.277 M/sec                    ( +-  0.08% )
         4,890,927      branch-misses             #    2.04% of all branches          ( +-  0.62% )

       0.471358038 seconds time elapsed                                          ( +-  0.76% )
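
To put the figures above side by side, the following small sketch derives
ratios from them. The helper names Speedup, PercentReduction and PrintSummary
are hypothetical and exist only for this illustration; the inputs are copied
from the median wall times and the perf-stat output quoted above.

```cpp
#include <cstdio>

// Ratio of the old cost to the new cost; values above 1.0 mean "faster".
double Speedup(double before, double after) { return before / after; }

// Percentage by which a counter dropped going from 'before' to 'after'.
double PercentReduction(double before, double after) {
  return 100.0 * (before - after) / before;
}

// Prints ratios derived from the numbers reported in this message.
void PrintSummary() {
  std::printf("median wall time speedup:  %.2fx\n",
              Speedup(0.07100, 0.02700));
  std::printf("perf-stat elapsed speedup: %.2fx\n",
              Speedup(1.269172342, 0.471358038));
  std::printf("context-switch reduction:  %.1f%%\n",
              PercentReduction(29552, 17178));
}
```

Under these inputs the median wall time improves by roughly 2.6x, the
perf-stat elapsed time by roughly 2.7x, and context switches drop by about
42%.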

Change-Id: I5ce7f409ed33548142d1180c9f9653a8f51a7879
Reviewed-on: http://gerrit.cloudera.org:8080/8257
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9fa9cf18
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9fa9cf18
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9fa9cf18

Branch: refs/heads/master
Commit: 9fa9cf181ded1736c6014792fe619f5e0b9a94b4
Parents: fe23710
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Oct 10 20:38:03 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Tue Oct 17 16:36:27 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc |  4 ++
 src/kudu/util/thread-test.cc   | 22 ++++++++-
 src/kudu/util/thread.cc        | 98 ++++++++++++++++++++-----------------
 src/kudu/util/thread.h         | 34 ++++++++++---
 4 files changed, 105 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 450449e..2b8ae5f 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4799,6 +4799,10 @@ TEST_F(ClientTest, TestServerTooBusyRetry) {
                                    &ClientTest::CheckRowCount, this, client_table_.get(), kNumRows,
                                    &thread));
     threads.push_back(thread);
+    // Don't start threads too fast - otherwise we could accumulate tens or hundreds
+    // of threads before any of them starts their actual scans, and then it would
+    // take a long time to join on them all eventually finishing down below.
+    SleepFor(MonoDelta::FromMilliseconds(100));
 
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       scoped_refptr<Counter> counter = METRIC_rpcs_queue_overflow.Instantiate(

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread-test.cc b/src/kudu/util/thread-test.cc
index a7d3d9c..69b9cf0 100644
--- a/src/kudu/util/thread-test.cc
+++ b/src/kudu/util/thread-test.cc
@@ -22,6 +22,7 @@
 
 #include <ostream>
 #include <string>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -33,8 +34,9 @@
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/status.h"
-#include "kudu/util/test_util.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 #include "kudu/util/thread_restrictions.h"
 
 using std::string;
@@ -117,6 +119,24 @@ TEST_F(ThreadTest, TestCallOnExit) {
   ASSERT_EQ("hello 1, hello 2", s);
 }
 
+TEST_F(ThreadTest, ThreadStartBenchmark) {
+  std::vector<scoped_refptr<Thread>> threads(1000);
+  LOG_TIMING(INFO, "starting threads") {
+    for (auto& t : threads) {
+      ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &t));
+    }
+  }
+  LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
+    for (auto& t : threads) {
+      t->tid();
+    }
+  }
+
+  for (auto& t : threads) {
+    t->Join();
+  }
+}
+
 // The following tests only run in debug mode, since thread restrictions are no-ops
 // in release builds.
 #ifndef NDEBUG

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 3674f06..0378bb6 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -50,7 +50,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
-#include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/kernel_stack_watchdog.h"
 #include "kudu/util/logging.h"
@@ -58,6 +57,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/os-util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/url-coding.h"
@@ -515,20 +515,57 @@ void Thread::CallAtExit(const Closure& cb) {
 }
 
 std::string Thread::ToString() const {
-  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_);
+  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_);
 }
 
+int64_t Thread::WaitForTid() const {
+  const string log_prefix = Substitute("$0 ($1) ", name_, category_);
+  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
+                                   "waiting for new thread to publish its TID");
+  int loop_count = 0;
+  while (true) {
+    int64_t t = Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) return t;
+    boost::detail::yield(loop_count++);
+  }
+}
+
+
 Status Thread::StartThread(const std::string& category, const std::string& name,
                            const ThreadFunctor& functor, uint64_t flags,
                            scoped_refptr<Thread> *holder) {
   TRACE_COUNTER_INCREMENT("threads_started", 1);
   TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
+  GoogleOnceInit(&once, &InitThreading);
+
   const string log_prefix = Substitute("$0 ($1) ", name, category);
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");
 
   // Temporary reference for the duration of this function.
   scoped_refptr<Thread> t(new Thread(category, name, functor));
 
+  // Optional, and only set if the thread was successfully created.
+  //
+  // We have to set this before we even start the thread because it's
+  // allowed for the thread functor to access 'holder'.
+  if (holder) {
+    *holder = t;
+  }
+
+  t->tid_ = PARENT_WAITING_TID;
+
+  // Add a reference count to the thread since SuperviseThread() needs to
+  // access the thread object, and we have no guarantee that our caller
+  // won't drop the reference as soon as we return. This is dereferenced
+  // in FinishThread().
+  t->AddRef();
+
+  auto cleanup = MakeScopedCleanup([&]() {
+      // If we failed to create the thread, we need to undo all of our prep work.
+      t->tid_ = INVALID_TID;
+      t->Release();
+    });
+
   if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
     LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start";
     SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms));
@@ -549,28 +586,7 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
   // (or someone communicating with the parent) can join, so joinable must
   // be set before the parent returns.
   t->joinable_ = true;
-
-  // Optional, and only set if the thread was successfully created.
-  if (holder) {
-    *holder = t;
-  }
-
-  // The tid_ member goes through the following states:
-  // 1  CHILD_WAITING_TID: the child has just been spawned and is waiting
-  //    for the parent to finish writing to caller state (i.e. 'holder').
-  // 2. PARENT_WAITING_TID: the parent has updated caller state and is now
-  //    waiting for the child to write the tid.
-  // 3. <value>: both the parent and the child are free to continue. If the
-  //    value is INVALID_TID, the child could not discover its tid.
-  Release_Store(&t->tid_, PARENT_WAITING_TID);
-  {
-    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
-                                     "waiting for new thread to publish its TID");
-    int loop_count = 0;
-    while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) {
-      boost::detail::yield(loop_count++);
-    }
-  }
+  cleanup.cancel();
 
   VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
   return Status::OK();
@@ -579,14 +595,9 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
 void* Thread::SuperviseThread(void* arg) {
   Thread* t = static_cast<Thread*>(arg);
   int64_t system_tid = Thread::CurrentThreadId();
-  if (system_tid == -1) {
-    string error_msg = ErrnoToString(errno);
-    KLOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
-  }
-  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+  PCHECK(system_tid != -1);
 
   // Take an additional reference to the thread manager, which we'll need below.
-  GoogleOnceInit(&once, &InitThreading);
   ANNOTATE_IGNORE_SYNC_BEGIN();
   shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
   ANNOTATE_IGNORE_SYNC_END();
@@ -594,21 +605,17 @@ void* Thread::SuperviseThread(void* arg) {
   // Set up the TLS.
   //
   // We could store a scoped_refptr in the TLS itself, but as its
-  // lifecycle is poorly defined, we'll use a bare pointer and take an
-  // additional reference on t out of band, in thread_ref.
-  scoped_refptr<Thread> thread_ref = t;
-  t->tls_ = t;
+  // lifecycle is poorly defined, we'll use a bare pointer. We
+  // already incremented the reference count in StartThread.
+  Thread::tls_ = t;
 
-  // Wait until the parent has updated all caller-visible state, then write
-  // the TID to 'tid_', thus completing the parent<-->child handshake.
-  int loop_count = 0;
-  while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) {
-    boost::detail::yield(loop_count++);
-  }
+  // Publish our tid to 'tid_', which unblocks any callers waiting in
+  // WaitForTid().
   Release_Store(&t->tid_, system_tid);
 
-  thread_manager->SetThreadName(name, t->tid());
-  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid());
+  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+  thread_manager->SetThreadName(name, t->tid_);
+  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid_);
 
   // FinishThread() is guaranteed to run (even if functor_ throws an
   // exception) because pthread_cleanup_push() creates a scoped object
@@ -636,8 +643,11 @@ void Thread::FinishThread(void* arg) {
   // Signal any Joiner that we're done.
   t->done_.CountDown();
 
-  VLOG(2) << "Ended thread " << t->tid() << " - "
-          << t->category() << ":" << t->name();
+  VLOG(2) << "Ended thread " << t->tid_ << " - " << t->category() << ":" << t->name();
+  t->Release();
+  // NOTE: the above 'Release' call could be the last reference to 'this',
+  // so 'this' could be destructed at this point. Do not add any code
+  // following here!
 }
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.h b/src/kudu/util/thread.h
index c8b5191..f404c30 100644
--- a/src/kudu/util/thread.h
+++ b/src/kudu/util/thread.h
@@ -36,6 +36,7 @@
 #include <boost/bind.hpp>     // IWYU pragma: keep
 #include <boost/function.hpp> // IWYU pragma: keep
 
+#include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -215,9 +216,18 @@ class Thread : public RefCountedThreadSafe<Thread> {
   // This callback is guaranteed to be called except in the case of a process crash.
   void CallAtExit(const Closure& cb);
 
-  // The thread ID assigned to this thread by the operating system. If the OS does not
-  // support retrieving the tid, returns Thread::INVALID_TID.
-  int64_t tid() const { return tid_; }
+  // The thread ID assigned to this thread by the operating system. If the thread
+  // has not yet started running, returns INVALID_TID.
+  //
+  // NOTE: this may block for a short amount of time if the thread has just been
+  // started.
+  int64_t tid() const {
+    int64_t t = base::subtle::Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) {
+      return tid_;
+    }
+    return WaitForTid();
+  }
 
   // Returns the thread's pthread ID.
   pthread_t pthread_id() const { return thread_; }
@@ -279,12 +289,10 @@ class Thread : public RefCountedThreadSafe<Thread> {
  private:
   friend class ThreadJoiner;
 
-  // The various special values for tid_ that describe the various steps
-  // in the parent<-->child handshake.
+  // See 'tid_' docs.
   enum {
     INVALID_TID = -1,
-    CHILD_WAITING_TID = -2,
-    PARENT_WAITING_TID = -3,
+    PARENT_WAITING_TID = -2,
   };
 
   // Function object that wraps the user-supplied function to run in a separate thread.
@@ -294,7 +302,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
       : thread_(0),
         category_(std::move(category)),
         name_(std::move(name)),
-        tid_(CHILD_WAITING_TID),
+        tid_(INVALID_TID),
         functor_(std::move(functor)),
         done_(1),
         joinable_(false) {}
@@ -308,6 +316,13 @@ class Thread : public RefCountedThreadSafe<Thread> {
 
   // OS-specific thread ID. Once the constructor finishes StartThread(),
   // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+  //
+  // The tid_ member goes through the following states:
+  // 1. INVALID_TID: the thread has not been started, or has already exited.
+  // 2. PARENT_WAITING_TID: the parent has started the thread, but the
+  //    thread has not yet begun running. Therefore the TID is not yet known
+  //    but it will be set once the thread starts.
+  // 3. <positive value>: the thread is running.
   int64_t tid_;
 
   // User function to be executed by this thread.
@@ -328,6 +343,9 @@ class Thread : public RefCountedThreadSafe<Thread> {
 
   std::vector<Closure> exit_callbacks_;
 
+  // Wait for the running thread to publish its tid.
+  int64_t WaitForTid() const;
+
   // Starts the thread running SuperviseThread(), and returns once that thread has
   // initialised and its TID has been read. Waits for notification from the started
   // thread that initialisation is complete before returning. On success, stores a

