qpid-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DISPATCH-767) Message Cut-Through/Streaming for efficient handling of large messages
Date Fri, 07 Jul 2017 15:42:00 GMT

    [ https://issues.apache.org/jira/browse/DISPATCH-767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078265#comment-16078265 ]

ASF GitHub Bot commented on DISPATCH-767:
-----------------------------------------

Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901
  
    --- Diff: src/message.c ---
    @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg,
     {
         qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
         qd_message_content_t *content = msg->content;
    -    qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
    -    unsigned char        *cursor;
    +    qd_buffer_t          *buf     = 0;
         pn_link_t            *pnl     = qd_link_pn(link);
     
    -    qd_buffer_list_t new_ma;
    -    DEQ_INIT(new_ma);
    +    // How many receivers does this message have?
    +    int                  fanout   = qd_message_fanout(in_msg);
     
    -    // Process  the message annotations if any
    -    compose_message_annotations(msg, &new_ma, strip_annotations);
    +    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
    +        //
    +        // Start with the very first buffer;
    +        //
    +        buf = DEQ_HEAD(content->buffers);
     
    -    //
    -    // This is the case where the message annotations have been modified.
    -    // The message send must be divided into sections:  The existing header;
    -    // the new message annotations; the rest of the existing message.
    -    // Note that the original message annotations that are still in the
    -    // buffer chain must not be sent.
    -    //
    -    // Start by making sure that we've parsed the message sections through
    -    // the message annotations
    -    //
    -    // ??? NO LONGER NECESSARY???
    -    if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    -        qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
    -        return;
    -    }
    +        if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    +            qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
    +            return;
    +        }
     
    -    //
    -    // Send header if present
    -    //
    -    cursor = qd_buffer_base(buf);
    -    if (content->section_message_header.length > 0) {
    -        buf    = content->section_message_header.buffer;
    -        cursor = content->section_message_header.offset + qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_message_header.length + content->section_message_header.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send header if present
    +        //
    +        unsigned char *cursor = qd_buffer_base(buf);
    +        int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
    +        if (content->section_message_header.length > 0) {
    +            buf    = content->section_message_header.buffer;
    +            cursor = content->section_message_header.offset + qd_buffer_base(buf);
    +            advance(&cursor, &buf, header_consume, send_handler, (void*) pnl);
    +        }
     
    -    //
    -    // Send delivery annotation if present
    -    //
    -    if (content->section_delivery_annotation.length > 0) {
    -        buf    = content->section_delivery_annotation.buffer;
    -        cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send delivery annotation if present
    +        //
    +        int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
    +        if (content->section_delivery_annotation.length > 0) {
    +            buf    = content->section_delivery_annotation.buffer;
    +            cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf);
    +            advance(&cursor, &buf, da_consume, send_handler, (void*) pnl);
    +        }
     
    -    //
    -    // Send new message annotations
    -    //
    -    qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    -    while (da_buf) {
    -        char *to_send = (char*) qd_buffer_base(da_buf);
    -        pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    -        da_buf = DEQ_NEXT(da_buf);
    -    }
    -    qd_buffer_list_free_buffers(&new_ma);
    +        qd_buffer_list_t new_ma;
    +        DEQ_INIT(new_ma);
     
    -    //
    -    // Skip over replaced message annotations
    -    //
    -    if (content->section_message_annotation.length > 0)
    -        advance(&cursor, &buf,
    -                content->section_message_annotation.hdr_length + content->section_message_annotation.length,
    -                0, 0);
    +        // Process  the message annotations if any
    +        compose_message_annotations(msg, &new_ma, strip_annotations);
    +
    +        //
    +        // Send new message annotations
    +        //
    +        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    +        while (da_buf) {
    +            char *to_send = (char*) qd_buffer_base(da_buf);
    +            pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    +            da_buf = DEQ_NEXT(da_buf);
    +        }
    +        qd_buffer_list_free_buffers(&new_ma);
    +
    +        //
    +        // Skip over replaced message annotations
    +        //
    +        int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
    +        if (content->section_message_annotation.length > 0)
    +            advance(&cursor, &buf, ma_consume, 0, 0);
    +
    +        msg->cursor.buffer = buf;
    +
    +        //
    +        // If this message has no header and no delivery annotations and no message annotations, set the offset to 0.
    +        //
    +        if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
    +            msg->cursor.offset = 0;
    +        else
    +            msg->cursor.offset = cursor - qd_buffer_base(buf);
    +
    +        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
     
    -    //
    -    // Send remaining partial buffer
    -    //
    -    if (buf) {
    -        size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
    -        advance(&cursor, &buf, len, send_handler, (void*) pnl);
         }
     
    -    // Fall through to process the remaining buffers normally
    -    // Note that 'advance' will have moved us to the next buffer in the chain.
    +    buf = msg->cursor.buffer;
     
    +    if (!buf)
    +        return;
    +
    +    bool receive_complete = qd_message_receive_complete(in_msg);
     
         while (buf) {
    -        pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
    -        buf = DEQ_NEXT(buf);
    +        size_t buf_size = qd_buffer_size(buf);
    +
    +        // This will send the remaining data in the buffer if any.
    +        pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset), buf_size - msg->cursor.offset);
    +
    +        // If the entire message has been received,  there is no need to lock before sending because no one else is
    +        // trying to modify the data structure.
    +        if (!receive_complete)
    +            sys_mutex_lock(msg->content->lock);
    --- End diff --
    
    I wouldn't bother with the conditional lock. If there's no contention, the cost of the lock is small, and the condition introduces one more way for a future programmer to screw up the thread-safety logic by mistake.
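
    To make the suggestion concrete, here is a minimal sketch of the unconditional-lock pattern. It is not the qpid-dispatch code: `demo_content_t` and the `demo_*` functions are hypothetical stand-ins, and plain pthread mutexes are used in place of the router's `sys_mutex_lock` wrapper. The sender always takes the lock, so its correctness does not depend on a separate `receive_complete` check being right.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for shared message content; this is NOT the real
 * qd_message_content_t from qpid-dispatch. */
typedef struct {
    pthread_mutex_t lock;
    size_t          buffer_count;     /* buffers appended by the receiver */
    bool            receive_complete; /* set once the last frame has arrived */
} demo_content_t;

void demo_content_init(demo_content_t *c)
{
    int rc = pthread_mutex_init(&c->lock, NULL);
    assert(rc == 0);
    (void) rc;
    c->buffer_count     = 0;
    c->receive_complete = false;
}

/* Receiver side: append one buffer under the lock. */
void demo_content_append(demo_content_t *c, bool last)
{
    pthread_mutex_lock(&c->lock);
    c->buffer_count++;
    if (last)
        c->receive_complete = true;
    pthread_mutex_unlock(&c->lock);
}

/* Sender side: take the lock unconditionally, even if the message may
 * already be complete. An uncontended lock is cheap, and there is no
 * extra condition for a later change to get wrong. */
size_t demo_content_snapshot(demo_content_t *c, bool *complete)
{
    pthread_mutex_lock(&c->lock);
    size_t n  = c->buffer_count;
    *complete = c->receive_complete;
    pthread_mutex_unlock(&c->lock);
    return n;
}
```

    The sender reads the buffer count and the completion flag in one critical section, so the two values are always consistent with each other.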


> Message Cut-Through/Streaming for efficient handling of large messages
> ----------------------------------------------------------------------
>
>                 Key: DISPATCH-767
>                 URL: https://issues.apache.org/jira/browse/DISPATCH-767
>             Project: Qpid Dispatch
>          Issue Type: Improvement
>          Components: Router Node
>            Reporter: Ted Ross
>            Assignee: Ganesh Murthy
>             Fix For: 1.0.0
>
>
> When large, multi-frame messages are sent through the router, there is no need to wait for the entire message to arrive before starting to send it onward.
> This feature causes the router to route the first frame and allow subsequent frames in a delivery to be streamed out in pipeline fashion.  Ideally, the memory usage in the router should only involve pending frames.  This would allow the router to handle arbitrary numbers of concurrent arbitrarily large messages.
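
The memory argument in the issue description can be illustrated with a toy model. This is only a sketch under stated assumptions: `demo_router_t` and the `demo_*` functions are hypothetical, they count frames rather than call any real router or Proton API, and the model assumes the outgoing link can always accept a frame immediately.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy model: counts frames only, no real AMQP or Proton calls. */
typedef struct {
    size_t pending;      /* frames received but not yet forwarded */
    size_t peak_pending; /* high-water mark of 'pending' */
    size_t forwarded;    /* frames sent onward so far */
} demo_router_t;

void demo_router_init(demo_router_t *r)
{
    assert(r != NULL);
    r->pending = r->peak_pending = r->forwarded = 0;
}

/* One frame arrives and is held until it is forwarded. */
static void demo_on_frame(demo_router_t *r)
{
    r->pending++;
    if (r->pending > r->peak_pending)
        r->peak_pending = r->pending;
}

/* Forward every frame currently held. */
static void demo_drain(demo_router_t *r)
{
    r->forwarded += r->pending;
    r->pending = 0;
}

/* Store-and-forward: wait for the whole message, then send it. */
void demo_store_and_forward(demo_router_t *r, size_t frames)
{
    for (size_t i = 0; i < frames; i++)
        demo_on_frame(r);
    demo_drain(r);
}

/* Cut-through: send each frame onward as soon as it arrives. */
void demo_cut_through(demo_router_t *r, size_t frames)
{
    for (size_t i = 0; i < frames; i++) {
        demo_on_frame(r);
        demo_drain(r);
    }
}
```

In this model the store-and-forward path holds every frame of the message at its peak, while the cut-through path never holds more than the one frame in flight. A real router would hold more than one frame whenever the outgoing link is slower than the incoming one.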



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org

