qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DISPATCH-825) Corrupted data on larger (>100Kb) messages
Date Wed, 13 Sep 2017 19:05:02 GMT

    [ https://issues.apache.org/jira/browse/DISPATCH-825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165118#comment-16165118
] 

ASF GitHub Bot commented on DISPATCH-825:
-----------------------------------------

Github user ChugR commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/194#discussion_r138710888
  
    --- Diff: src/message.c ---
    @@ -1109,95 +1146,97 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
     
         //
    -    // The discard flag indicates if we should continue receiving the message.
    -    // This is pertinent in the case of large messages. When large messages are being
received, we try to send out part of the
    -    // message that has been received so far. If we not able to send it anywhere, there
is no need to keep creating buffers
    -    //
    -    bool discard = qd_message_is_discard((qd_message_t*)msg);
    -
    -    //
    -    // Get a reference to the tail buffer on the message.  This is the buffer into which
    -    // we will store incoming message data.  If there is no buffer in the message, this
is the
    -    // first time we are here and we need to allocate an empty one and add it to the
message.
    +    // The discard flag indicates we should keep reading the input stream
    +    // but not process the message for delivery.
         //
    -    if (!discard) {
    -        buf = DEQ_TAIL(msg->content->buffers);
    -        if (!buf) {
    -            buf = qd_buffer();
    -            DEQ_INSERT_TAIL(msg->content->buffers, buf);
    -        }
    +    if (qd_message_is_discard((qd_message_t*)msg)) {
    +        return discard_receive(delivery, link, (qd_message_t *)msg);
         }
     
    +
    +    // Loop until msg is complete, error seen, or incoming bytes are consumed
    +    bool recv_error = false;
         while (1) {
    -        if (discard) {
    -            char dummy[BUFFER_SIZE];
    -            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
    -        }
    -        else {
    -            //
    -            // Try to receive enough data to fill the remaining space in the tail buffer.
    -            //
    +        //
    +        // handle EOS and clean up after pn receive errors
    +        //
    +        bool at_eos = (pn_delivery_partial(delivery) == false) &&
    +                      (pn_delivery_pending(delivery) == 0);
    +
    +        if (at_eos || recv_error) {
    +            // Message is complete
    +            sys_mutex_lock(msg->content->lock);
    +            {
    +                // Append last buffer if any with data
    +                if (msg->content->pending) {
    +                    if (qd_buffer_size(msg->content->pending) > 0) {
    +                        // pending buffer has bytes that are port of message
    +                        DEQ_INSERT_TAIL(msg->content->buffers,
    +                                        msg->content->pending);
    +                    } else {
    +                        // pending buffer is empty
    +                        qd_buffer_free(msg->content->pending);
    +                    }
    +                    msg->content->pending = 0;
    +                } else {
    +                    // pending buffer is absent
    +                }
    +
    +                msg->content->receive_complete = true;
     
    -            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +                // unlink message and delivery
    +                pn_record_set(record, PN_DELIVERY_CTX, 0);
    +            }
    +            sys_mutex_unlock(msg->content->lock);
    +            return (qd_message_t*) msg;
             }
     
             //
    -        // If we receive PN_EOS, we have come to the end of the message.
    +        // Handle a missing or full pending buffer
             //
    -        if (rc == PN_EOS) {
    -            //
    -            // We have received the entire message since rc == PN_EOS, set the receive_complete
flag to true
    -            //
    -            msg->content->receive_complete = true;
    -
    -            //
    -            // Clear the value in the record with key PN_DELIVERY_CTX
    -            //
    -            pn_record_set(record, PN_DELIVERY_CTX, 0);
    -
    -            //
    -            // If the last buffer in the list is empty, remove it and free it.  This
    -            // will only happen if the size of the message content is an exact multiple
    -            // of the buffer size.
    -            //
    -            if (buf && qd_buffer_size(buf) == 0) {
    +        if (!msg->content->pending) {
    +            // Pending buffer is absent: get a new one
    +            msg->content->pending = qd_buffer();
    +        } else {
    +            // Pending buffer exists
    +            if (qd_buffer_capacity(msg->content->pending) == 0) {
    +                // Pending buffer is full
                     sys_mutex_lock(msg->content->lock);
    -                DEQ_REMOVE_TAIL(msg->content->buffers);
    +                DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
                     sys_mutex_unlock(msg->content->lock);
    -                qd_buffer_free(buf);
    +                msg->content->pending = qd_buffer();
    +            } else {
    +                // Pending buffer still has capacity
                 }
    -
    -            return (qd_message_t*) msg;
             }
     
    -        if (rc > 0) {
    -            if (discard)
    -                continue;
    +        //
    +        // Try to fill the remaining space in the pending buffer.
    +        //
    +        rc = pn_link_recv(link,
    +                          (char*) qd_buffer_cursor(msg->content->pending),
    +                          qd_buffer_capacity(msg->content->pending));
    +
    +        assert (rc != PN_EOS); // Just checked for this moments ago
    --- End diff --
    
    This assert is a mistake. Between sensing it a moment ago and now it may have become true.


> Corrupted data on larger (>100Kb) messages
> ------------------------------------------
>
>                 Key: DISPATCH-825
>                 URL: https://issues.apache.org/jira/browse/DISPATCH-825
>             Project: Qpid Dispatch
>          Issue Type: Bug
>         Environment: Fedora 25. Master branch qpid-dispatch and qpid-cpp tools: qpid-send,
qpid-receive
>            Reporter: Chuck Rolke
>
> h1. Setup
> h3. Start a dispatch router with this conf file:
> {noformat}
> # Router to run qpid-interop-test
> router {
>     mode: interior
>     id: Router.A
>     workerThreads: 4
>     allowUnsettledMulticast: yes
> }
> listener {
>     host: 0.0.0.0
>     port: 5672
>     authenticatePeer: no
>     saslMechanisms: ANONYMOUS
> }
> listener {
>     host: localhost
>     port: 5672
>     authenticatePeer: no
>     saslMechanisms: ANONYMOUS
> }
> address {
>     prefix: jms.queue.qpid-interop.#
>     distribution: balanced
> }
> log {
>     module: DEFAULT
>     enable: debug+
> }
> ({noformat}
> h3. Start a receiver to receive 1000 messages:
> {noformat}
> qpid-receive -a jms.queue.qpid-interop.test --connection-options "{protocol:amqp1.0}"
-m 1000 -f --print-content no --print-headers yes --ack-frequency 1
> {noformat}
> h3. Start 1000 senders each with a different length message
> {noformat}
> #!/bin/bash
> for i in `seq 100512 101511`;
>         do
>             qpid-send -a jms.queue.qpid-interop.test --connection-options "{protocol:amqp1.0}"
-m 1 --content-size $i
>         done    
> {noformat}
> h1. Result
> Eventually the receive program will exit with an error:
> {noformat}
> qpid-receive: Out of Bounds: 
>   requested advance of 100552 at 42 but only 100444 available 
>   (/home/chug/git/qpid-cpp/src/qpid/amqp/Decoder.cpp:307)
> {noformat}
> h2. Observations
> * Putting qd_log statements in the qd_message_send path, one at each pn_link_send() invocation,
allows the setup to run the 1000 messages repeatedly. Probably it would fail eventually but
in this condition it is harder to debug.
> * I suspect a interlock issue between sending and receiving a single message but adding
a dozen or so assert has not revealed anything yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message