qpid-dev mailing list archives

From "Praveen Murugesan (Created) (JIRA)" <j...@apache.org>
Subject [jira] [Created] (QPID-3927) Java Broker Priority Queue dispatch not working correctly
Date Mon, 02 Apr 2012 21:29:24 GMT
Java Broker Priority Queue dispatch not working correctly
---------------------------------------------------------

                 Key: QPID-3927
                 URL: https://issues.apache.org/jira/browse/QPID-3927
             Project: Qpid
          Issue Type: Bug
          Components: Java Broker
    Affects Versions: 0.16
         Environment: Ubuntu 10.04 LTS, 
Java Broker (0.16), Java Client (0.16), BDBMessageStore backend.
            Reporter: Praveen Murugesan
            Priority: Critical


The following steps reproduce the issue.

1) I create a priority queue Q1 with 10 priority levels.
2) I create a listener for Q1 and attach it to a consumer.
3) I enqueue a message to Q1 with priority 0.
4) The subscribed consumer dequeues the message. In the handler callback, I re-enqueue the
dequeued message to the same queue Q1 with the default priority (4), using another session,
and commit that session.
5) I commit the consumer session.
6) Ideally, the re-enqueued message should be delivered to the consumer when the handler returns.
In practice the delivery is intermittent: sometimes it happens and sometimes it does not.

I looked into the Java Broker code to get to the bottom of the problem, and this is what I
see happening.

The problem is in the logic of the method
public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) in PriorityQueueList.java.

This method uses the node passed in as the base node and walks the priority list from there
to the end. That causes a problem in race conditions where a new message arrives at a higher
priority while a message of lower priority is still being worked on.
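
The traversal described above can be modelled in a few lines. This is a minimal sketch, not
the broker's code: PriorityListModel, Entry, and their fields are hypothetical stand-ins. The
only behaviour taken from the report is that next() starts at the node passed in and walks
toward the end of the list, so levels above the node's own priority are never visited.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a queue entry; not the broker's SimpleQueueEntryImpl. */
class Entry {
    final int priority;
    final String name;

    Entry(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }
}

/** Hypothetical model of a priority list: one FIFO sub-list per priority level. */
class PriorityListModel {
    private final List<List<Entry>> levels = new ArrayList<>();

    PriorityListModel(int priorities) {
        for (int i = 0; i < priorities; i++) {
            levels.add(new ArrayList<>());
        }
    }

    Entry add(int priority, String name) {
        Entry e = new Entry(priority, name);
        levels.get(priority).add(e);
        return e;
    }

    /**
     * Models the walk described in the report: continue within the node's own
     * level, then fall through to lower levels. Higher levels are never visited.
     */
    Entry next(Entry node) {
        List<Entry> own = levels.get(node.priority);
        int idx = own.indexOf(node);
        if (idx + 1 < own.size()) {
            return own.get(idx + 1);
        }
        for (int p = node.priority - 1; p >= 0; p--) {
            if (!levels.get(p).isEmpty()) {
                return levels.get(p).get(0);
            }
        }
        return null; // nothing found at or below the node's level
    }
}
```

In this model, if p0 holds one entry and another entry then arrives at p4, calling next() on
the p0 entry returns null even though the p4 entry is waiting.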

Case 1)

1) I create a priority queue Q1 with 10 priority levels.
2) I create a listener for Q1 and attach it to a consumer.
3) I enqueue a message to Q1 with priority 0.
4) Since the subscriber is free, the enqueued message is delivered to the subscriber
synchronously. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> null)
5) A message is enqueued to p4. Delivery is asynchronous because the subscriber is busy.
QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> msg1 in p4 (available state))
6) The consumer session is committed and the handler returns.
7) msg1 in p4 is dispatched. QueueContext(last seen -> msg1 in p4 (acquired state))
8) QueueRunner runs again and updates the QueueContext(last seen -> msg1 in p0), because msg1
in p0 is still in the acquired state.
9) msg2 is now enqueued to the p4 queue. QueueContext(last seen -> msg1 in p0, released entry -> msg2 in p4)
10) The subscriber now cannot make progress, because the expression that selects the next message,
[QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);]
always evaluates to _entries.next(lastSeen). lastSeen is at the last priority level, so the
call finds nothing to process.
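
Step 10 can be replayed with a small stand-alone sketch of the selection expression. Node and
SelectionSketch are hypothetical names. The report does not say how the broker's compareTo is
defined; ordering entries by arrival sequence is an assumption of this sketch, chosen because
it yields the outcome described above.

```java
import java.util.function.UnaryOperator;

/** Hypothetical entry ordered by arrival sequence (an assumption of this sketch). */
class Node implements Comparable<Node> {
    final long seq;
    final int priority;

    Node(long seq, int priority) {
        this.seq = seq;
        this.priority = priority;
    }

    @Override
    public int compareTo(Node other) {
        return Long.compare(seq, other.seq);
    }
}

class SelectionSketch {
    /** Same shape as the expression quoted in the report. */
    static Node select(Node lastSeen, Node releasedNode, UnaryOperator<Node> next) {
        return (releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
                ? releasedNode
                : next.apply(lastSeen);
    }

    /** Replays step 10: lastSeen is msg1 in p0, the released entry is msg2 in p4. */
    static Node replayStep10() {
        Node msg1P0 = new Node(1, 0);
        Node msg2P4 = new Node(3, 4);
        // Stand-in for _entries.next(): from the lowest level there is nothing left.
        UnaryOperator<Node> next = n -> n.priority == 0 ? null : msg1P0;
        return select(msg1P0, msg2P4, next);
    }
}
```

Under that assumption the comparison is negative, the released entry is skipped, and the
result is whatever next(lastSeen) returns, which is null for an entry at the lowest level.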


Example sequence for the problem described above:
p4 Q -> msg1 -> msg2 (arrives after msg1 of p4 is dispatched, but before the status of msg1
of p0 changes to deleted)
p0 Q -> msg1

In the above scenario, I am still unsure about the race in the state change from acquired
to deleted.

Case 2)

1) I create a priority queue Q1 with 10 priority levels.
2) I enqueue a message with priority 4.
3) I consume the message synchronously (consumer.receive), so that the message is now marked
deleted.
4) I commit and close the consumer.
5) I create a new listener for Q1 and attach it to a consumer.
6) I enqueue a message to Q1 with priority 0.
7) Since the subscriber is free, the enqueued message is delivered to the subscriber
synchronously. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> null)
8) A message is enqueued to p4. Delivery is asynchronous because the subscriber is busy.
QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> msg2 in p4 (available state))
9) The consumer session is committed and the handler returns.
10) QueueRunner now tries to make progress, but the subscriber cannot, because the expression
that selects the next message,
[QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);]
always evaluates to _entries.next(lastSeen). lastSeen is at the last priority level, so the
call finds nothing to process.

Example sequence for the problem described above. Here msg1 of p4 is consumed synchronously
before the asynchronous consumer is started.
p4 Q -> msg1 (deleted) -> msg2
p0 Q -> msg1

I hope the notes above are clear. Either way, I have attached a test that reproduces the issue
described here. :)
