Repository: activemq-artemis
Updated Branches:
refs/heads/master 25e9fd78d -> db5a9597a
[ARTEMIS-1209] JMS OpenWire client cannot read notifications from activemq.notifications topic
Issue: https://issues.apache.org/jira/browse/ARTEMIS-1209
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/45321c65
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/45321c65
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/45321c65
Branch: refs/heads/master
Commit: 45321c65bd5e95bf92b1c29c072334f11ed4ca00
Parents: 25e9fd7
Author: Ingo Weiss <ingo@redhat.com>
Authored: Tue Jun 6 16:46:58 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jun 7 16:26:33 2017 -0400
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 5 ++++-
.../openwire/SimpleOpenWireTest.java | 22 ++++++++++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/45321c65/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index dd7879c..508bac9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -94,6 +94,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+ private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
+
private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) {
@@ -774,7 +776,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
if (props != null) {
for (SimpleString s : props) {
String keyStr = s.toString();
- if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) {
+ if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
+ !(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION)))
{
continue;
}
Object prop = coreMessage.getObjectProperty(s);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/45321c65/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 6eb45a8..cb4bd11 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -38,6 +38,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
@@ -1467,6 +1468,27 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
}
+ @Test
+ public void testNotificationProperties() throws Exception {
+ try (TopicConnection topicConnection = factory.createTopicConnection()) {
+ TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic notificationsTopic = topicSession.createTopic("activemq.notifications");
+ TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic);
+ List<Message> receivedMessages = new ArrayList<>();
+ subscriber.setMessageListener(receivedMessages::add);
+ topicConnection.start();
+
+ while (receivedMessages.size() == 0) {
+ Thread.sleep(1000);
+ }
+
+ for (Message message : receivedMessages) {
+ assertNotNull(message);
+ assertNotNull(message.getStringProperty("_AMQ_NotifType"));
+ }
+ }
+ }
+
private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));
|