kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: rpc: Add min / max negotiation threads
Date Mon, 07 Nov 2016 22:32:35 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 46f52bcde -> a97004a91


rpc: Add min / max negotiation threads

Since we use a thread pool for connection negotiation, it would be
helpful for users to be able to specify the minimum and maximum number
of threads in that pool. Prior to this patch, there were no gflags to
control these parameters, and the only builder parameter was the maximum
(however it was not labelled as such).

This patch allows tuning the per-tserver negotiation pool size by exposing
both the minimum and the maximum number of threads allowed in the thread pool.

Change-Id: Ife98b39d5f3a340702151ab27dc8026c8bac12ac
Reviewed-on: http://gerrit.cloudera.org:8080/4574
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a97004a9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a97004a9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a97004a9

Branch: refs/heads/master
Commit: a97004a9117b9c46e20a1e34de77af7f9478fe74
Parents: 46f52bc
Author: Mike Percy <mpercy@apache.org>
Authored: Thu Sep 29 17:32:00 2016 +0100
Committer: Adar Dembo <adar@cloudera.com>
Committed: Mon Nov 7 22:30:35 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/create-table-stress-test.cc    |  2 +-
 src/kudu/integration-tests/external_mini_cluster.cc  |  2 +-
 src/kudu/rpc/messenger.cc                            | 15 +++++++++++----
 src/kudu/rpc/messenger.h                             | 13 +++++++++----
 src/kudu/server/server_base.cc                       |  8 ++++++++
 5 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/integration-tests/create-table-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index c4c63f4..00f3558 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -95,7 +95,7 @@ class CreateTableStressTest : public KuduTest {
 
     ASSERT_OK(MessengerBuilder("stress-test-msgr")
               .set_num_reactors(1)
-              .set_negotiation_threads(1)
+              .set_max_negotiation_threads(1)
               .Build(&messenger_));
     master_proxy_.reset(new MasterServiceProxy(messenger_,
                                                cluster_->mini_master()->bound_rpc_addr()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index 621c059..c6d875a 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -129,7 +129,7 @@ Status ExternalMiniCluster::Start() {
 
   RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
                         .set_num_reactors(1)
-                        .set_negotiation_threads(1)
+                        .set_max_negotiation_threads(1)
                         .Build(&messenger_),
                         "Failed to start Messenger for minicluster");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 437add7..c5acc20 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -94,7 +94,8 @@ MessengerBuilder::MessengerBuilder(std::string name)
       connection_keepalive_time_(
           MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)),
       num_reactors_(4),
-      num_negotiation_threads_(4),
+      min_negotiation_threads_(0),
+      max_negotiation_threads_(4),
       coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)) {}
 
 MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const MonoDelta &keepalive) {
@@ -107,8 +108,13 @@ MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
   return *this;
 }
 
-MessengerBuilder& MessengerBuilder::set_negotiation_threads(int num_negotiation_threads) {
-  num_negotiation_threads_ = num_negotiation_threads;
+MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int min_negotiation_threads) {
+  min_negotiation_threads_ = min_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int max_negotiation_threads) {
+  max_negotiation_threads_ = max_negotiation_threads;
   return *this;
 }
 
@@ -271,7 +277,8 @@ Messenger::Messenger(const MessengerBuilder &bld)
     reactors_.push_back(new Reactor(retain_self_, i, bld));
   }
   CHECK_OK(ThreadPoolBuilder("negotiator")
-              .set_max_threads(bld.num_negotiation_threads_)
+              .set_min_threads(bld.min_negotiation_threads_)
+              .set_max_threads(bld.max_negotiation_threads_)
               .Build(&negotiation_pool_));
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 09c8b53..23b39cb 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -83,9 +83,13 @@ class MessengerBuilder {
   // receiving.
   MessengerBuilder &set_num_reactors(int num_reactors);
 
-  // Set the number of connection-negotiation threads that will be used to handle the
-  // blocking connection-negotiation step.
-  MessengerBuilder &set_negotiation_threads(int num_negotiation_threads);
+  // Set the minimum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads);
+
+  // Set the maximum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads);
 
   // Set the granularity with which connections are checked for keepalive.
   MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity);
@@ -99,7 +103,8 @@ class MessengerBuilder {
   const std::string name_;
   MonoDelta connection_keepalive_time_;
   int num_reactors_;
-  int num_negotiation_threads_;
+  int min_negotiation_threads_;
+  int max_negotiation_threads_;
   MonoDelta coarse_timer_granularity_;
   scoped_refptr<MetricEntity> metric_entity_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 4d5fb46..bbef89b 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -59,6 +59,12 @@
 DEFINE_int32(num_reactor_threads, 4, "Number of libev reactor threads to start.");
 TAG_FLAG(num_reactor_threads, advanced);
 
+DEFINE_int32(min_negotiation_threads, 0, "Minimum number of connection negotiation threads.");
+TAG_FLAG(min_negotiation_threads, advanced);
+
+DEFINE_int32(max_negotiation_threads, 50, "Maximum number of connection negotiation threads.");
+TAG_FLAG(max_negotiation_threads, advanced);
+
 DECLARE_bool(use_hybrid_clock);
 
 using std::ostringstream;
@@ -177,6 +183,8 @@ Status ServerBase::Init() {
   rpc::MessengerBuilder builder(name_);
 
   builder.set_num_reactors(FLAGS_num_reactor_threads);
+  builder.set_min_negotiation_threads(FLAGS_min_negotiation_threads);
+  builder.set_max_negotiation_threads(FLAGS_max_negotiation_threads);
   builder.set_metric_entity(metric_entity());
   RETURN_NOT_OK(builder.Build(&messenger_));
 


Mime
View raw message