qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alex Rudyy (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (QPID-8210) [Broker-J] Consumer attach when message is being released and requeued can end-up in broker crash due to unhandled NullPointerException
Date Sat, 23 Jun 2018 22:07:00 GMT

     [ https://issues.apache.org/jira/browse/QPID-8210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Alex Rudyy closed QPID-8210.
----------------------------

> [Broker-J]  Consumer attach when message is being released and requeued can end-up in
broker crash due to unhandled NullPointerException
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: QPID-8210
>                 URL: https://issues.apache.org/jira/browse/QPID-8210
>             Project: Qpid
>          Issue Type: Bug
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-7.0.3, qpid-java-broker-7.0.2, qpid-java-broker-7.0.0,
qpid-java-broker-7.0.1, qpid-java-broker-7.0.4, qpid-java-broker-7.0.5
>            Reporter: Alex Rudyy
>            Priority: Critical
>             Fix For: qpid-java-broker-7.0.6
>
>
> The consumer node is currently set after consumer is added into queue consumer manager.
When released message is requeued all consumers in consumer manager are iterated and AbstractQueue#updateSubRequeueEntry
is invoke for every of them to update the released entry in consumer queue context if required.
The notification of consumer without a consumer node can result in NullPointerException:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.qpid.server.queue.QueueConsumerManagerImpl.setNotified(QueueConsumerManagerImpl.java:151)
>     at org.apache.qpid.server.queue.AbstractQueue.notifyConsumer(AbstractQueue.java:2268)
>     at org.apache.qpid.server.queue.AbstractQueue.updateSubRequeueEntry(AbstractQueue.java:1382)
>     at org.apache.qpid.server.queue.AbstractQueue.resetSubPointers(AbstractQueue.java:1413)
>     at org.apache.qpid.server.queue.AbstractQueue.requeue(AbstractQueue.java:1399)
>     at org.apache.qpid.server.queue.QueueEntryImpl.postRelease(QueueEntryImpl.java:446)
>     at org.apache.qpid.server.queue.QueueEntryImpl.release(QueueEntryImpl.java:437)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.requeue(AMQChannel.java:896)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.close(AMQChannel.java:787)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:459)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:430)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.receiveChannelClose(AMQChannel.java:2167)
>     at org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody.process(ChannelCloseBody.java:140)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:132)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.doProcessFrame(BrokerDecoder.java:141)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processFrame(BrokerDecoder.java:65)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:104)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:97)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processAMQPFrames(BrokerDecoder.java:96)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:257)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:249)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.received(AMQPConnection_0_8Impl.java:248)
>     at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:135)
>     at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:610)
>     at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:496)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:270)
>     at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:134)
>     at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:575)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:366)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:97)
>     at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:533)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$null$0(QpidByteBufferFactory.java:464)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The issue can happen in the following scenarios:
>  * when previously acquired message is released due to consumer/session/connection close
and another consumer is attached to the queue at the same time.
>  * when transaction is rolled back or message is released (for example, JMS session Session#recover()
is called for client_ack session) and a new competing consumer is attached at the same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message