trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bc...@apache.org
Subject trafficserver git commit: TS-3313: Added active queue for incoming connections
Date Fri, 19 Jun 2015 02:56:04 GMT
Repository: trafficserver
Updated Branches:
  refs/heads/master bec6dd64a -> 974e8e3ab


TS-3313: Added active queue for incoming connections


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/974e8e3a
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/974e8e3a
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/974e8e3a

Branch: refs/heads/master
Commit: 974e8e3abc9a3f60f516b24302edde5618a3cfc5
Parents: bec6dd6
Author: Bryan Call <bcall@apache.org>
Authored: Thu Jun 18 19:55:29 2015 -0700
Committer: Bryan Call <bcall@apache.org>
Committed: Thu Jun 18 19:55:29 2015 -0700

----------------------------------------------------------------------
 iocore/net/I_NetVConnection.h     |   6 +-
 iocore/net/Net.cc                 |   8 +-
 iocore/net/P_Net.h                |   4 +-
 iocore/net/P_UnixNet.h            |  26 ++-
 iocore/net/P_UnixNetVConnection.h |  32 ++-
 iocore/net/UnixConnection.cc      |  31 +--
 iocore/net/UnixNet.cc             | 354 +++++++++++++++++++++++----------
 iocore/net/UnixNetVConnection.cc  |  33 +--
 mgmt/RecordsConfig.cc             |   4 +-
 proxy/PluginVC.cc                 |  11 +-
 proxy/PluginVC.h                  |   5 +-
 proxy/http/HttpClientSession.cc   |  10 +-
 proxy/spdy/SpdyCallbacks.cc       |   2 +-
 proxy/spdy/SpdyClientSession.cc   |   2 +-
 proxy/spdy/SpdyClientSession.h    |   2 +-
 15 files changed, 367 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/I_NetVConnection.h
----------------------------------------------------------------------
diff --git a/iocore/net/I_NetVConnection.h b/iocore/net/I_NetVConnection.h
index 5509a35..e9697be 100644
--- a/iocore/net/I_NetVConnection.h
+++ b/iocore/net/I_NetVConnection.h
@@ -429,9 +429,11 @@ public:
   */
   virtual void cancel_inactivity_timeout() = 0;
 
-  virtual void add_to_keep_alive_lru() = 0;
+  virtual void add_to_keep_alive_queue() = 0;
 
-  virtual void remove_from_keep_alive_lru() = 0;
+  virtual void remove_from_keep_alive_queue() = 0;
+
+  virtual bool add_to_active_queue() = 0;
 
   /** @return the current active_timeout value in nanosecs */
   virtual ink_hrtime get_active_timeout() = 0;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/Net.cc
----------------------------------------------------------------------
diff --git a/iocore/net/Net.cc b/iocore/net/Net.cc
index 5c8acc3..fbdf03d 100644
--- a/iocore/net/Net.cc
+++ b/iocore/net/Net.cc
@@ -125,12 +125,12 @@ register_net_stats()
                      (int)inactivity_cop_lock_acquire_failure_stat, RecRawStatSyncSum);
 
   RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.dynamic_keep_alive_timeout_in_total",
RECD_INT, RECP_NON_PERSISTENT,
-                     (int)keep_alive_lru_timeout_total_stat, RecRawStatSyncSum);
-  NET_CLEAR_DYN_STAT(keep_alive_lru_timeout_total_stat);
+                     (int)keep_alive_queue_timeout_total_stat, RecRawStatSyncSum);
+  NET_CLEAR_DYN_STAT(keep_alive_queue_timeout_total_stat);
 
   RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.dynamic_keep_alive_timeout_in_count",
RECD_INT, RECP_NON_PERSISTENT,
-                     (int)keep_alive_lru_timeout_count_stat, RecRawStatSyncSum);
-  NET_CLEAR_DYN_STAT(keep_alive_lru_timeout_count_stat);
+                     (int)keep_alive_queue_timeout_count_stat, RecRawStatSyncSum);
+  NET_CLEAR_DYN_STAT(keep_alive_queue_timeout_count_stat);
 
   RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.default_inactivity_timeout_applied",
RECD_INT, RECP_NON_PERSISTENT,
                      (int)default_inactivity_timeout_stat, RecRawStatSyncSum);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_Net.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h
index c80365f..78aac91 100644
--- a/iocore/net/P_Net.h
+++ b/iocore/net/P_Net.h
@@ -50,8 +50,8 @@ enum Net_Stats {
   socks_connections_unsuccessful_stat,
   socks_connections_currently_open_stat,
   inactivity_cop_lock_acquire_failure_stat,
-  keep_alive_lru_timeout_total_stat,
-  keep_alive_lru_timeout_count_stat,
+  keep_alive_queue_timeout_total_stat,
+  keep_alive_queue_timeout_count_stat,
   default_inactivity_timeout_stat,
   Net_Stat_Count
 };

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_UnixNet.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index 112c357..c3794e6 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -185,8 +185,19 @@ public:
   DList(UnixNetVConnection, cop_link) cop_list;
   ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list;
   ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list;
