qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Darryl L. Pierce" <dpie...@redhat.com>
Subject [PATCH 1/6] added timeouts to messenger API; added messenger test suite; tweaked logging to identify connection; added PN_TRACE_DRV log flag
Date Wed, 25 Jul 2012 18:05:59 GMT
From: rhs <rhs@13f79535-47bb-0310-9956-ffa450edef68>

git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1356935 13f79535-47bb-0310-9956-ffa450edef68
---
 proton-c/include/proton/engine.h     |   1 +
 proton-c/include/proton/error.h      |   1 +
 proton-c/include/proton/messenger.h  |  18 +++++
 proton-c/src/dispatcher/dispatcher.c |   4 +-
 proton-c/src/driver.c                |  25 ++++---
 proton-c/src/messenger.c             | 129 +++++++++++++++++++++++------------
 tests/proton_tests/__init__.py       |   1 +
 tests/proton_tests/messenger.py      |  88 ++++++++++++++++++++++++
 8 files changed, 212 insertions(+), 55 deletions(-)
 create mode 100644 tests/proton_tests/messenger.py

diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
index b8873bb..610ab47 100644
--- a/proton-c/include/proton/engine.h
+++ b/proton-c/include/proton/engine.h
@@ -76,6 +76,7 @@ typedef int pn_trace_t;
 #define PN_TRACE_OFF (0)
 #define PN_TRACE_RAW (1)
 #define PN_TRACE_FRM (2)
+#define PN_TRACE_DRV (4)
 
 #define PN_SESSION_WINDOW (1024)
 
diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
index 86115c1..ee317ab 100644
--- a/proton-c/include/proton/error.h
+++ b/proton-c/include/proton/error.h
@@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t;
 #define PN_UNDERFLOW (-4)
 #define PN_STATE_ERR (-5)
 #define PN_ARG_ERR (-6)
+#define PN_TIMEOUT (-7)
 
 const char *pn_code(int code);
 
diff --git a/proton-c/include/proton/messenger.h b/proton-c/include/proton/messenger.h
index 42d8cc4..c53bc4a 100644
--- a/proton-c/include/proton/messenger.h
+++ b/proton-c/include/proton/messenger.h
@@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char *name);
  */
 const char *pn_messenger_name(pn_messenger_t *messenger);
 
