qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Carl Trieloff <cctriel...@redhat.com>
Subject Re: Remove liner queue functions
Date Mon, 30 Nov 2009 14:00:21 GMT

yes, they are in the head of the trunk.

Gordon's patch now makes sure that messages are put back onto the queue 
in order or sequence number. This is the part I
believe that made life hard for you last time round.

Carl.

chenta lee wrote:
> Carl,
> Did those patches merge into the current source tree? And could you 
> tell me which patches they are?
>
> Chenta 
>
> On Wed, Nov 25, 2009 at 9:49 PM, Carl Trieloff <cctrieloff@redhat.com 
> <mailto:cctrieloff@redhat.com>> wrote:
>
>
>     Chenta,
>
>     Two things to note, Gordon has now put a patch in that corrects
>     the order of the messages on requeue, i.e. they now stay in order
>     by sequence number even after rollback. Thus with his patch and my
>     seek() seekAt() we should now be able to resolve the acquire case
>     quite easily and update the position.
>
>     Take a look and let me know if you need some help with that.
>
>     (i.e. before these two patches it would not have been possible,
>     but now I believe it is)
>
>     Carl.
>
>
>
>     chenta lee wrote:
>>     I did the consumer sequence number wrap-around is because that
>>     when we requeue the message, I cannot know which consumer consume
>>     it. Therefore, I cannot not update the sequence number of
>>     consumer and the messages rollback. The consequence is that when
>>     a consumer rollback (requeue) messages, they can not acquire them
>>     anymore (because requeue_msg.position is always larger than
>>     consumer.position ).
>>
>>     However, my patch is not that dirty :), I didn't change the
>>     original algorithm. We do not update the consumer sequence in
>>     consumeNextMessage at the very beginning. From my point of view,
>>     the only concern is that when a user decide to use selector in
>>     their messages, they might suffer from performance issue,
>>     however, the other users who do not use selector will be just fine.
>>
>>     Chenta
>>
>>     On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff
>>     <cctrieloff@redhat.com <mailto:cctrieloff@redhat.com>> wrote:
>>
>>
>>         Thanks, the one remaining issue I know of with the selector
>>         patch is that consumer
>>         sequence number wrap-around I don't think works.
>>
>>         We need a test there and maybe change the comp operators in
>>         your patch. I was looking into
>>         that last week on the selector patch, I'm itching to get the
>>         patch in.
>>
>>         Carl.
>>
>>
>>
>>         chenta lee wrote:
>>>         Hi Carl,
>>>         This patch looks great, I will update the selector patch later.
>>>
>>>         Chenta
>>>
>>>         On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff
>>>         <cctrieloff@redhat.com <mailto:cctrieloff@redhat.com>> wrote:
>>>
>>>
>>>             I created a patch which seems to work well, it targets
>>>             querying the queue, count, acquire making the
>>>             queue access faster for large queues (best 1 {if no
>>>             requeue or acquire}, worst case binary-search) . In
>>>             most cases if it faster then binary search even if
>>>             requeue or selector is used.
>>>
>>>             It does require that the re-queue order be corrected -
>>>             which should be done regardless.
>>>
>>>             The remaining function that could use some similar
>>>             dressing would be Queue::seek()
>>>
>>>             Any thoughts on the patch... This patch opens the way
>>>             for reasonable selector performance.
>>>             Carl.
>>>
>>>
>>>             Index: qpid/broker/Queue.cpp
>>>             ===================================================================
>>>             --- qpid/broker/Queue.cpp       (revision 833135)
>>>             +++ qpid/broker/Queue.cpp       (working copy)
>>>             @@ -243,18 +243,18 @@
>>>              {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>                 QPID_LOG(debug, "Attempting to acquire message at "
>>>             << position);
>>>             -    for (Messages::iterator i = messages.begin(); i !=
>>>             messages.end(); i++) {
>>>             -        if (i->position == position) {
>>>             -            message = *i;
>>>             -            if (lastValueQueue) {
>>>             -                clearLVQIndex(*i);
>>>             -            }
>>>             -            QPID_LOG(debug,
>>>             -                     "Acquired message at " <<
>>>             i->position << " from " << name);
>>>             -            messages.erase(i);
>>>             -            return true;
>>>             +
>>>             +    Messages::iterator i = findAt(position);
>>>             +    if (i != messages.end() ) {
>>>             +        message = *i;
>>>             +        if (lastValueQueue) {
>>>             +            clearLVQIndex(*i);
>>>                     }
>>>             -    }
>>>             +        QPID_LOG(debug,
>>>             +                 "Acquired message at " << i->position
>>>             << " from " << name);
>>>             +        messages.erase(i);
>>>             +        return true;
>>>             +    }
>>>                 QPID_LOG(debug, "Could not acquire message at " <<
>>>             position << " from " << name << "; no message at
that
>>>             position");
>>>                 return false;
>>>              }
>>>             @@ -262,21 +262,21 @@
>>>              bool Queue::acquire(const QueuedMessage& msg) {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>                 QPID_LOG(debug, "attempting to acquire " <<
>>>             msg.position);
>>>             -    for (Messages::iterator i = messages.begin(); i !=
>>>             messages.end(); i++) {
>>>             -        if ((i->position == msg.position &&
>>>             !lastValueQueue) // note that in some cases payload not
>>>             be set
>>>             -            || (lastValueQueue && (i->position ==
>>>             msg.position) &&
>>>             -                msg.payload.get() ==
>>>             checkLvqReplace(*i).payload.get()) )  {
>>>             +    Messages::iterator i = findAt(msg.position);
>>>             +    if ((i != messages.end() && !lastValueQueue) //
>>>             note that in some cases payload not be set
>>>             +        || (lastValueQueue && (i->position ==
>>>             msg.position) &&
>>>             +            msg.payload.get() ==
>>>             checkLvqReplace(*i).payload.get()) )  {
>>>
>>>             -            clearLVQIndex(msg);
>>>             -            QPID_LOG(debug,
>>>             -                     "Match found, acquire succeeded: " <<
>>>             -                     i->position << " == " <<
>>>             msg.position);
>>>             -            messages.erase(i);
>>>             -            return true;
>>>             -        } else {
>>>             -            QPID_LOG(debug, "No match: " << i->position
>>>             << " != " << msg.position);
>>>             -        }
>>>             +        clearLVQIndex(msg);
>>>             +        QPID_LOG(debug,
>>>             +                 "Match found, acquire succeeded: " <<
>>>             +                 i->position << " == " << msg.position);
>>>             +        messages.erase(i);
>>>             +        return true;
>>>             +    } else {
>>>             +        QPID_LOG(debug, "No match: " << i->position <<
>>>             " != " << msg.position);
>>>                 }
>>>             +
>>>                 QPID_LOG(debug, "Acquire failed for " << msg.position);
>>>                 return false;
>>>              }
>>>             @@ -445,19 +445,35 @@
>>>                 return false;
>>>              }
>>>
>>>             -namespace {
>>>             -struct PositionEquals {
>>>             -    SequenceNumber pos;
>>>             -    PositionEquals(SequenceNumber p) : pos(p) {}
>>>             -    bool operator()(const QueuedMessage& msg) const {
>>>             return msg.position == pos; }
>>>             -};
>>>             -}// namespace
>>>             +Queue::Messages::iterator Queue::findAt(SequenceNumber
>>>             pos) {
>>>
>>>             +    if(!messages.empty()){
>>>             +        QueuedMessage compM;
>>>             +        compM.position = pos;
>>>             +        unsigned long diff = pos.getValue() -
>>>             messages.front().position.getValue();
>>>             +        long maxEnd = diff < messages.size()? diff :
>>>             messages.size();
>>>             +
>>>             +        Messages::iterator i =
>>>             lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>>             +        if (i->position == pos)
>>>             +            return i;
>>>             +    }
>>>             +    return messages.end(); // no match found.
>>>             +}
>>>             +
>>>             +
>>>              QueuedMessage Queue::find(SequenceNumber pos) const {
>>>             +
>>>                 Mutex::ScopedLock locker(messageLock);
>>>             -    Messages::const_iterator i =
>>>             std::find_if(messages.begin(), messages.end(),
>>>             PositionEquals(pos));
>>>             -    if (i != messages.end())
>>>             -        return *i;
>>>             +    if(!messages.empty()){
>>>             +        QueuedMessage compM;
>>>             +        compM.position = pos;
>>>             +        unsigned long diff = pos.getValue() -
>>>             messages.front().position.getValue();
>>>             +        long maxEnd = diff < messages.size()? diff :
>>>             messages.size();
>>>             +
>>>             +        Messages::const_iterator i =
>>>             lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>>             +        if (i != messages.end())
>>>             +            return *i;
>>>             +    }
>>>                 return QueuedMessage();
>>>              }
>>>
>>>             @@ -642,10 +658,9 @@
>>>              }
>>>
>>>              /** function only provided for unit tests, or code not
>>>             in critical message path */
>>>             -uint32_t Queue::getMessageCount() const
>>>             +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>>              {
>>>                 Mutex::ScopedLock locker(messageLock);
>>>             -
>>>                 uint32_t count = 0;
>>>                 for ( Messages::const_iterator i = messages.begin();
>>>             i != messages.end(); ++i ) {
>>>                     //NOTE: don't need to use checkLvqReplace() here
>>>             as it
>>>             @@ -657,6 +672,12 @@
>>>                 return count;
>>>              }
>>>
>>>             +uint32_t Queue::getMessageCount() const
>>>             +{
>>>             +    Mutex::ScopedLock locker(messageLock);
>>>             +    return messages.size();
>>>             +}
>>>             +
>>>              uint32_t Queue::getConsumerCount() const
>>>              {
>>>                 Mutex::ScopedLock locker(consumerLock);
>>>             Index: qpid/broker/QueuedMessage.h
>>>             ===================================================================
>>>             --- qpid/broker/QueuedMessage.h (revision 833135)
>>>             +++ qpid/broker/QueuedMessage.h (working copy)
>>>             @@ -38,7 +38,9 @@
>>>                 QueuedMessage(Queue* q,
>>>             boost::intrusive_ptr<Message> msg,
>>>             framing::SequenceNumber sn) :
>>>                     payload(msg), position(sn), queue(q) {}
>>>                 QueuedMessage(Queue* q) : queue(q) {}
>>>             +
>>>              };
>>>             +    inline bool operator<(const QueuedMessage& a, const
>>>             QueuedMessage& b) { return a.position < b.position; }
>>>
>>>              }}
>>>
>>>             Index: qpid/broker/Queue.h
>>>             ===================================================================
>>>             --- qpid/broker/Queue.h (revision 833135)
>>>             +++ qpid/broker/Queue.h (working copy)
>>>             @@ -148,6 +148,8 @@
>>>                                 }
>>>                             }
>>>                         }
>>>             +
>>>             +            Messages::iterator
>>>             findAt(framing::SequenceNumber pos);
>>>
>>>                     public:
>>>
>>>             @@ -221,6 +223,7 @@
>>>                         uint32_t move(const Queue::shared_ptr destq,
>>>             uint32_t qty);
>>>
>>>                         QPID_BROKER_EXTERN uint32_t
>>>             getMessageCount() const;
>>>             +            QPID_BROKER_EXTERN uint32_t
>>>             getEnqueueCompleteMessageCount() const;
>>>                         QPID_BROKER_EXTERN uint32_t
>>>             getConsumerCount() const;
>>>                         inline const string& getName() const {
>>>             return name; }
>>>                         bool isExclusiveOwner(const OwnershipToken*
>>>             const o) const;
>>>             Index: tests/QueueTest.cpp
>>>             ===================================================================
>>>             --- tests/QueueTest.cpp (revision 833135)
>>>             +++ tests/QueueTest.cpp (working copy)
>>>             @@ -120,9 +120,10 @@
>>>                 queue->process(msg1);
>>>                 sleep(2);
>>>                 uint32_t compval=0;
>>>             -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>             +    BOOST_CHECK_EQUAL(compval,
>>>             queue->getEnqueueCompleteMessageCount());
>>>                 msg1->enqueueComplete();
>>>                 compval=1;
>>>             +    BOOST_CHECK_EQUAL(compval,
>>>             queue->getEnqueueCompleteMessageCount());
>>>                 BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>              }
>>>
>>>
>>>
>>>             ---------------------------------------------------------------------
>>>             Apache Qpid - AMQP Messaging Implementation
>>>             Project:      http://qpid.apache.org
>>>             Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>>             <mailto:dev-subscribe@qpid.apache.org>
>>>
>>>
>>
>>
>
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message