-  Que(UnixNetVConnection, keep_alive_link) keep_alive_list;
-  uint32_t keep_alive_lru_size;
+  Que(UnixNetVConnection, keep_alive_queue_link) keep_alive_queue;
+  uint32_t keep_alive_queue_size;
+  Que(UnixNetVConnection, active_queue_link) active_queue;
+  uint32_t active_queue_size;
+  uint32_t max_connections_per_thread_in;
+  uint32_t max_connections_active_per_thread_in;
+
+  // configuration settings for managing the active and keep-alive queues
+  uint32_t max_connections_in;
+  uint32_t max_connections_active_in;
+  uint32_t inactive_threashold_in;
+  uint32_t transaction_no_activity_timeout_in;
+  uint32_t keep_alive_no_activity_timeout_in;
 
   time_t sec;
   int cycles;
@@ -195,8 +206,19 @@ public:
   int mainNetEvent(int event, Event *data);
   int mainNetEventExt(int event, Event *data);
   void process_enabled_list(NetHandler *);
+  void manage_keep_alive_queue();
+  bool manage_active_queue();
+  void add_to_keep_alive_queue(UnixNetVConnection *vc);
+  void remove_from_keep_alive_queue(UnixNetVConnection *vc);
+  bool add_to_active_queue(UnixNetVConnection *vc);
+  void remove_from_active_queue(UnixNetVConnection *vc);
+  void configure_per_thread();
 
   NetHandler();
+
+private:
+  void _close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed,
int &total_idle_time,
+                 int &total_idle_count);
 };
 
 static inline NetHandler *

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_UnixNetVConnection.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h
index c8ce1eb..61cf925 100644
--- a/iocore/net/P_UnixNetVConnection.h
+++ b/iocore/net/P_UnixNetVConnection.h
@@ -149,8 +149,10 @@ public:
   virtual void set_inactivity_timeout(ink_hrtime timeout_in);
   virtual void cancel_active_timeout();
   virtual void cancel_inactivity_timeout();
-  virtual void add_to_keep_alive_lru();
-  virtual void remove_from_keep_alive_lru();
+  virtual void add_to_keep_alive_queue();
+  virtual void remove_from_keep_alive_queue();
+  virtual bool add_to_active_queue();
+  virtual void remove_from_active_queue();
 
   // The public interface is VIO::reenable()
   virtual void reenable(VIO *vio);
@@ -225,17 +227,19 @@ public:
   SLINKM(UnixNetVConnection, read, enable_link)
   LINKM(UnixNetVConnection, write, ready_link)
   SLINKM(UnixNetVConnection, write, enable_link)
-  LINK(UnixNetVConnection, keep_alive_link);
+  LINK(UnixNetVConnection, keep_alive_queue_link);
+  LINK(UnixNetVConnection, active_queue_link);
 
   ink_hrtime inactivity_timeout_in;
   ink_hrtime active_timeout_in;
 #ifdef INACTIVITY_TIMEOUT
   Event *inactivity_timeout;
+  Event *activity_timeout;
 #else
   ink_hrtime next_inactivity_timeout_at;
+  ink_hrtime next_activity_timeout_at;
 #endif
 
-  Event *active_timeout;
   EventIO ep;
   NetHandler *nh;
   unsigned int id;
@@ -310,9 +314,8 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout)
 {
   Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout, this);
   inactivity_timeout_in = timeout;
-#ifndef INACTIVITY_TIMEOUT
-  next_inactivity_timeout_at = ink_get_hrtime() + timeout;
-#else
+#ifdef INACTIVITY_TIMEOUT
+
   if (inactivity_timeout)
     inactivity_timeout->cancel_action(this);
   if (inactivity_timeout_in) {
@@ -332,6 +335,8 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout)
       inactivity_timeout = 0;
   } else
     inactivity_timeout = 0;
+#else
+  next_inactivity_timeout_at = ink_get_hrtime() + timeout;
 #endif
 }
 
@@ -340,6 +345,7 @@ UnixNetVConnection::set_active_timeout(ink_hrtime timeout)
 {
   Debug("socket", "Set active timeout=%" PRId64 ", NetVC=%p", timeout, this);
   active_timeout_in = timeout;
+#ifdef INACTIVITY_TIMEOUT
   if (active_timeout)
     active_timeout->cancel_action(this);
   if (active_timeout_in) {
@@ -359,11 +365,15 @@ UnixNetVConnection::set_active_timeout(ink_hrtime timeout)
       active_timeout = 0;
   } else
     active_timeout = 0;
+#else
+  next_activity_timeout_at = ink_get_hrtime() + timeout;
+#endif
 }
 
 TS_INLINE void
 UnixNetVConnection::cancel_inactivity_timeout()
 {
+  Debug("socket", "Cancel inactive timeout for NetVC=%p", this);
   inactivity_timeout_in = 0;
 #ifdef INACTIVITY_TIMEOUT
   if (inactivity_timeout) {
@@ -372,7 +382,6 @@ UnixNetVConnection::cancel_inactivity_timeout()
     inactivity_timeout = NULL;
   }
 #else
-  Debug("socket", "Cancel inactive timeout for NetVC=%p", this);
   next_inactivity_timeout_at = 0;
 #endif
 }
