trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chen...@apache.org
Subject git commit: TS-2201: split drainIncomingChannel two thread, one handle Broadcast message and other handle Reliable(TCP) request.
Date Tue, 17 Sep 2013 03:51:47 GMT
Updated Branches:
  refs/heads/master f47c6be30 -> 3a903f2bc


TS-2201: split drainIncomingChannel two thread, one handle Broadcast message and other handle
Reliable(TCP) request.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b

Branch: refs/heads/master
Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b
Parents: f47c6be
Author: Chen Bin <kuotai@taobao.com>
Authored: Tue Sep 17 11:51:11 2013 +0800
Committer: Chen Bin <kuotai@taobao.com>
Committed: Tue Sep 17 11:51:11 2013 +0800

----------------------------------------------------------------------
 CHANGES                    |   3 ++
 mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++-------------
 2 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e9c056d..f2e9fd4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache Traffic Server 4.1.0
 
+  *) [TS-2201] split drainIncomingChannel two thread, one handle Broadcast message and other handle Reliable(TCP)
+   request for supporing large cluster.
+
   *) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash
    the process intermittently.
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc
----------------------------------------------------------------------
diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc
index 430e01b..fa173cc 100644
--- a/mgmt/cluster/ClusterCom.cc
+++ b/mgmt/cluster/ClusterCom.cc
@@ -48,6 +48,76 @@
 int MultiCastMessages = 0;
 long LastHighestDelta = -1L;
 
+
+void *
+drainIncomingChannel_broadcast(void *arg)
+{
+  char message[61440];
+  fd_set fdlist;
+  void *ret = arg;
+
+  time_t t;
+  time_t last_multicast_receive_time = time(NULL);
+  struct timeval tv;
+
+  /* Avert race condition, thread spun during constructor */
+  while (!lmgmt->ccom || !lmgmt->ccom->init) {
+    mgmt_sleep_sec(1);
+  }
+
+  lmgmt->syslogThrInit();
+
+  for (;;) {                    /* Loop draining mgmt network channels */
+    // linux: set tv.tv_set in select() loop, since linux's select()
+    // will update tv with the amount of time not slept (most other
+    // implementations do not do this)
+    tv.tv_sec = lmgmt->ccom->mc_poll_timeout;             // interface not-responding timeout
+    tv.tv_usec = 0;
+
+    memset(message, 0, 61440);
+    FD_ZERO(&fdlist);
+
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
+      if (lmgmt->ccom->receive_fd > 0) {
+        FD_SET(lmgmt->ccom->receive_fd, &fdlist);       /* Multicast fd */
+      }
+    }
+
+    mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
+
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
+      // Multicast timeout considerations
+      if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
+        t = time(NULL);
+        if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
+          // Timeout on multicast receive channel, reset channel.
+          if (lmgmt->ccom->receive_fd > 0) {
+            close(lmgmt->ccom->receive_fd);
+          }
+          lmgmt->ccom->receive_fd = -1;
+          Debug("ccom", "Timeout, resetting multicast receive channel");
+          if (lmgmt->ccom->establishReceiveChannel(0)) {
+            Debug("ccom", "establishReceiveChannel failed");
+            lmgmt->ccom->receive_fd = -1;
+          }
+          last_multicast_receive_time = t;      // next action at next interval
+        }
+      } else {
+        last_multicast_receive_time = time(NULL);       // valid multicast msg
+      }
+    }
+
+    /* Broadcast message */
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
+        lmgmt->ccom->receive_fd > 0 &&
+        FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
+        (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
+      lmgmt->ccom->handleMultiCastMessage(message);
+    }
+  }
+  return ret;
+}                               /* End drainIncomingChannel */
+
 /*
  * drainIncomingChannel
  *   This function is blocking, it never returns. It is meant to allow for
@@ -89,8 +159,6 @@ drainIncomingChannel(void *arg)
   // to reopen the channel (e.g. opening the socket would fail if the
   // interface was down).  In this case, the ccom->receive_fd is set
   // to '-1' and the open is retried until it succeeds.
-  time_t t;
-  time_t last_multicast_receive_time = time(NULL);
   struct timeval tv;
 
   /* Avert race condition, thread spun during constructor */
@@ -111,43 +179,12 @@ drainIncomingChannel(void *arg)
     FD_ZERO(&fdlist);
 
     if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
-      if (lmgmt->ccom->receive_fd > 0) {
-        FD_SET(lmgmt->ccom->receive_fd, &fdlist);       /* Multicast fd */
-      }
       FD_SET(lmgmt->ccom->reliable_server_fd, &fdlist);   /* TCP Server fd */
     }
 
     mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
 
-    if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
-      // Multicast timeout considerations
-      if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
-        t = time(NULL);
-        if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
-          // Timeout on multicast receive channel, reset channel.
-          if (lmgmt->ccom->receive_fd > 0) {
-            close(lmgmt->ccom->receive_fd);
-          }
-          lmgmt->ccom->receive_fd = -1;
-          Debug("ccom", "Timeout, resetting multicast receive channel");
-          if (lmgmt->ccom->establishReceiveChannel(0)) {
-            Debug("ccom", "establishReceiveChannel failed");
-            lmgmt->ccom->receive_fd = -1;
-          }
-          last_multicast_receive_time = t;      // next action at next interval
-        }
-      } else {
-        last_multicast_receive_time = time(NULL);       // valid multicast msg
-      }
-    }
-
-    /* Broadcast message */
-    if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
-        lmgmt->ccom->receive_fd > 0 &&
-        FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
-        (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
-      lmgmt->ccom->handleMultiCastMessage(message);
-    } else if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
+    if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
       /* Reliable(TCP) request */
       int clilen = sizeof(cli_addr);
       int req_fd = mgmt_accept(lmgmt->ccom->reliable_server_fd, (struct sockaddr *) &cli_addr, &clilen);
@@ -442,8 +479,10 @@ ClusterCom::ClusterCom(unsigned long oip, char *host, int mcport, char *group, i
   peers = ink_hash_table_create(InkHashTableKeyType_String);
   mismatchLog = ink_hash_table_create(InkHashTableKeyType_String);
 
-  if (cluster_type != NO_CLUSTER)
+  if (cluster_type != NO_CLUSTER) {
+    ink_thread_create(drainIncomingChannel_broadcast, 0);   /* Spin drainer thread */
     ink_thread_create(drainIncomingChannel, 0);   /* Spin drainer thread */
+  }
   return;
 }                               /* End ClusterCom::ClusterCom */
 


Mime
View raw message