+/** Sets the timeout for a Messenger. A negative timeout means
+ * infinite.
+ *
+ * @param[in] messenger the messenger
+ * @param[timeout] the new timeout for the messenger, in milliseconds
+ *
+ * @return an error code or zero if there is no error
+ */
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout);
+
+/** Retrieves the timeout for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ *
+ * @return the timeout for the messenger, in milliseconds
+ */
+int pn_messenger_get_timeout(pn_messenger_t *messenger);
+
 /** Frees a Messenger.
  *
  * @param[in] messenger the messenger to free, no longer valid on
diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
index a9733ae..671a791 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
     uint8_t code = scanned ? code64 : 0;
     size_t n = SCRATCH;
     pn_data_format(args, disp->scratch, &n);
-    fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
-            disp->names[code], disp->scratch);
+    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
+            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
     if (size) {
       size_t capacity = 4*size + 1;
       char buf[capacity];
diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
index af99fe4..dda169b 100644
--- a/proton-c/src/driver.c
+++ b/proton-c/src/driver.c
@@ -68,11 +68,13 @@ struct pn_listener_t {
 };
 
 #define IO_BUF_SIZE (4*1024)
+#define NAME_MAX (256)
 
 struct pn_connector_t {
   pn_driver_t *driver;
   pn_connector_t *connector_next;
   pn_connector_t *connector_prev;
+  char name[256];
   int idx;
   bool pending_tick;
   bool pending_read;
@@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
 
   pn_listener_t *l = pn_listener_fd(driver, sock, context);
 
-  printf("Listening on %s:%s\n", host, port);
+  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+    printf("Listening on %s:%s\n", host, port);
   return l;
 }
 
@@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_listener_t *l)
       return NULL;
     } else {
       pn_configure_sock(sock);
-      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
-        printf("accepted from %s:%s\n", host, serv);
+      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Accepted from %s:%s\n", host, serv);
       pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
+      snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
       c->listener = l;
       return c;
     }
@@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
   freeaddrinfo(addr);
 
   pn_connector_t *c = pn_connector_fd(driver, sock, context);
-  printf("Connected to %s:%s\n", host, port);
+  snprintf(c->name, NAME_MAX, "%s:%s", host, port);
+  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+    fprintf(stderr, "Connected to %s\n", c->name);
   return c;
 }
 
@@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
   c->pending_tick = false;
   c->pending_read = false;
   c->pending_write = false;
+  c->name[0] = '\0';
   c->idx = 0;
   c->fd = fd;
   c->status = PN_SEL_RD | PN_SEL_WR;
@@ -460,7 +467,7 @@ static void pn_connector_process_input(pn_connector_t *ctor)
       if (n == PN_EOS) {
         pn_connector_consume(ctor, ctor->input_size);
       } else {
-        printf("error in process_input: %s\n", pn_code(n));
+        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
       }
       ctor->input_done = true;
       break;
@@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t *c) {
       c->pending_write = false;
     }
     if (c->output_size == 0 && c->input_done && c->output_done)
{
-      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
-        fprintf(stderr, "closed\n");
+      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+        fprintf(stderr, "Closed %s\n", c->name);
+      }
       pn_connector_close(c);
     }
   }
@@ -686,7 +694,8 @@ pn_driver_t *pn_driver()
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
-              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
+              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+              (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
 
   // XXX
   if (pipe(d->ctrl)) {
diff --git a/proton-c/src/messenger.c b/proton-c/src/messenger.c
index a12025a..56da348 100644
--- a/proton-c/src/messenger.c
+++ b/proton-c/src/messenger.c
@@ -21,6 +21,7 @@
 
 #include <proton/messenger.h>
 #include <proton/driver.h>
+#include <proton/util.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -29,6 +30,7 @@
 
 struct pn_messenger_t {
   char *name;
+  int timeout;
   pn_driver_t *driver;
   pn_connector_t *connectors[1024];
   size_t size;
@@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char *name)
 
   if (m) {
     m->name = build_name(name);
+    m->timeout = -1;
     m->driver = pn_driver();
     m->size = 0;
     m->listeners = 0;
@@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messenger_t *messenger)
   return messenger->name;
 }
 
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
+{
+  if (!messenger) return PN_ARG_ERR;
+  messenger->timeout = timeout;
+  return 0;
+}
+
+int pn_messenger_get_timeout(pn_messenger_t *messenger)
+{
+  return messenger ? messenger->timeout : 0;
+}
+
 void pn_messenger_free(pn_messenger_t *messenger)
 {
   if (messenger) {
@@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t
*conn)
   }
 }
 
-int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
+long int millis(struct timeval tv)
+{
+  return tv.tv_sec * 1000 + tv.tv_usec/1000;
+}
+
+int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int
timeout)
 {
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_process(messenger->connectors[i]);
   }
 
-  while (!predicate(messenger)) {
-    pn_driver_wait(messenger->driver, -1);
+  struct timeval now;
+  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+  long int deadline = millis(now) + timeout;
+  bool pred;
+
+  while (true) {
+    pred = predicate(messenger);
+    int remaining = deadline - millis(now);
+    if (pred || (timeout >= 0 && remaining < 0)) break;
+
+    pn_driver_wait(messenger->driver, remaining);
 
     pn_listener_t *l;
     while ((l = pn_driver_listener(messenger->driver))) {
@@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_
         pn_connector_process(c);
       }
     }
+
+    if (timeout >= 0) {
+      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+    }
   }
 
-  return 0;
+  return pred ? 0 : PN_TIMEOUT;
 }
 
-bool pn_messenger_linked(pn_messenger_t *messenger)
+int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
-    pn_connection_t *conn = pn_connector_connection(ctor);
-    pn_state_t state = pn_connection_state(conn);
-    if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) ||
-        (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) {
-      return false;
-    }
-
-    if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) ||
-        pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) {
-      return false;
-    }
-  }
-
-  return true;
+  return pn_messenger_tsync(messenger, predicate, messenger->timeout);
 }
 
 int pn_messenger_start(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
-  return pn_messenger_sync(messenger, pn_messenger_linked);
+  // right now this is a noop
+  return 0;
 }
 
-bool pn_messenger_unlinked(pn_messenger_t *messenger)
+bool pn_messenger_stopped(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
-    pn_connection_t *conn = pn_connector_connection(ctor);
-    pn_state_t state = pn_connection_state(conn);
-    if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
-      return false;
-  }
-  return true;
+  return messenger->size == 0;
 }
 
 int pn_messenger_stop(pn_messenger_t *messenger)
@@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *messenger)
     pn_connection_close(conn);
   }
 
-  return pn_messenger_sync(messenger, pn_messenger_unlinked);
+  return pn_messenger_sync(messenger, pn_messenger_stopped);
 }
 
 static void parse_address(char *address, char **domain, char **name)
@@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char *b)
 
 pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
 {
+  char buf[strlen(domain) + 1];
+  if (domain) {
+    strcpy(buf, domain);
+  } else {
+    buf[0] = '\0';
+  }
+  char *user = NULL;
+  char *pass = NULL;
+  char *host = "0.0.0.0";
+  char *port = "5672";
+  parse_url(buf, &user, &pass, &host, &port);
+
   for (int i = 0; i < messenger->size; i++) {
     pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
     const char *container = pn_connection_remote_container(connection);
@@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const
char *doma
       return connection;
   }
 
-  pn_connector_t *connector = pn_connector(messenger->driver, domain, "5672", NULL);
+  pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
   if (!connector) return NULL;
   messenger->connectors[messenger->size++] = connector;
   pn_sasl_t *sasl = pn_connector_sasl(connector);
-  pn_sasl_mechanisms(sasl, "ANONYMOUS");
-  pn_sasl_client(sasl);
+  if (user) {
+    pn_sasl_plain(sasl, user, pass);
+  } else {
+    pn_sasl_mechanisms(sasl, "ANONYMOUS");
+    pn_sasl_client(sasl);
+  }
   pn_connection_t *connection = pn_connection();
   pn_connection_set_container(connection, messenger->name);
   pn_connection_set_hostname(connection, domain);
@@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_messenger_t *messenger, const
char *sourc
 {
   char buf[strlen(source) + 1];
   strcpy(buf, source);
-  char *domain;
-  char *name;
+  char *domain, *name;
   parse_address(buf, &domain, &name);
+  char *user = NULL;
+  char *pass = NULL;
+  char *host = "0.0.0.0";
+  char *port = "5672";
+  parse_url(domain + 1, &user, &pass, &host, &port);
 
-  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", NULL);
+  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
   if (listener) {
     messenger->listeners++;
   }
@@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
   }
 }
 
+bool false_pred(pn_messenger_t *messenger) { return false; }
+
 int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
 {
   if (!messenger) return PN_ARG_ERR;
@@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
         return n;
       } else {
         pn_advance(sender);
+        pn_messenger_tsync(messenger, false_pred, 0);
         return 0;
       }
     }
@@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
 
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
-  //  if (!pn_messenger_linked(messenger)) return false;
-
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *messenger)
 
 bool pn_messenger_rcvd(pn_messenger_t *messenger)
 {
-  //  if (!pn_messenger_linked(messenger)) return false;
-
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n)
 
 int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
 {
+  if (!messenger) return PN_ARG_ERR;
+
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
         ssize_t n = pn_recv(l, buf, 1024);
         pn_settle(d);
         if (n < 0) return n;
-        int err = pn_message_decode(msg, buf, n);
-        if (err) {
-          return pn_error_format(messenger->error, err, "error decoding message: %s",
+        if (msg) {
+          int err = pn_message_decode(msg, buf, n);
+          if (err) {
+            return pn_error_format(messenger->error, err, "error decoding message: %s",
                                  pn_message_error(msg));
+          } else {
+            return 0;
+          }
         } else {
           return 0;
         }
@@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
 
 int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
 {
+  if (!messenger) return 0;
+
   int result = 0;
 
   for (int i = 0; i < messenger->size; i++) {
diff --git a/tests/proton_tests/__init__.py b/tests/proton_tests/__init__.py
index a8a4d52..b467cf5 100644
--- a/tests/proton_tests/__init__.py
+++ b/tests/proton_tests/__init__.py
@@ -19,3 +19,4 @@
 
 import proton_tests.engine
 import proton_tests.message
+import proton_tests.messenger
diff --git a/tests/proton_tests/messenger.py b/tests/proton_tests/messenger.py
new file mode 100644
index 0000000..17161cd
--- /dev/null
+++ b/tests/proton_tests/messenger.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, common, xproton
+from xproton import *
+from threading import Thread
+
+class Test(common.Test):
+
+  def setup(self):
+    self.server = pn_messenger("server")
+    pn_messenger_set_timeout(self.server, 10000)
+    pn_messenger_start(self.server)
+    pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
+    self.thread = Thread(target=self.run)
+    self.running = True
+    self.thread.start()
+
+    self.client = pn_messenger("client")
+    pn_messenger_set_timeout(self.client, 10000)
+    pn_messenger_start(self.client)
+
+  def teardown(self):
+    self.running = False
+    msg = pn_message()
+    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_messenger_put(self.client, msg)
+    pn_messenger_send(self.client)
+    pn_messenger_stop(self.client)
+    self.thread.join()
+    pn_messenger_free(self.client)
+    pn_messenger_free(self.server)
+    self.client = None
+    self.server = None
+
+class MessengerTest(Test):
+
+  def run(self):
+    msg = pn_message()
+    while self.running:
+      pn_messenger_recv(self.server, 10)
+      while pn_messenger_incoming(self.server):
+        if pn_messenger_get(self.server, msg):
+          print pn_messenger_error(self.server)
+        else:
+          reply_to = pn_message_get_reply_to(msg)
+          if reply_to:
+            pn_message_set_address(msg, reply_to)
+            pn_messenger_put(self.server, msg)
+    pn_messenger_stop(self.server)
+
+  def testSendReceive(self):
+    msg = pn_message()
+    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_message_set_subject(msg, "Hello World!")
+    body = "First the world, then the galaxy!"
+    pn_message_load(msg, body)
+    pn_messenger_put(self.client, msg)
+    pn_messenger_send(self.client)
+
+    reply = pn_message()
+    assert not pn_messenger_recv(self.client, 1)
+    assert pn_messenger_incoming(self.client) == 1
+    assert not pn_messenger_get(self.client, reply)
+
+    assert pn_message_get_subject(reply) == "Hello World!"
+    cd, rbod = pn_message_save(reply, 1024)
+    assert not cd
+    assert rbod == body
+
+    pn_message_free(msg)
+    pn_message_free(reply)
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message