@@ -380,12 +389,17 @@ UnixNetVConnection::cancel_inactivity_timeout()
 TS_INLINE void
 UnixNetVConnection::cancel_active_timeout()
 {
+  Debug("socket", "Cancel active timeout for NetVC=%p", this);
+  active_timeout_in = 0;
+#ifdef INACTIVITY_TIMEOUT
   if (active_timeout) {
     Debug("socket", "Cancel active timeout for NetVC=%p", this);
     active_timeout->cancel_action(this);
     active_timeout = NULL;
-    active_timeout_in = 0;
   }
+#else
+  next_activity_timeout_at = 0;
+#endif
 }
 
 TS_INLINE int

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixConnection.cc b/iocore/net/UnixConnection.cc
index 303175a..bcf560e 100644
--- a/iocore/net/UnixConnection.cc
+++ b/iocore/net/UnixConnection.cc
@@ -392,24 +392,25 @@ Connection::apply_options(NetVCOptions const &opt)
 }
 
 void
-UnixNetVConnection::add_to_keep_alive_lru()
+UnixNetVConnection::add_to_keep_alive_queue()
 {
-  Debug("socket", "UnixNetVConnection::add_to_keep_alive_lru NetVC=%p", this);
-  if (nh->keep_alive_list.in(this)) {
-    nh->keep_alive_list.remove(this);
-    nh->keep_alive_list.enqueue(this);
-  } else {
-    nh->keep_alive_list.enqueue(this);
-    ++nh->keep_alive_lru_size;
-  }
+  nh->add_to_keep_alive_queue(this);
 }
 
 void
-UnixNetVConnection::remove_from_keep_alive_lru()
+UnixNetVConnection::remove_from_keep_alive_queue()
 {
-  Debug("socket", "UnixNetVConnection::remove_from_keep_alive_lru NetVC=%p", this);
-  if (nh->keep_alive_list.in(this)) {
-    nh->keep_alive_list.remove(this);
-    --nh->keep_alive_lru_size;
-  }
+  nh->remove_from_keep_alive_queue(this);
+}
+
+bool
+UnixNetVConnection::add_to_active_queue()
+{
+  return nh->add_to_active_queue(this);
+}
+
+void
+UnixNetVConnection::remove_from_active_queue()
+{
+  nh->remove_from_active_queue(this);
 }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixNet.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index ff2f0d6..214ab12 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -43,15 +43,12 @@ int update_cop_config(const char *name, RecDataT data_type, RecData data,
void *
 class InactivityCop : public Continuation
 {
 public:
-  InactivityCop(ProxyMutex *m)
-    : Continuation(m), default_inactivity_timeout(0), total_connections_in(0), max_connections_in(0),
connections_per_thread_in(0)
+  InactivityCop(ProxyMutex *m) : Continuation(m), default_inactivity_timeout(0)
   {
     SET_HANDLER(&InactivityCop::check_inactivity);
     REC_ReadConfigInteger(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
     Debug("inactivity_cop", "default inactivity timeout is set to: %d", default_inactivity_timeout);
-    REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
 
-    RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_cop_config, (void
*)this);
     RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_cop_config,
(void *)this);
   }
 
@@ -63,14 +60,10 @@ public:
     NetHandler &nh = *get_NetHandler(this_ethread());
 
     Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id);
-    total_connections_in = 0;
     // Copy the list and use pop() to catch any closes caused by callbacks.
     forl_LL(UnixNetVConnection, vc, nh.open_list)
     {
       if (vc->thread == this_ethread()) {
-        if (vc->from_accept_thread == true) {
-          ++total_connections_in;
-        }
         nh.cop_list.push(vc);
       }
     }
@@ -98,11 +91,11 @@ public:
       }
 
       if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at
< now) {
-        if (nh.keep_alive_list.in(vc)) {
+        if (nh.keep_alive_queue.in(vc)) {
           // only stat if the connection is in keep-alive, there can be other inactivity
timeouts
           ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in))
/ HRTIME_SECOND;
-          NET_SUM_DYN_STAT(keep_alive_lru_timeout_total_stat, diff);
-          NET_INCREMENT_DYN_STAT(keep_alive_lru_timeout_count_stat);
+          NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
+          NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
         }
         Debug("inactivity_cop_verbose", "vc: %p now: %" PRId64 " timeout at: %" PRId64 "
timeout in: %" PRId64, vc, now,
               vc->next_inactivity_timeout_at, vc->inactivity_timeout_in);
