qpid-proton mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Ross <tr...@redhat.com>
Subject Re: problem with multiple senders
Date Thu, 04 Apr 2013 18:13:50 GMT
Any clues from a trace of the receiver?

$ PN_TRACE_FRM=1 ./receiver 6666

-Ted

On 04/04/2013 02:09 PM, Michael Goulish wrote:
>
>    Is this a bug, or am I  Doing  Something  Wrong ?
>
>
>
> Scenario
> {
>    My sender sends a single message, and hopes to see
>    that the receiver has accepted it.
>
>    I launch 3 copies of the sender very close together--
>    they all talk to the same address.
>
>    My receiver receives in a loop, accepts every message
>    that it receives.
> }
>
>
>
>
> Result
> {
>    Sometimes my receiver gets 1 of the 3 messages.
>    Usually it gets 2.
>    It never gets all 3.
>
>    The 3rd sender hangs in pn_messenger_send().
>
>    While the 3rd sender is hanging in send(), the receiver
>    is patiently waiting in recv().
> }
>
>
>
>
>
>
> Sender Code ############################################
>
> /*
>    Launch 3 of these from a script like so:
>    ./sender 6666 &
>    ./sender 6666 &
>    ./sender 6666 &
> */
>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
> #include <getopt.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <ctype.h>
>
>
> char *
> status_2_str ( pn_status_t status )
> {
>    switch ( status )
>    {
>      case PN_STATUS_UNKNOWN:
>        return "unknown";
>        break;
>
>      case PN_STATUS_PENDING:
>        return "pending";
>        break;
>
>      case PN_STATUS_ACCEPTED:
>        return "accepted";
>        break;
>
>      case PN_STATUS_REJECTED:
>        return "rejected";
>        break;
>
>      default:
>        return "bad value";
>        break;
>    }
> }
>
>
>
> pid_t my_pid = 0;
>
>
> void
> check ( char * label, int result )
> {
>    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> }
>
>
>
> int
> main(int argc, char** argv)
> {
>    int c;
>    char addr [ 1000 ];
>    char msgtext [ 100 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>    pn_status_t      status;
>    int              result;
>
>    my_pid = getpid();
>
>    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
>
>
>    message = pn_message ( );
>    messenger = pn_messenger ( NULL );
>    pn_messenger_start ( messenger ) ;
>    pn_messenger_set_outgoing_window ( messenger, 1 );
>
>
>    pn_message_set_address ( message, addr );
>    body = pn_message_body ( message );
>
>
>    sprintf ( msgtext, "Message from %d", getpid() );
>    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
>    pn_messenger_put ( messenger, message );
>    tracker = pn_messenger_outgoing_tracker ( messenger );
>    pn_messenger_send ( messenger );
>
>
>    status = pn_messenger_status ( messenger, tracker );
>    fprintf ( stderr, "status : %s\n", status_2_str(status) );
>
>
>    pn_messenger_stop ( messenger );
>    pn_messenger_free ( messenger );
>    pn_message_free ( message );
>
>    return 0;
> }
>
>
>
>
> Receiver Code ########################################################
>
> /*
>
>    Launch like this:
>    ./receiver 6666
> */
>
> #include <stdio.h>
> #include <stdlib.h>
> #include <ctype.h>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
>
>
> #define BUFSIZE 1024
>
>
>
> int
> main(int argc, char** argv)
> {
>    size_t bufsize = BUFSIZE;
>    char buffer [ BUFSIZE ];
>    char addr [ 1000 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>
>
>    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
>
>    message = pn_message();
>    messenger = pn_messenger ( NULL );
>
>    pn_messenger_start(messenger);
>    pn_messenger_subscribe ( messenger, addr );
>    pn_messenger_set_incoming_window ( messenger, 5 );
>
>    /*---------------------------------
>      Receive and accept the message.
>    ---------------------------------*/
>    while ( 1 )
>    {
>      fprintf ( stderr, "receiving...\n" );
>      pn_messenger_recv ( messenger, 3 );
>
>      while ( pn_messenger_incoming ( messenger ) > 0 )
>      {
>        fprintf ( stderr, "getting message...\n" );
>        pn_messenger_get ( messenger, message );
>        tracker = pn_messenger_incoming_tracker ( messenger );
>        pn_messenger_accept ( messenger, tracker, 0 );
>        body = pn_message_body ( message );
>        pn_data_format ( body, buffer, & bufsize );
>        fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
>        fprintf ( stdout, "Content: %s\n", buffer);
>      }
>    }
>
>    pn_messenger_stop(messenger);
>    pn_messenger_free(messenger);
>
>    return 0;
> }
>
>


Mime
View raw message