qpid-dev mailing list archives

From chenta lee <che...@gmail.com>
Subject Re: Remove linear queue functions
Date Thu, 26 Nov 2009 02:35:24 GMT
Carl,
Have those patches been merged into the current source tree? And could you
tell me which patches they are?

Chenta

On Wed, Nov 25, 2009 at 9:49 PM, Carl Trieloff <cctrieloff@redhat.com> wrote:

>
> Chenta,
>
> Two things to note: Gordon has now put in a patch that corrects the order
> of the messages on requeue, i.e. they now stay in order by sequence number
> even after rollback. Thus, with his patch and my seek()/seekAt() changes,
> we should now be able to resolve the acquire case quite easily and update
> the position.
>
> Take a look and let me know if you need some help with that.
>
> (i.e. before these two patches it would not have been possible, but now I
> believe it is)
>
> Carl.
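
A minimal sketch of why the requeue order matters for position lookups, assuming the messages live in a std::deque kept sorted by position. The Item struct, byPosition, and requeueInOrder are hypothetical names used only for illustration; this is not Gordon's patch or qpid code, and sequence-number wrap-around is ignored here.

```cpp
#include <algorithm>
#include <cstdint>
#include <deque>

// Hypothetical simplified message: only the sequence position is modelled.
struct Item {
    std::uint32_t position;
};

// Orders items by position (assumption: no wrap-around inside the queue).
inline bool byPosition(const Item& a, const Item& b) {
    return a.position < b.position;
}

// Put a rolled-back message back where its position belongs, instead of
// pushing it to the front or back, so the deque stays sorted and a later
// binary search by position remains valid.
inline void requeueInOrder(std::deque<Item>& messages, const Item& msg) {
    std::deque<Item>::iterator where =
        std::upper_bound(messages.begin(), messages.end(), msg, byPosition);
    messages.insert(where, msg);
}
```

If requeued messages were simply pushed onto one end, the deque would no longer be sorted and std::lower_bound could miss messages that are present.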
>
>
>
> chenta lee wrote:
>
> I did the consumer sequence number wrap-around because when we requeue a
> message, I cannot know which consumer consumed it. Therefore, I cannot
> update the sequence number of the consumer when the messages roll back.
> The consequence is that when a consumer rolls back (requeues) messages, it
> cannot acquire them anymore (because requeue_msg.position is always larger
> than consumer.position).
>
>  However, my patch is not that dirty :). I didn't change the original
> algorithm. We do not update the consumer sequence in consumeNextMessage at
> the very beginning. From my point of view, the only concern is that users
> who decide to use a selector on their messages might suffer from a
> performance issue; other users who do not use a selector will be
> just fine.
>
>  Chenta
>
> On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <cctrieloff@redhat.com> wrote:
>
>>
>> Thanks. The one remaining issue I know of with the selector patch is that
>> I don't think consumer sequence number wrap-around works.
>>
>> We need a test there, and maybe a change to the comparison operators in
>> your patch. I was looking into that last week on the selector patch; I'm
>> itching to get the patch in.
>>
>> Carl.
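
As an illustration of the comparison-operator concern above, here is one common way to compare 32-bit sequence numbers so that ordering survives wrap-around (serial number arithmetic in the style of RFC 1982). Seq and seqLess are hypothetical names, not qpid's SequenceNumber class, and this is only a sketch of the technique, not the change the selector patch needs.

```cpp
#include <cstdint>

// Hypothetical stand-in for a 32-bit wrapping sequence number.
struct Seq {
    std::uint32_t value;
};

// True when a comes before b, treating the 32-bit space as a circle:
// a is "less" if b is reachable from a by moving forward fewer than
// 2^31 steps.
inline bool seqLess(Seq a, Seq b) {
    // Unsigned subtraction wraps modulo 2^32, so the forward distance
    // from a to b is well defined even across the wrap point.
    const std::uint32_t forward = b.value - a.value;
    return forward != 0 && forward < 0x80000000u;
}
```

With a plain `a.value < b.value`, a consumer positioned at 0xFFFFFFFF would treat a message at position 0 as older rather than newer; the distance-based test avoids that as long as the two values are less than 2^31 apart.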
>>
>>
>>
>> chenta lee wrote:
>>
>> Hi Carl,
>> This patch looks great, I will update the selector patch later.
>>
>>  Chenta
>>
>> On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <cctrieloff@redhat.com> wrote:
>>
>>>
>>> I created a patch which seems to work well. It targets querying the
>>> queue, count, and acquire, making queue access faster for large queues
>>> (best case 1 {if no requeue or acquire}, worst case binary search).
>>> In most cases it is faster than a binary search even if requeue or a
>>> selector is used.
>>>
>>> It does require that the re-queue order be corrected - which should be
>>> done regardless.
>>>
>>> The remaining function that could use some similar dressing would be
>>> Queue::seek()
>>>
>>> Any thoughts on the patch? This patch opens the way for reasonable
>>> selector performance.
>>> Carl.
>>>
>>>
>>> Index: qpid/broker/Queue.cpp
>>> ===================================================================
>>> --- qpid/broker/Queue.cpp       (revision 833135)
>>> +++ qpid/broker/Queue.cpp       (working copy)
>>> @@ -243,18 +243,18 @@
>>>  {
>>>     Mutex::ScopedLock locker(messageLock);
>>>     QPID_LOG(debug, "Attempting to acquire message at " << position);
>>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>>> -        if (i->position == position) {
>>> -            message = *i;
>>> -            if (lastValueQueue) {
>>> -                clearLVQIndex(*i);
>>> -            }
>>> -            QPID_LOG(debug,
>>> -                     "Acquired message at " << i->position << " from " << name);
>>> -            messages.erase(i);
>>> -            return true;
>>> +
>>> +    Messages::iterator i = findAt(position);
>>> +    if (i != messages.end() ) {
>>> +        message = *i;
>>> +        if (lastValueQueue) {
>>> +            clearLVQIndex(*i);
>>>         }
>>> -    }
>>> +        QPID_LOG(debug,
>>> +                 "Acquired message at " << i->position << " from " << name);
>>> +        messages.erase(i);
>>> +        return true;
>>> +    }
>>>     QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
>>>     return false;
>>>  }
>>> @@ -262,21 +262,21 @@
>>>  bool Queue::acquire(const QueuedMessage& msg) {
>>>     Mutex::ScopedLock locker(messageLock);
>>>     QPID_LOG(debug, "attempting to acquire " << msg.position);
>>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>>> -        if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
>>> -            || (lastValueQueue && (i->position == msg.position) &&
>>> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>> +    Messages::iterator i = findAt(msg.position);
>>> +    if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set
>>> +        || (lastValueQueue && (i->position == msg.position) &&
>>> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>>
>>> -            clearLVQIndex(msg);
>>> -            QPID_LOG(debug,
>>> -                     "Match found, acquire succeeded: " <<
>>> -                     i->position << " == " << msg.position);
>>> -            messages.erase(i);
>>> -            return true;
>>> -        } else {
>>> -            QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>>> -        }
>>> +        clearLVQIndex(msg);
>>> +        QPID_LOG(debug,
>>> +                 "Match found, acquire succeeded: " <<
>>> +                 i->position << " == " << msg.position);
>>> +        messages.erase(i);
>>> +        return true;
>>> +    } else {
>>> +        QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>>>     }
>>> +
>>>     QPID_LOG(debug, "Acquire failed for " << msg.position);
>>>     return false;
>>>  }
>>> @@ -445,19 +445,35 @@
>>>     return false;
>>>  }
>>>
>>> -namespace {
>>> -struct PositionEquals {
>>> -    SequenceNumber pos;
>>> -    PositionEquals(SequenceNumber p) : pos(p) {}
>>> -    bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
>>> -};
>>> -}// namespace
>>> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>>
>>> +    if(!messages.empty()){
>>> +        QueuedMessage compM;
>>> +        compM.position = pos;
>>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>>> +
>>> +        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>> +        if (i->position == pos)
>>> +            return i;
>>> +    }
>>> +    return messages.end(); // no match found.
>>> +}
>>> +
>>> +
>>>  QueuedMessage Queue::find(SequenceNumber pos) const {
>>> +
>>>     Mutex::ScopedLock locker(messageLock);
>>> -    Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
>>> -    if (i != messages.end())
>>> -        return *i;
>>> +    if(!messages.empty()){
>>> +        QueuedMessage compM;
>>> +        compM.position = pos;
>>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>>> +
>>> +        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>>> +        if (i != messages.end())
>>> +            return *i;
>>> +    }
>>>     return QueuedMessage();
>>>  }
>>>
>>> @@ -642,10 +658,9 @@
>>>  }
>>>
>>>  /** function only provided for unit tests, or code not in critical message path */
>>> -uint32_t Queue::getMessageCount() const
>>> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>>  {
>>>     Mutex::ScopedLock locker(messageLock);
>>> -
>>>     uint32_t count = 0;
>>>     for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
>>>         //NOTE: don't need to use checkLvqReplace() here as it
>>> @@ -657,6 +672,12 @@
>>>     return count;
>>>  }
>>>
>>> +uint32_t Queue::getMessageCount() const
>>> +{
>>> +    Mutex::ScopedLock locker(messageLock);
>>> +    return messages.size();
>>> +}
>>> +
>>>  uint32_t Queue::getConsumerCount() const
>>>  {
>>>     Mutex::ScopedLock locker(consumerLock);
>>> Index: qpid/broker/QueuedMessage.h
>>> ===================================================================
>>> --- qpid/broker/QueuedMessage.h (revision 833135)
>>> +++ qpid/broker/QueuedMessage.h (working copy)
>>> @@ -38,7 +38,9 @@
>>>     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
>>>         payload(msg), position(sn), queue(q) {}
>>>     QueuedMessage(Queue* q) : queue(q) {}
>>> +
>>>  };
>>> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
>>>
>>>  }}
>>>
>>> Index: qpid/broker/Queue.h
>>> ===================================================================
>>> --- qpid/broker/Queue.h (revision 833135)
>>> +++ qpid/broker/Queue.h (working copy)
>>> @@ -148,6 +148,8 @@
>>>                     }
>>>                 }
>>>             }
>>> +
>>> +            Messages::iterator findAt(framing::SequenceNumber pos);
>>>
>>>         public:
>>>
>>> @@ -221,6 +223,7 @@
>>>             uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>>>
>>>             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>>> +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
>>>             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>>             inline const string& getName() const { return name; }
>>>             bool isExclusiveOwner(const OwnershipToken* const o) const;
>>> Index: tests/QueueTest.cpp
>>> ===================================================================
>>> --- tests/QueueTest.cpp (revision 833135)
>>> +++ tests/QueueTest.cpp (working copy)
>>> @@ -120,9 +120,10 @@
>>>     queue->process(msg1);
>>>     sleep(2);
>>>     uint32_t compval=0;
>>> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>>     msg1->enqueueComplete();
>>>     compval=1;
>>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>>     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>>  }
>>>
>>>
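
The lookup idea behind findAt() in the patch above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the qpid implementation: Msg is a hypothetical struct holding only a position, the deque is assumed sorted by position with no duplicates and no wrap-around inside it, and this findAt is a free function rather than a Queue member. Unlike the patch, it searches one slot further and checks the iterator against the end of the searched range before dereferencing it.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical simplified message: only the position matters for lookup.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) {
    return a.position < b.position;
}

// Returns an iterator to the message at pos, or messages.end() if absent.
inline std::deque<Msg>::iterator findAt(std::deque<Msg>& messages,
                                        std::uint32_t pos) {
    if (messages.empty() || pos < messages.front().position)
        return messages.end();
    // With no gaps the target sits exactly diff slots from the front;
    // gaps left by acquired messages can only move it closer to the front.
    const std::size_t diff = pos - messages.front().position;
    const std::size_t last =
        diff < messages.size() ? diff + 1 : messages.size();
    const Msg key{pos};
    std::deque<Msg>::iterator stop =
        messages.begin() + static_cast<std::ptrdiff_t>(last);
    std::deque<Msg>::iterator i =
        std::lower_bound(messages.begin(), stop, key);
    // lower_bound may return stop, so test the bound before dereferencing.
    if (i != stop && i->position == pos)
        return i;
    return messages.end();
}
```

Bounding the search by the distance from the front keeps the range small for positions near the head of a large queue, while the full binary search remains the worst case.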
>>>
>>> ---------------------------------------------------------------------
>>> Apache Qpid - AMQP Messaging Implementation
>>> Project:      http://qpid.apache.org
>>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>>
>>
>>
>>
>
>