@@ -110,34 +103,21 @@ public:
       }
     }
 
-    // Keep-alive LRU for incoming connections
-    keep_alive_lru(nh, now, e);
+    // Cleanup the active and keep-alive queues periodically
+    nh.manage_active_queue();
+    nh.manage_keep_alive_queue();
 
     return 0;
   }
 
   void
-  set_max_connections(const int32_t x)
-  {
-    max_connections_in = x;
-  }
-  void
-  set_connections_per_thread(const int32_t x)
-  {
-    connections_per_thread_in = x;
-  }
-  void
   set_default_timeout(const int x)
   {
     default_inactivity_timeout = x;
   }
 
 private:
-  void keep_alive_lru(NetHandler &nh, ink_hrtime now, Event *e);
   int default_inactivity_timeout; // only used when one is not set for some bad reason
-  int32_t total_connections_in;
-  int32_t max_connections_in;
-  int32_t connections_per_thread_in;
 };
 
 int
@@ -147,12 +127,6 @@ update_cop_config(const char *name, RecDataT data_type ATS_UNUSED, RecData
data,
   ink_assert(cop != NULL);
 
   if (cop != NULL) {
-    if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
-      Debug("inactivity_cop_dynamic", "proxy.config.net.max_connections_in updated to %"
PRId64, data.rec_int);
-      cop->set_max_connections(data.rec_int);
-      cop->set_connections_per_thread(0);
-    }
-
     if (strcmp(name, "proxy.config.net.default_inactivity_timeout") == 0) {
       Debug("inactivity_cop_dynamic", "proxy.config.net.default_inactivity_timeout updated
to %" PRId64, data.rec_int);
       cop->set_default_timeout(data.rec_int);
@@ -162,77 +136,6 @@ update_cop_config(const char *name, RecDataT data_type ATS_UNUSED, RecData
data,
   return REC_ERR_OKAY;
 }
 
-void
-InactivityCop::keep_alive_lru(NetHandler &nh, const ink_hrtime now, Event *e)
-{
-  // maximum incoming connections is set to 0 then the feature is disabled
-  if (max_connections_in == 0) {
-    return;
-  }
-
-  if (connections_per_thread_in == 0) {
-    // figure out the number of threads and calculate the number of connections per thread
-    const int event_threads = eventProcessor.n_threads_for_type[ET_NET];
-    const int ssl_threads = (ET_NET == SSLNetProcessor::ET_SSL) ? 0 : eventProcessor.n_threads_for_type[SSLNetProcessor::ET_SSL];
-    connections_per_thread_in = max_connections_in / (event_threads + ssl_threads);
-  }
-
-  // calculate how many connections to close
-  int32_t to_process = total_connections_in - connections_per_thread_in;
-  if (to_process <= 0) {
-    return;
-  }
-  to_process = min((int32_t)nh.keep_alive_lru_size, to_process);
-
-  Debug("inactivity_cop_dynamic", "max cons: %d active: %d idle: %d process: %d"
-                                  " net type: %d ssl type: %d",
-        connections_per_thread_in, total_connections_in - nh.keep_alive_lru_size, nh.keep_alive_lru_size,
to_process, ET_NET,
-        SSLNetProcessor::ET_SSL);
-
-  // loop over the non-active connections and try to close them
-  UnixNetVConnection *vc = nh.keep_alive_list.head;
-  UnixNetVConnection *vc_next = NULL;
-  int closed = 0;
-  int handle_event = 0;
-  int total_idle_time = 0;
-  int total_idle_count = 0;
-  for (int32_t i = 0; i < to_process && vc != NULL; ++i, vc = vc_next) {
-    vc_next = vc->keep_alive_link.next;
-    if (vc->thread != this_ethread()) {
-      continue;
-    }
-    MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
-    if (!lock.is_locked()) {
-      continue;
-    }
-    ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in))
/ HRTIME_SECOND;
-    if (diff > 0) {
-      total_idle_time += diff;
-      ++total_idle_count;
-      NET_SUM_DYN_STAT(keep_alive_lru_timeout_total_stat, diff);
-      NET_INCREMENT_DYN_STAT(keep_alive_lru_timeout_count_stat);
-    }
-    Debug("inactivity_cop_dynamic",
-          "closing connection NetVC=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64
" diff: %" PRId64, vc,
-          nh.keep_alive_lru_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
-          ink_hrtime_to_sec(vc->inactivity_timeout_in), diff);
-    if (vc->closed) {
-      close_UnixNetVConnection(vc, e->ethread);
-      ++closed;
-    } else {
-      vc->next_inactivity_timeout_at = now;
-      nh.keep_alive_list.head->handleEvent(EVENT_IMMEDIATE, e);
-      ++handle_event;
-    }
-  }
-
-  if (total_idle_count > 0) {
-    Debug("inactivity_cop_dynamic", "max cons: %d active: %d idle: %d already closed: %d,
close event: %d"
-                                    " mean idle: %d\n",
-          connections_per_thread_in, total_connections_in - nh.keep_alive_lru_size - closed
- handle_event, nh.keep_alive_lru_size,
-          closed, handle_event, total_idle_time / total_idle_count);
-  }
-}
 #endif
 
 PollCont::PollCont(ProxyMutex *m, int pt) : Continuation(m), net_handler(NULL), nextPollDescriptor(NULL),
poll_timeout(pt)
@@ -387,11 +290,52 @@ initialize_thread_for_net(EThread *thread)
 
 // NetHandler method definitions
 
-NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_lru_size(0)
+NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_queue_size(0),
active_queue_size(0)
 {
   SET_HANDLER((NetContHandler)&NetHandler::startNetEvent);
 }
 
+
+int
+update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data, void
*cookie)
+{
+  NetHandler *nh = static_cast<NetHandler *>(cookie);
+  ink_assert(nh != NULL);
+  bool update_per_thread_configuration = false;
+
+  if (nh != NULL) {
+    if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
+      Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
+      nh->max_connections_in = data.rec_int;
+      update_per_thread_configuration = true;
+    }
+    if (strcmp(name, "proxy.config.net.max_active_connections_in") == 0) {
+      Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %" PRId64,
data.rec_int);
+      nh->max_connections_active_in = data.rec_int;
+      update_per_thread_configuration = true;
+    }
+    if (strcmp(name, "proxy.config.net.inactive_threashold_in") == 0) {
+      Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %" PRId64, data.rec_int);
+      nh->inactive_threashold_in = data.rec_int;
+    }
+    if (strcmp(name, "proxy.config.net.transaction_no_activity_timeout_in") == 0) {
+      Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to
%" PRId64, data.rec_int);
+      nh->transaction_no_activity_timeout_in = data.rec_int;
+    }
+    if (strcmp(name, "proxy.config.net.keep_alive_no_activity_timeout_in") == 0) {
+      Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %"
PRId64, data.rec_int);
+      nh->keep_alive_no_activity_timeout_in = data.rec_int;
+    }
+  }
+
+  if (update_per_thread_configuration == true) {
+    nh->configure_per_thread();
+  }
+
+  return REC_ERR_OKAY;
+}
+
+
 //
 // Initialization here, in the thread in which we will be executing
 // from now on.
