qpid-dev mailing list archives

From chenta lee <che...@gmail.com>
Subject Re: Remove liner queue functions
Date Wed, 25 Nov 2009 03:26:12 GMT
I added the consumer sequence number wrap-around because, when a message is
requeued, I cannot know which consumer consumed it. Therefore, I cannot
update that consumer's sequence number when its messages are rolled back.
The consequence is that when a consumer rolls back (requeues) messages, it
cannot acquire them anymore (because requeue_msg.position is always larger
than consumer.position).
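
To illustrate what wrap-around-safe ordering of positions can look like,
here is a minimal sketch using signed-difference (serial number) comparison.
`Seq` and `before` are hypothetical names for illustration, not the Qpid
`SequenceNumber` class, and the result is only meaningful while the two
values are less than 2^31 apart.

```cpp
#include <cstdint>

// Hypothetical stand-in for a 32-bit sequence number (not the Qpid class).
struct Seq {
    std::uint32_t value;
};

// Returns true when a comes before b, even across the 2^32 wrap.
// The unsigned subtraction wraps modulo 2^32; reading the result as a
// signed 32-bit value gives the direction of the shorter path from b to a.
// Assumption: a and b are less than 2^31 apart.
inline bool before(Seq a, Seq b) {
    return static_cast<std::int32_t>(a.value - b.value) < 0;
}
```

With this ordering, a position just before the wrap (0xFFFFFFFF) still
compares as earlier than position 0, which a plain `<` on the raw values
would get wrong.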

However, my patch is not that dirty :). I didn't change the original
algorithm. We do not update the consumer sequence in consumeNextMessage at
the very beginning. From my point of view, the only concern is that users
who decide to use selectors on their messages might suffer from performance
issues; the other users, who do not use selectors, will be just fine.
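
For reference, the lookup idea in the quoted patch below (the offset of the
wanted position from the front of the queue bounds the search range, then a
binary search runs inside that range) might be sketched standalone like
this. `Msg` and this free-standing `findAt` are simplified, hypothetical
stand-ins rather than the broker code. The sketch assumes the queue is
sorted by position with no wrap-around inside it, and unlike the quoted
code it includes the slot at the offset itself and checks the iterator
before dereferencing.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical simplified message: only the position matters here.
struct Msg {
    std::uint32_t position;
};

inline bool operator<(const Msg& a, const Msg& b) {
    return a.position < b.position;
}

// Returns the index of the message at `pos`, or -1 if it is not queued.
// Assumption: `q` is sorted by strictly increasing position.
inline long findAt(const std::deque<Msg>& q, std::uint32_t pos) {
    if (q.empty() || pos < q.front().position) return -1;

    // Positions strictly increase, so the message at `pos` can sit at most
    // (pos - front) slots from the front. With no gaps it sits exactly there.
    std::size_t diff = pos - q.front().position;
    std::size_t last = diff < q.size() ? diff + 1 : q.size();

    std::deque<Msg>::const_iterator end = q.begin() + last;
    std::deque<Msg>::const_iterator it =
        std::lower_bound(q.begin(), end, Msg{pos});

    if (it != end && it->position == pos)
        return static_cast<long>(it - q.begin());
    return -1;  // no match found
}
```

The bounded range keeps the search small when few messages have been
acquired or requeued out of order, and falls back to a binary search over
the whole queue otherwise.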

Chenta

On Wed, Nov 25, 2009 at 10:54 AM, Carl Trieloff <cctrieloff@redhat.com> wrote:

>
> Thanks, the one remaining issue I know of with the selector patch is that
> I don't think the consumer sequence number wrap-around works.
>
> We need a test there, and maybe to change the comparison operators in your
> patch. I was looking into that last week on the selector patch; I'm itching
> to get the patch in.
>
> Carl.
>
>
>
> chenta lee wrote:
>
> Hi Carl,
> This patch looks great, I will update the selector patch later.
>
>  Chenta
>
> On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <cctrieloff@redhat.com> wrote:
>
>>
>> I created a patch which seems to work well. It targets querying the queue,
>> count, and acquire, making queue access faster for large queues (best case
>> 1 {if no requeue or acquire}, worst case binary search). In most cases it
>> is faster than binary search even if requeue or selector is used.
>>
>> It does require that the re-queue order be corrected - which should be
>> done regardless.
>>
>> The remaining function that could use some similar dressing would be
>> Queue::seek()
>>
>> Any thoughts on the patch... This patch opens the way for reasonable
>> selector performance.
>> Carl.
>>
>>
>> Index: qpid/broker/Queue.cpp
>> ===================================================================
>> --- qpid/broker/Queue.cpp       (revision 833135)
>> +++ qpid/broker/Queue.cpp       (working copy)
>> @@ -243,18 +243,18 @@
>>  {
>>     Mutex::ScopedLock locker(messageLock);
>>     QPID_LOG(debug, "Attempting to acquire message at " << position);
>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>> -        if (i->position == position) {
>> -            message = *i;
>> -            if (lastValueQueue) {
>> -                clearLVQIndex(*i);
>> -            }
>> -            QPID_LOG(debug,
>> -                     "Acquired message at " << i->position << " from " << name);
>> -            messages.erase(i);
>> -            return true;
>> +
>> +    Messages::iterator i = findAt(position);
>> +    if (i != messages.end() ) {
>> +        message = *i;
>> +        if (lastValueQueue) {
>> +            clearLVQIndex(*i);
>>         }
>> -    }
>> +        QPID_LOG(debug,
>> +                 "Acquired message at " << i->position << " from " << name);
>> +        messages.erase(i);
>> +        return true;
>> +    }
>>     QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
>>     return false;
>>  }
>> @@ -262,21 +262,21 @@
>>  bool Queue::acquire(const QueuedMessage& msg) {
>>     Mutex::ScopedLock locker(messageLock);
>>     QPID_LOG(debug, "attempting to acquire " << msg.position);
>> -    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
>> -        if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
>> -            || (lastValueQueue && (i->position == msg.position) &&
>> -                msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>> +    Messages::iterator i = findAt(msg.position);
>> +    if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set
>> +        || (lastValueQueue && (i->position == msg.position) &&
>> +            msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
>>
>> -            clearLVQIndex(msg);
>> -            QPID_LOG(debug,
>> -                     "Match found, acquire succeeded: " <<
>> -                     i->position << " == " << msg.position);
>> -            messages.erase(i);
>> -            return true;
>> -        } else {
>> -            QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>> -        }
>> +        clearLVQIndex(msg);
>> +        QPID_LOG(debug,
>> +                 "Match found, acquire succeeded: " <<
>> +                 i->position << " == " << msg.position);
>> +        messages.erase(i);
>> +        return true;
>> +    } else {
>> +        QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
>>     }
>> +
>>     QPID_LOG(debug, "Acquire failed for " << msg.position);
>>     return false;
>>  }
>> @@ -445,19 +445,35 @@
>>     return false;
>>  }
>>
>> -namespace {
>> -struct PositionEquals {
>> -    SequenceNumber pos;
>> -    PositionEquals(SequenceNumber p) : pos(p) {}
>> -    bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
>> -};
>> -}// namespace
>> +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
>>
>> +    if(!messages.empty()){
>> +        QueuedMessage compM;
>> +        compM.position = pos;
>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>> +
>> +        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>> +        if (i->position == pos)
>> +            return i;
>> +    }
>> +    return messages.end(); // no match found.
>> +}
>> +
>> +
>>  QueuedMessage Queue::find(SequenceNumber pos) const {
>> +
>>     Mutex::ScopedLock locker(messageLock);
>> -    Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
>> -    if (i != messages.end())
>> -        return *i;
>> +    if(!messages.empty()){
>> +        QueuedMessage compM;
>> +        compM.position = pos;
>> +        unsigned long diff = pos.getValue() - messages.front().position.getValue();
>> +        long maxEnd = diff < messages.size()? diff : messages.size();
>> +
>> +        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
>> +        if (i != messages.end())
>> +            return *i;
>> +    }
>>     return QueuedMessage();
>>  }
>>
>> @@ -642,10 +658,9 @@
>>  }
>>
>>  /** function only provided for unit tests, or code not in critical message path */
>> -uint32_t Queue::getMessageCount() const
>> +uint32_t Queue::getEnqueueCompleteMessageCount() const
>>  {
>>     Mutex::ScopedLock locker(messageLock);
>> -
>>     uint32_t count = 0;
>>     for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
>>         //NOTE: don't need to use checkLvqReplace() here as it
>> @@ -657,6 +672,12 @@
>>     return count;
>>  }
>>
>> +uint32_t Queue::getMessageCount() const
>> +{
>> +    Mutex::ScopedLock locker(messageLock);
>> +    return messages.size();
>> +}
>> +
>>  uint32_t Queue::getConsumerCount() const
>>  {
>>     Mutex::ScopedLock locker(consumerLock);
>> Index: qpid/broker/QueuedMessage.h
>> ===================================================================
>> --- qpid/broker/QueuedMessage.h (revision 833135)
>> +++ qpid/broker/QueuedMessage.h (working copy)
>> @@ -38,7 +38,9 @@
>>     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
>>         payload(msg), position(sn), queue(q) {}
>>     QueuedMessage(Queue* q) : queue(q) {}
>> +
>>  };
>> +    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
>>
>>  }}
>>
>> Index: qpid/broker/Queue.h
>> ===================================================================
>> --- qpid/broker/Queue.h (revision 833135)
>> +++ qpid/broker/Queue.h (working copy)
>> @@ -148,6 +148,8 @@
>>                     }
>>                 }
>>             }
>> +
>> +            Messages::iterator findAt(framing::SequenceNumber pos);
>>
>>         public:
>>
>> @@ -221,6 +223,7 @@
>>             uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
>>
>>             QPID_BROKER_EXTERN uint32_t getMessageCount() const;
>> +            QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
>>             QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
>>             inline const string& getName() const { return name; }
>>             bool isExclusiveOwner(const OwnershipToken* const o) const;
>> Index: tests/QueueTest.cpp
>> ===================================================================
>> --- tests/QueueTest.cpp (revision 833135)
>> +++ tests/QueueTest.cpp (working copy)
>> @@ -120,9 +120,10 @@
>>     queue->process(msg1);
>>     sleep(2);
>>     uint32_t compval=0;
>> -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>     msg1->enqueueComplete();
>>     compval=1;
>> +    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
>>     BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
>>  }
>>
>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>
>
>
>
