qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Carl Trieloff <cctriel...@redhat.com>
Subject Re: Remove liner queue functions
Date Wed, 25 Nov 2009 13:49:56 GMT

Chenta,

Two things to note, Gordon has now put a patch in that corrects the 
order of the messages on requeue, i.e. they now stay in order by 
sequence number even after rollback. Thus with his patch and my seek() 
seekAt() we should now be able to resolve the acquire case quite easily 
and update the position.

Take a look and let me know if you need some help with that.

(i.e. before these two patches it would not have been possible, but now 
I believe it is)

Carl.


chenta lee wrote:
> I did the consumer sequence number wrap-around is because that when we 
> requeue the message, I cannot know which consumer consume it. 
> Therefore, I cannot not update the sequence number of consumer and the 
> messages rollback. The consequence is that when a consumer rollback 
> (requeue) messages, they can not acquire them anymore (because 
> requeue_msg.position is always larger than consumer.position ).
>
> However, my patch is not that dirty :), I didn't change the original 
> algorithm. We do not update the consumer sequence in 
> consumeNextMessage at the very beginning. From my point of view, the 
> only concern is that when a user decide to use selector in their 
> messages, they might suffer from performance issue, however, the other 
> users who do not use selector will be just fine.
>
> Chenta
>
> On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <cctrieloff@redhat.com 
> <mailto:cctrieloff@redhat.com>> wrote:
>
>
>     Thanks, the one remaining issue I know of with the selector patch
>     is that consumer
>     sequence number wrap-around I don't think works.
>
>     We need a test there and maybe change the comp operators in your
>     patch. I was looking into
>     that last week on the selector patch, I'm itching to get the patch in.
>
>     Carl.
>
>
>
>     chenta lee wrote:
>>     Hi Carl,
>>     This patch looks great, I will update the selector patch later.
>>
>>     Chenta
>>
>>     On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff
>>     <cctrieloff@redhat.com <mailto:cctrieloff@redhat.com>> wrote:
>>
>>
>>         I created a patch which seems to work well, it targets
>>         querying the queue, count, acquire making the
>>         queue access faster for large queues (best 1 {if no requeue
>>         or acquire}, worst case binary-search) . In
>>         most cases if it faster then binary search even if requeue or
>>         selector is used.
>>
>>         It does require that the re-queue order be corrected - which
>>         should be done regardless.
>>
>>         The remaining function that could use some similar dressing
>>         would be Queue::seek()
>>
>>         Any thoughts on the patch... This patch opens the way for
>>         reasonable selector performance.
>>         Carl.
>>
>>
>>         Index: qpid/broker/Queue.cpp
>>         ===================================================================
>>         --- qpid/broker/Queue.cpp       (revision 833135)
>>         +++ qpid/broker/Queue.cpp       (working copy)
>>         @@ -243,18 +243,18 @@
>>          {
>>             Mutex::ScopedLock locker(messageLock);
>>             QPID_LOG(debug, "Attempting to acquire message at " <<
>>         position);
>>         -    for (Messages::iterator i = messages.begin(); i !=
>>         messages.end(); i++) {
>>         -        if (i->position == position) {
>>         -            message = *i;
>>         -            if (lastValueQueue) {
>>         -                clearLVQIndex(*i);
>>         -            }
>>         -            QPID_LOG(debug,
>>         -                     "Acquired message at " << i->position
>>         << " from " << name);
>>         -            messages.erase(i);
>>         -            return true;
>>         +
>>         +    Messages::iterator i = findAt(position);
>>         +    if (i != messages.end() ) {
>>         +        message = *i;
>>         +        if (lastValueQueue) {
>>         +            clearLVQIndex(*i);
>>                 }
>>         -    }
>>         +        QPID_LOG(debug,
>>         +                 "Acquired message at " << i->position <<
"
>>         from " << name);
>>         +        messages.erase(i);
>>         +        return true;
>>         +    }
>>             QPID_LOG(debug, "Could not acquire message at " <<
>>         position << " from " << name << "; no message at that position");
>>             return false;
>>          }
>>         @@ -262,21 +262,21 @@
>>          bool Queue::acquire(const QueuedMessage& msg) {
>>             Mutex::ScopedLock locker(messageLock);
>>             QPID_LOG(debug, "attempting to acquire " << msg.position);
>>         -    for (Messages::iterator i = messages.begin(); i !=
>>         messages.end(); i++) {
>>         -        if ((i->position == msg.position && !lastValueQueue)
>>         // note that in some cases payload not be set
>>         -            || (lastValueQueue && (i->position ==
>>         msg.position) &&
>>         -                msg.payload.get() ==
>>         checkLvqReplace(*i).payload.get()) )  {
>>         +    Messages::iterator i = findAt(msg.position);
>>         +    if ((i != messages.end() && !lastValueQueue) // note
>>         that in some cases payload not be set
>>         +        || (lastValueQueue && (i->position == msg.position) &&
>>         +            msg.payload.get() ==
>>         checkLvqReplace(*i).payload.get()) )  {
>>
>>         -            clearLVQIndex(msg);
>>         -            QPID_LOG(debug,
>>         -                     "Match found, acquire succeeded: " <<
>>         -                     i->position << " == " << msg.position);
>>         -            messages.erase(i);
>>         -            return true;
>>         -        } else {
>>         -            QPID_LOG(debug, "No match: " << i->position <<
"
>>         != " << msg.position);
>>         -        }
>>         +        clearLVQIndex(msg);
>>         +        QPID_LOG(debug,
>>         +                 "Match found, acquire succeeded: " <<
>>         +                 i->position << " == " << msg.position);
>>         +        messages.erase(i);
>>         +        return true;
>>         +    } else {
>>         +        QPID_LOG(debug, "No match: " << i->position << "
!=
>>         " << msg.position);
>>             }
>>         +
>>             QPID_LOG(debug, "Acquire failed for " << msg.position);
>>             return false;
>>          }
>>         @@ -445,19 +445,35 @@
>>             return false;
>>          }
>>
>>         -namespace {
>>         -struct PositionEquals {
>>         -    SequenceNumber pos;
>>         -    PositionEquals(SequenceNumber p) : pos(p) {}
>>         -    bool operator()(const QueuedMessage& msg) const { return
>>         msg.position == pos; }
>>         -};
>>         -}// namespace
>>         +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>
>>         +    if(!messages.empty()){
>>         +        QueuedMessage compM;
>>         +        compM.position = pos;
>>         +        unsigned long diff = pos.getValue() -
>>         messages.front().position.getValue();
>>         +        long maxEnd = diff < messages.size()? diff :
>>         messages.size();
>>         +
>>         +        Messages::iterator i =
>>         lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>         +        if (i->position == pos)
>>         +            return i;
>>         +    }
>>         +    return messages.end(); // no match found.
>>         +}
>>         +
>>         +
>>          QueuedMessage Queue::find(SequenceNumber pos) const {
>>         +
>>             Mutex::ScopedLock locker(messageLock);
>>         -    Messages::const_iterator i =
>>         std::find_if(messages.begin(), messages.end(),
>>         PositionEquals(pos));
>>         -    if (i != messages.end())
>>         -        return *i;
>>         +    if(!messages.empty()){
>>         +        QueuedMessage compM;
>>         +        compM.position = pos;
>>         +        unsigned long diff = pos.getValue() -
>>         messages.front().position.getValue();
>>         +        long maxEnd = diff < messages.size()? diff :
>>         messages.size();
>>         +
>>         +        Messages::const_iterator i =
>>         lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>         +        if (i != messages.end())
>>         +            return *i;
>>         +    }
>>             return QueuedMessage();
>>          }
>>
>>         @@ -642,10 +658,9 @@
>>          }
>>
>>          /** function only provided for unit tests, or code not in
>>         critical message path */
>>         -uint32_t Queue::getMessageCount() const
>>         +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>          {
>>             Mutex::ScopedLock locker(messageLock);
>>         -
>>             uint32_t count = 0;
>>             for ( Messages::const_iterator i = messages.begin(); i !=
>>         messages.end(); ++i ) {
>>                 //NOTE: don't need to use checkLvqReplace() here as it
>>         @@ -657,6 +672,12 @@
>>             return count;
>>          }
>>
>>         +uint32_t Queue::getMessageCount() const
>>         +{
>>         +    Mutex::ScopedLock locker(messageLock);
>>         +    return messages.size();
>>         +}
>>         +
>>          uint32_t Queue::getConsumerCount() const
>>          {
>>             Mutex::ScopedLock locker(consumerLock);
>>         Index: qpid/broker/QueuedMessage.h
>>         ===================================================================
>>         --- qpid/broker/QueuedMessage.h (revision 833135)
>>         +++ qpid/broker/QueuedMessage.h (working copy)
>>         @@ -38,7 +38,9 @@
>>             QueuedMessage(Queue* q, boost::intrusive_ptr<Message>
>>         msg, framing::SequenceNumber sn) :
>>                 payload(msg), position(sn), queue(q) {}
>>             QueuedMessage(Queue* q) : queue(q) {}
>>         +
>>          };
>>         +    inline bool operator<(const QueuedMessage& a, const
>>         QueuedMessage& b) { return a.position < b.position; }
>>
>>          }}
>>
>>         Index: qpid/broker/Queue.h
>>         ===================================================================
>>         --- qpid/broker/Queue.h (revision 833135)
>>         +++ qpid/broker/Queue.h (working copy)
>>         @@ -148,6 +148,8 @@
>>                             }
>>                         }
>>                     }
>>         +
>>         +            Messages::iterator
>>         findAt(framing::SequenceNumber pos);
>>
>>                 public:
>>
>>         @@ -221,6 +223,7 @@
>>                     uint32_t move(const Queue::shared_ptr destq,
>>         uint32_t qty);
>>
>>                     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>>         +            QPID_BROKER_EXTERN uint32_t
>>         getEnqueueCompleteMessageCount() const;
>>                     QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>                     inline const string& getName() const { return name; }
>>                     bool isExclusiveOwner(const OwnershipToken* const
>>         o) const;
>>         Index: tests/QueueTest.cpp
>>         ===================================================================
>>         --- tests/QueueTest.cpp (revision 833135)
>>         +++ tests/QueueTest.cpp (working copy)
>>         @@ -120,9 +120,10 @@
>>             queue->process(msg1);
>>             sleep(2);
>>             uint32_t compval=0;
>>         -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>         +    BOOST_CHECK_EQUAL(compval,
>>         queue->getEnqueueCompleteMessageCount());
>>             msg1->enqueueComplete();
>>             compval=1;
>>         +    BOOST_CHECK_EQUAL(compval,
>>         queue->getEnqueueCompleteMessageCount());
>>             BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>          }
>>
>>
>>
>>         ---------------------------------------------------------------------
>>         Apache Qpid - AMQP Messaging Implementation
>>         Project:      http://qpid.apache.org
>>         Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>         <mailto:dev-subscribe@qpid.apache.org>
>>
>>
>
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message