@@ -399,6 +343,27 @@ NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_lru_
 int
 NetHandler::startNetEvent(int event, Event *e)
 {
+  // read configuration values and setup callbacks for when they change
+  REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
+  REC_ReadConfigInt32(max_connections_active_in, "proxy.config.net.max_connections_active_in");
+  REC_ReadConfigInt32(inactive_threashold_in, "proxy.config.net.inactive_threashold_in");
+  REC_ReadConfigInt32(transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
+  REC_ReadConfigInt32(keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
+
+  RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config,
(void *)this);
+  RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config,
(void *)this);
+  RecRegisterConfigUpdateCb("proxy.config.net.inactive_threashold_in", update_nethandler_config,
(void *)this);
+  RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config,
(void *)this);
+  RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config,
(void *)this);
+
+  Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", max_connections_in);
+  Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", max_connections_active_in);
+  Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %d", inactive_threashold_in);
+  Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
transaction_no_activity_timeout_in);
+  Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
keep_alive_no_activity_timeout_in);
+
+  configure_per_thread();
+
   (void)event;
   SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
   e->schedule_every(-HRTIME_MSECONDS(net_event_period));
@@ -597,3 +562,182 @@ NetHandler::mainNetEvent(int event, Event *e)
 
   return EVENT_CONT;
 }
+
+bool
+NetHandler::manage_active_queue()
+{
+  const int total_connections_in = active_queue_size + keep_alive_queue_size;
+  Debug("net_queue", "max_connections_per_thread_in: %d max_connections_active_per_thread_in:
%d total_connections_in: %d "
+                     "active_queue_size: %d keep_alive_queue_size: %d",
+        max_connections_per_thread_in, max_connections_active_per_thread_in, total_connections_in,
active_queue_size,
+        keep_alive_queue_size);
+
+  if (max_connections_active_per_thread_in > active_queue_size) {
+    return true;
+  }
+
+  ink_hrtime now = ink_get_hrtime();
+
+  // loop over the non-active connections and try to close them
+  UnixNetVConnection *vc = active_queue.head;
+  UnixNetVConnection *vc_next = NULL;
+  int closed = 0;
+  int handle_event = 0;
+  int total_idle_time = 0;
+  int total_idle_count = 0;
+  for (; vc != NULL; vc = vc_next) {
+    if ((vc->next_inactivity_timeout_at > now) || (vc->next_activity_timeout_at
> now)) {
+      _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count);
+    }
+    if (max_connections_active_per_thread_in > active_queue_size) {
+      return true;
+    }
+  }
+
+  return false; // failed to make room in the queue, all connections are active
+}
+
+void
+NetHandler::configure_per_thread()
+{
+  // figure out the number of threads and calculate the number of connections per thread
+  int threads = eventProcessor.n_threads_for_type[ET_NET];
+  threads += (ET_NET == SSLNetProcessor::ET_SSL) ? 0 : eventProcessor.n_threads_for_type[SSLNetProcessor::ET_SSL];
+  max_connections_per_thread_in = max_connections_in / threads;
+  max_connections_active_per_thread_in = max_connections_active_in / threads;
+  Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in,
threads);
+  Debug("net_queue", "max_connections_active_per_thread_in updated to %d threads: %d", max_connections_active_per_thread_in,
+        threads);
+}
+
+void
+NetHandler::manage_keep_alive_queue()
+{
+  uint32_t total_connections_in = active_queue_size + keep_alive_queue_size;
+  ink_hrtime now = ink_get_hrtime();
+
+  Debug("net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size:
%d keep_alive_queue_size: %d",
+        max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);
+
+  if (total_connections_in <= max_connections_per_thread_in) {
+    return;
+  }
+
+  // loop over the non-active connections and try to close them
+  UnixNetVConnection *vc_next = NULL;
+  int closed = 0;
+  int handle_event = 0;
+  int total_idle_time = 0;
+  int total_idle_count = 0;
+  for (UnixNetVConnection *vc = keep_alive_queue.head; vc != NULL; vc = vc_next) {
+    vc_next = vc->active_queue_link.next;
+    _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count);
+
+    total_connections_in = active_queue_size + keep_alive_queue_size;
+    if (total_connections_in <= max_connections_per_thread_in) {
+      break;
+    }
+  }
+
+  if (total_idle_count > 0) {
+    Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event:
%d mean idle: %d\n",
+          max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed,
handle_event,
+          total_idle_time / total_idle_count);
+  }
+}
+
+void
+NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int
&closed, int &total_idle_time,
+                      int &total_idle_count)
+{
+  if (vc->thread != this_ethread()) {
+    return;
+  }
+  MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    return;
+  }
+  ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in))
/ HRTIME_SECOND;
+  if (diff > 0) {
+    total_idle_time += diff;
+    ++total_idle_count;
+    NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
+    NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
+  }
+  Debug("net_queue", "closing connection NetVC=%p idle: %u now: %" PRId64 " at: %" PRId64
" in: %" PRId64 " diff: %" PRId64, vc,
+        keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
+        ink_hrtime_to_sec(vc->inactivity_timeout_in), diff);
+  if (vc->closed) {
+    close_UnixNetVConnection(vc, this_ethread());
+    ++closed;
+  } else {
+    vc->next_inactivity_timeout_at = now;
+    keep_alive_queue.head->handleEvent(EVENT_IMMEDIATE, NULL);
+    ++handle_event;
+  }
+}
+
+void
+NetHandler::add_to_keep_alive_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+
+  if (keep_alive_queue.in(vc)) {
+    // already in the keep-alive queue, move the head
+    keep_alive_queue.remove(vc);
+  } else {
+    // in the active queue or no queue, new to this queue
+    remove_from_active_queue(vc);
+    ++keep_alive_queue_size;
+  }
+  keep_alive_queue.enqueue(vc);
+
+  // if keep-alive queue is over size then close connections
+  manage_keep_alive_queue();
+}
+
+void
+NetHandler::remove_from_keep_alive_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  if (keep_alive_queue.in(vc)) {
+    keep_alive_queue.remove(vc);
+    --keep_alive_queue_size;
+  }
+}
+
+bool
+NetHandler::add_to_active_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size:
%d",
+        max_connections_per_thread_in, active_queue_size, keep_alive_queue_size);
+
+  // if active queue is over size then close inactive connections
+  if (manage_active_queue() == false) {
+    // there is no room left in the queue
+    return false;
+  }
+
+  if (active_queue.in(vc)) {
+    // already in the active queue, move the head
+    active_queue.remove(vc);
+  } else {
+    // in the keep-alive queue or no queue, new to this queue
+    remove_from_keep_alive_queue(vc);
+    ++active_queue_size;
+  }
+  active_queue.enqueue(vc);
+
+  return true;
+}
+
+void
+NetHandler::remove_from_active_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  if (active_queue.in(vc)) {
+    active_queue.remove(vc);
+    --active_queue_size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixNetVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index 1a3ccea..c4ec29c 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -103,14 +103,16 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
     vc->inactivity_timeout->cancel_action(vc);
     vc->inactivity_timeout = NULL;
   }
-#else
-  vc->next_inactivity_timeout_at = 0;
-#endif
-  vc->inactivity_timeout_in = 0;
   if (vc->active_timeout) {
     vc->active_timeout->cancel_action(vc);
     vc->active_timeout = NULL;
   }
+#else
+  vc->next_inactivity_timeout_at = 0;
+  vc->next_activity_timeout_at = 0;
+#endif
+  vc->inactivity_timeout_in = 0;
+
   vc->active_timeout_in = 0;
   nh->open_list.remove(vc);
   nh->cop_list.remove(vc);
@@ -124,7 +126,8 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
     nh->write_enable_list.remove(vc);
     vc->write.in_enabled_list = 0;
   }
-  vc->remove_from_keep_alive_lru();
+  vc->remove_from_keep_alive_queue();
+  vc->remove_from_active_queue();
   vc->free(t);
 }
 
@@ -842,11 +845,11 @@ UnixNetVConnection::reenable_re(VIO *vio)
 UnixNetVConnection::UnixNetVConnection()
   : closed(0), inactivity_timeout_in(0), active_timeout_in(0),
 #ifdef INACTIVITY_TIMEOUT
-    inactivity_timeout(NULL),
+    inactivity_timeout(NULL), active_timeout(NULL),
 #else
-    next_inactivity_timeout_at(0),
+    next_inactivity_timeout_at(0), next_activity_timeout_at(0),
 #endif
-    active_timeout(NULL), nh(NULL), id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0),
from_accept_thread(false)
+    nh(NULL), id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0), from_accept_thread(false)
 {
   memset(&local_addr, 0, sizeof local_addr);
   memset(&server_addr, 0, sizeof server_addr);
@@ -1059,7 +1062,7 @@ UnixNetVConnection::mainEvent(int event, Event *e)
   if (!hlock.is_locked() || !rlock.is_locked() || !wlock.is_locked() ||
       (read.vio.mutex.m_ptr && rlock.get_mutex() != read.vio.mutex.m_ptr) ||
       (write.vio.mutex.m_ptr && wlock.get_mutex() != write.vio.mutex.m_ptr)) {
-#ifndef INACTIVITY_TIMEOUT
+#ifdef INACTIVITY_TIMEOUT
     if (e == active_timeout)
 #endif
       e->schedule_in(HRTIME_MSECONDS(net_retry_delay));
@@ -1081,6 +1084,10 @@ UnixNetVConnection::mainEvent(int event, Event *e)
   if (e == inactivity_timeout) {
     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
     signal_timeout = &inactivity_timeout;
+  } else if {
+    ink_assert(e == active_timeout);
+    signal_event = VC_EVENT_ACTIVE_TIMEOUT;
+    signal_timeout = &active_timeout;
   }
 #else
   if (event == EVENT_IMMEDIATE) {
@@ -1093,11 +1100,7 @@ UnixNetVConnection::mainEvent(int event, Event *e)
     signal_timeout_at = &next_inactivity_timeout_at;
   }
 #endif
-  else {
-    ink_assert(e == active_timeout);
-    signal_event = VC_EVENT_ACTIVE_TIMEOUT;
-    signal_timeout = &active_timeout;
-  }
+
   *signal_timeout = 0;
   *signal_timeout_at = 0;
   writer_cont = write.vio._cont;
@@ -1231,7 +1234,9 @@ UnixNetVConnection::free(EThread *t)
   ink_assert(!write.ready_link.prev && !write.ready_link.next);
   ink_assert(!write.enable_link.next);
   ink_assert(!link.next && !link.prev);
+#ifdef INACTIVITY_TIMEOUT
   ink_assert(!active_timeout);
+#endif
   ink_assert(con.fd == NO_FD);
   ink_assert(t == this_ethread());
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 06a8179..dc02ff0 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -462,7 +462,9 @@ static const RecordElement RecordsConfig[] =
   ,
   {RECT_CONFIG, "proxy.config.http.attach_server_session_to_client", RECD_INT, "0", RECU_DYNAMIC,
RR_NULL, RECC_INT, "[0-1]", RECA_NULL}
   ,
-  {RECT_CONFIG, "proxy.config.net.max_connections_in", RECD_INT, "0", RECU_DYNAMIC, RR_NULL,
RECC_STR, "^[0-9]+$", RECA_NULL}
+  {RECT_CONFIG, "proxy.config.net.max_connections_in", RECD_INT, "30000", RECU_DYNAMIC, RR_NULL,
RECC_STR, "^[0-9]+$", RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.net.max_connections_active_in", RECD_INT, "10000", RECU_DYNAMIC,
RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
   ,
 
   //       ###########################

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/PluginVC.cc
----------------------------------------------------------------------
diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index 20e5888..66f2d1b 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -865,17 +865,24 @@ PluginVC::get_inactivity_timeout()
 }
 
 void
-PluginVC::add_to_keep_alive_lru()
+PluginVC::add_to_keep_alive_queue()
 {
   // do nothing
 }
 
 void
-PluginVC::remove_from_keep_alive_lru()
+PluginVC::remove_from_keep_alive_queue()
 {
   // do nothing
 }
 
+bool
+PluginVC::add_to_active_queue()
+{
+  // do nothing
+  return false;
+}
+
 SOCKET
 PluginVC::get_socket()
 {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/PluginVC.h
----------------------------------------------------------------------
diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h
index ef2a5d5..3352ac9 100644
--- a/proxy/PluginVC.h
+++ b/proxy/PluginVC.h
@@ -93,8 +93,9 @@ public:
   virtual void set_inactivity_timeout(ink_hrtime timeout_in);
   virtual void cancel_active_timeout();
   virtual void cancel_inactivity_timeout();
-  virtual void add_to_keep_alive_lru();
-  virtual void remove_from_keep_alive_lru();
+  virtual void add_to_keep_alive_queue();
+  virtual void remove_from_keep_alive_queue();
+  virtual bool add_to_active_queue();
   virtual ink_hrtime get_active_timeout();
   virtual ink_hrtime get_inactivity_timeout();
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/http/HttpClientSession.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpClientSession.cc b/proxy/http/HttpClientSession.cc
index 34afa84..fdf4d37 100644
--- a/proxy/http/HttpClientSession.cc
+++ b/proxy/http/HttpClientSession.cc
@@ -121,6 +121,13 @@ HttpClientSession::new_transaction()
   ink_assert(current_reader == NULL);
   PluginIdentity *pi = dynamic_cast<PluginIdentity *>(client_vc);
 
+  if (client_vc->add_to_active_queue() == false) {
+    // no room in the active queue close the connection
+    this->do_io_close();
+    return;
+  }
+
+
   // Defensive programming, make sure nothing persists across
   // connection re-use
   half_close = false;
@@ -131,7 +138,6 @@ HttpClientSession::new_transaction()
   transact_count++;
   DebugHttpSsn("[%" PRId64 "] Starting transaction %d using sm [%" PRId64 "]", con_id, transact_count,
current_reader->sm_id);
 
-  client_vc->remove_from_keep_alive_lru();
   current_reader->attach_client_session(this, sm_reader);
   if (pi) {
     // it's a plugin VC of some sort with identify information.
@@ -510,7 +516,7 @@ HttpClientSession::release(IOBufferReader *r)
     SET_HANDLER(&HttpClientSession::state_keep_alive);
     ka_vio = this->do_io_read(this, INT64_MAX, read_buffer);
     ink_assert(slave_ka_vio != ka_vio);
-    client_vc->add_to_keep_alive_lru();
+    client_vc->add_to_keep_alive_queue();
     client_vc->set_inactivity_timeout(HRTIME_SECONDS(ka_in));
     client_vc->cancel_active_timeout();
   }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyCallbacks.cc
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyCallbacks.cc b/proxy/spdy/SpdyCallbacks.cc
index e7b529a..45213a3 100644
--- a/proxy/spdy/SpdyCallbacks.cc
+++ b/proxy/spdy/SpdyCallbacks.cc
@@ -316,7 +316,7 @@ spdy_on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type
type, sp
     req->append_nv(frame->syn_stream.nv);
     req->append_nv(no_keep_alive);
     sm->req_map[stream_id] = req;
-    sm->vc->add_to_keep_alive_lru();
+    sm->vc->add_to_active_queue();
     spdy_process_syn_stream_frame(sm, req);
     break;
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyClientSession.cc
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyClientSession.cc b/proxy/spdy/SpdyClientSession.cc
index 4317d9a..c6f1822 100644
--- a/proxy/spdy/SpdyClientSession.cc
+++ b/proxy/spdy/SpdyClientSession.cc
@@ -113,7 +113,7 @@ SpdyClientSession::init(NetVConnection *netvc)
   start_time = TShrtime();
 
   this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_accept_no_activity_timeout));
-  vc->add_to_keep_alive_lru();
+  vc->add_to_keep_alive_queue();
   SET_HANDLER(&SpdyClientSession::state_session_start);
 }
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyClientSession.h
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyClientSession.h b/proxy/spdy/SpdyClientSession.h
index 8c3750f..349942a 100644
--- a/proxy/spdy/SpdyClientSession.h
+++ b/proxy/spdy/SpdyClientSession.h
@@ -181,7 +181,7 @@ public:
       this->req_map.erase(streamId);
     }
     if (req_map.empty() == true) {
-      vc->add_to_keep_alive_lru();
+      vc->add_to_keep_alive_queue();
     }
   }
 


Mime
View raw message