activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/2] activemq git commit: AMQ-6428 - Added convenience methods to EmbeddedActiveMQBroker and JUnit Resources for ActiveMQ clients
Date Tue, 04 Oct 2016 17:34:25 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 3239e4f79 -> a27f4f2ea


AMQ-6428 - Added convenience methods to EmbeddedActiveMQBroker and JUnit Resources for ActiveMQ clients


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bab4a92d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bab4a92d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bab4a92d

Branch: refs/heads/master
Commit: bab4a92d6e337828481400a7ec9d94d1516c7158
Parents: 3239e4f
Author: Quinn Stevenson <quinn@pronoia-solutions.com>
Authored: Mon Oct 3 12:31:31 2016 -0600
Committer: Quinn Stevenson <quinn@pronoia-solutions.com>
Committed: Mon Oct 3 12:31:31 2016 -0600

----------------------------------------------------------------------
 .../junit/AbstractActiveMQClientResource.java   | 250 +++++++++
 .../junit/AbstractActiveMQConsumerResource.java | 118 +++++
 .../junit/AbstractActiveMQProducerResource.java | 132 +++++
 .../ActiveMQDynamicQueueSenderResource.java     | 137 +++++
 .../ActiveMQDynamicTopicPublisherResource.java  | 136 +++++
 .../junit/ActiveMQQueueReceiverResource.java    |  45 ++
 .../junit/ActiveMQQueueSenderResource.java      |  52 ++
 .../ActiveMQTopicDurableSubscriberResource.java |  72 +++
 .../junit/ActiveMQTopicPublisherResource.java   |  64 +++
 .../junit/ActiveMQTopicSubscriberResource.java  |  46 ++
 .../activemq/junit/EmbeddedActiveMQBroker.java  | 514 ++++++++++++++++---
 11 files changed, 1504 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java
new file mode 100644
index 0000000..fd7d5f8
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractActiveMQClientResource extends ExternalResource {
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    ActiveMQConnectionFactory connectionFactory;
+    Connection connection;
+    Session session;
+    ActiveMQDestination destination;
+
+    public AbstractActiveMQClientResource(ActiveMQConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public AbstractActiveMQClientResource(URI brokerURI) {
+        this(new ActiveMQConnectionFactory(brokerURI));
+    }
+
+    public AbstractActiveMQClientResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        this(embeddedActiveMQBroker.createConnectionFactory());
+    }
+
+    public AbstractActiveMQClientResource(URI brokerURI, String userName, String password) {
+        this(new ActiveMQConnectionFactory(userName, password, brokerURI));
+    }
+
+    public AbstractActiveMQClientResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        this(connectionFactory);
+        destination = createDestination(destinationName);
+    }
+
+    public AbstractActiveMQClientResource(String destinationName, URI brokerURI) {
+        this(destinationName, new ActiveMQConnectionFactory(brokerURI));
+    }
+
+    public AbstractActiveMQClientResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        this(destinationName, embeddedActiveMQBroker.createConnectionFactory());
+    }
+
+    public AbstractActiveMQClientResource(String destinationName, URI brokerURI, String userName, String password) {
+        this(destinationName, new ActiveMQConnectionFactory(userName, password, brokerURI));
+    }
+
+    public static void setMessageProperties(Message message, Map<String, Object> properties) throws JMSException {
+        // A null map means there are no properties to copy onto the message.
+        if (properties == null) { return; }
+        for (Map.Entry<String, Object> entry : properties.entrySet()) {
+            message.setObjectProperty(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public String getClientId() {
+        return null;
+    }
+
+    public String getDestinationName() {
+        return (destination != null) ? destination.toString() : null;
+    }
+
+    public abstract byte getDestinationType();
+
+    protected abstract void createClient() throws JMSException;
+
+    /**
+     * Starts the client: logs the resource class and broker URL, then calls {@code start()}.
+     * <p>
+     * Invoked by JUnit to set up the resource.
+     */
+    @Override
+    protected void before() throws Throwable {
+        log.info("Starting {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL());
+
+        this.start();
+
+        super.before();
+    }
+
+    /**
+     * Stops the client: logs the resource class and broker URL, then calls {@code stop()}.
+     * <p>
+     * Invoked by JUnit to tear down the resource.
+     */
+    @Override
+    protected void after() {
+        log.info("Stopping {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL());
+
+        super.after();
+
+        this.stop();
+    }
+
+    public void start() { // connects, creates the session and client, then starts the connection
+        try {
+            try {
+                connection = connectionFactory.createConnection();
+                String clientId = getClientId();
+                if (clientId != null) {
+                    connection.setClientID(clientId);
+                }
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                createClient();
+            } catch (JMSException jmsEx) { // setup failed before the connection could be started
+                throw new RuntimeException("Producer initialization failed: " + this.getClass().getSimpleName(), jmsEx);
+            }
+            connection.start();
+        } catch (JMSException jmsEx) { // only connection.start() can reach this handler
+            throw new IllegalStateException("Producer failed to start", jmsEx);
+        }
+        log.info("Ready to produce messages to {}", connectionFactory.getBrokerURL());
+    }
+
+    public void stop() { // closes the JMS connection; does nothing when no connection was created
+        try {
+            if (connection != null) { connection.close(); }
+        } catch (JMSException jmsEx) {
+            log.warn("Exception encountered closing JMS Connection", jmsEx);
+        }
+    }
+
+    public String getBrokerURL() {
+        return connectionFactory.getBrokerURL();
+    }
+
+    protected ActiveMQDestination createDestination(String destinationName) {
+        // A null name yields no destination; otherwise getDestinationType() picks the kind.
+        if (destinationName == null) {
+            return null;
+        }
+        return ActiveMQDestination.createDestination(destinationName, getDestinationType());
+    }
+
+    public BytesMessage createBytesMessage() throws JMSException {
+        return session.createBytesMessage();
+    }
+
+    public TextMessage createTextMessage() throws JMSException {
+        return session.createTextMessage();
+    }
+
+    public MapMessage createMapMessage() throws JMSException {
+        return session.createMapMessage();
+    }
+
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return session.createObjectMessage();
+    }
+
+    public StreamMessage createStreamMessage() throws JMSException {
+        return session.createStreamMessage();
+    }
+
+    public BytesMessage createMessage(byte[] body) throws JMSException {
+        return this.createMessage(body, null);
+    }
+
+    public TextMessage createMessage(String body) throws JMSException {
+        return this.createMessage(body, null);
+    }
+
+    public MapMessage createMessage(Map<String, Object> body) throws JMSException {
+        return this.createMessage(body, null);
+    }
+
+    public ObjectMessage createMessage(Serializable body) throws JMSException {
+        return this.createMessage(body, null);
+    }
+
+    public BytesMessage createMessage(byte[] body, Map<String, Object> properties) throws JMSException {
+        BytesMessage message = this.createBytesMessage();
+        if (body != null) {
+            message.writeBytes(body);
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+
+    public TextMessage createMessage(String body, Map<String, Object> properties) throws JMSException {
+        TextMessage message = this.createTextMessage();
+        if (body != null) {
+            message.setText(body);
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+
+    public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) throws JMSException {
+        MapMessage message = this.createMapMessage();
+
+        if (body != null) {
+            for (Map.Entry<String, Object> entry : body.entrySet()) {
+                message.setObject(entry.getKey(), entry.getValue());
+            }
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+
+    public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) throws JMSException {
+        ObjectMessage message = this.createObjectMessage();
+
+        if (body != null) {
+            message.setObject(body);
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java
new file mode 100644
index 0000000..5b04171
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+public abstract class AbstractActiveMQConsumerResource extends AbstractActiveMQClientResource {
+    MessageConsumer consumer;
+    long defaultReceiveTimout = 50;
+
+    public AbstractActiveMQConsumerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public AbstractActiveMQConsumerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    public long getDefaultReceiveTimout() {
+        return defaultReceiveTimout;
+    }
+
+    public void setDefaultReceiveTimout(long defaultReceiveTimout) {
+        this.defaultReceiveTimout = defaultReceiveTimout;
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        consumer = session.createConsumer(destination);
+    }
+
+    public BytesMessage receiveBytesMessage() throws JMSException {
+        return (BytesMessage) this.receiveMessage();
+    }
+
+    public TextMessage receiveTextMessage() throws JMSException {
+        return (TextMessage) this.receiveMessage();
+    }
+
+    public MapMessage receiveMapMessage() throws JMSException {
+        return (MapMessage) this.receiveMessage();
+    }
+
+    public ObjectMessage receiveObjectMessage() throws JMSException {
+        return (ObjectMessage) this.receiveMessage();
+    }
+
+    public BytesMessage receiveBytesMessage(long timeout) throws JMSException {
+        return (BytesMessage) this.receiveMessage(timeout);
+    }
+
+    public TextMessage receiveTextMessage(long timeout) throws JMSException {
+        return (TextMessage) this.receiveMessage(timeout);
+    }
+
+    public MapMessage receiveMapMessage(long timeout) throws JMSException {
+        return (MapMessage) this.receiveMessage(timeout);
+    }
+
+    public ObjectMessage receiveObjectMessage(long timeout) throws JMSException {
+        return (ObjectMessage) this.receiveMessage(timeout);
+    }
+
+    public Message receiveMessage() throws JMSException {
+        return receiveMessage(defaultReceiveTimout);
+    }
+
+    /**
+     * Receives a message, choosing the consumer call from the sign of the timeout.
+     *
+     * @param timeout positive: passed to receive(timeout); zero: receiveNoWait(); negative: receive()
+     * @return whatever the consumer call returned
+     * @throws JMSException if the underlying consumer call fails
+     */
+    public Message receiveMessage(long timeout) throws JMSException {
+        // Negative timeout: plain receive() with no timeout argument.
+        if (timeout < 0) {
+            return consumer.receive();
+        }
+        // Zero timeout: ask the consumer without waiting.
+        if (timeout == 0) {
+            return consumer.receiveNoWait();
+        }
+
+        return consumer.receive(timeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java
new file mode 100644
index 0000000..69e89af
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+public abstract class AbstractActiveMQProducerResource extends AbstractActiveMQClientResource {
+    MessageProducer producer;
+
+    public AbstractActiveMQProducerResource(ActiveMQConnectionFactory connectionFactory) {
+        super(connectionFactory);
+    }
+
+    public AbstractActiveMQProducerResource(URI brokerURI) {
+        super(brokerURI);
+    }
+
+    public AbstractActiveMQProducerResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(embeddedActiveMQBroker);
+    }
+
+    public AbstractActiveMQProducerResource(URI brokerURI, String userName, String password) {
+        super(brokerURI, userName, password);
+    }
+
+    public AbstractActiveMQProducerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public AbstractActiveMQProducerResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public AbstractActiveMQProducerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public AbstractActiveMQProducerResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public String getDestinationName() {
+        try {
+            if (producer != null && producer.getDestination() != null) {
+                return producer.getDestination().toString();
+            }
+        } catch (JMSException e) {
+            log.debug("Unable to read the producer destination; reporting no destination name", e);
+        }
+        // No producer yet, no destination bound to the producer, or the lookup failed.
+        return null;
+    }
+
+    public void sendMessage(Message message) throws JMSException {
+        producer.send(message);
+    }
+
+    public BytesMessage sendMessage(byte[] body) throws JMSException {
+        BytesMessage message = this.createMessage(body);
+        sendMessage(message);
+        return message;
+    }
+
+    public TextMessage sendMessage(String body) throws JMSException {
+        TextMessage message = this.createMessage(body);
+        sendMessage(message);
+        return message;
+    }
+
+    public MapMessage sendMessage(Map<String, Object> body) throws JMSException {
+        MapMessage message = this.createMessage(body);
+        sendMessage(message);
+        return message;
+    }
+
+    public ObjectMessage sendMessage(Serializable body) throws JMSException {
+        ObjectMessage message = this.createMessage(body);
+        sendMessage(message);
+        return message;
+    }
+
+    public BytesMessage sendMessageWithProperties(byte[] body, Map<String, Object> properties) throws JMSException {
+        BytesMessage message = this.createMessage(body, properties);
+        sendMessage(message);
+        return message;
+    }
+
+    public TextMessage sendMessageWithProperties(String body, Map<String, Object> properties) throws JMSException {
+        TextMessage message = this.createMessage(body, properties);
+        sendMessage(message);
+        return message;
+    }
+
+    public MapMessage sendMessageWithProperties(Map<String, Object> body, Map<String, Object> properties) throws JMSException {
+        MapMessage message = this.createMessage(body, properties);
+        sendMessage(message);
+        return message;
+    }
+
+    public ObjectMessage sendMessageWithProperties(Serializable body, Map<String, Object> properties) throws JMSException {
+        ObjectMessage message = this.createMessage(body, properties);
+        sendMessage(message);
+        return message;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java
new file mode 100644
index 0000000..1e355ae
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQDynamicQueueSenderResource extends AbstractActiveMQProducerResource {
+    public ActiveMQDynamicQueueSenderResource(ActiveMQConnectionFactory connectionFactory) {
+        super(connectionFactory);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(URI brokerURI) {
+        super(brokerURI);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(embeddedActiveMQBroker);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(URI brokerURI, String userName, String password) {
+        super(brokerURI, userName, password);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(defaultDestinationName, connectionFactory);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI) {
+        super(defaultDestinationName, brokerURI);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI, String userName, String password) {
+        super(defaultDestinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        producer = session.createProducer(null);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.QUEUE_TYPE;
+    }
+
+    @Override
+    public void sendMessage(Message message) throws JMSException {
+        if (destination == null) {
+            throw new IllegalStateException("Destination is not specified");
+        }
+        // createClient() builds the producer with a null destination, so one is passed on every send.
+        producer.send(destination, message);
+    }
+
+    public void sendMessage(String destinationName, Message message) throws JMSException {
+        producer.send(createDestination(destinationName), message);
+    }
+
+    public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException {
+        BytesMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public TextMessage sendMessage(String destinationName, String body) throws JMSException {
+        TextMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public MapMessage sendMessage(String destinationName, Map<String, Object> body) throws JMSException {
+        MapMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException {
+        ObjectMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) throws JMSException {
+        BytesMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public TextMessage sendMessageWithProperties(String destinationName, String body, Map<String, Object> properties) throws JMSException {
+        TextMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public MapMessage sendMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) throws JMSException {
+        MapMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) throws JMSException {
+        ObjectMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java
new file mode 100644
index 0000000..8181946
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQDynamicTopicPublisherResource extends AbstractActiveMQProducerResource {
+    public ActiveMQDynamicTopicPublisherResource(ActiveMQConnectionFactory connectionFactory) {
+        super(connectionFactory);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(URI brokerURI) {
+        super(brokerURI);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(embeddedActiveMQBroker);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(URI brokerURI, String userName, String password) {
+        super(brokerURI, userName, password);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(defaultDestinationName, connectionFactory);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI) {
+        super(defaultDestinationName, brokerURI);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI, String userName, String password) {
+        super(defaultDestinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        producer = session.createProducer(null);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.TOPIC_TYPE;
+    }
+
+    @Override
+    public void sendMessage(Message message) throws JMSException {
+        if (destination == null) {
+            throw new IllegalStateException("Destination is not specified");
+        }
+        // createClient() builds the producer with a null destination, so one is passed on every send.
+        producer.send(destination, message);
+    }
+
+    public void sendMessage(String destinationName, Message message) throws JMSException {
+        producer.send(createDestination(destinationName), message);
+    }
+
+    public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException {
+        BytesMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public TextMessage sendMessage(String destinationName, String body) throws JMSException {
+        TextMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public MapMessage sendMessage(String destinationName, Map<String, Object> body) throws JMSException {
+        MapMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException {
+        ObjectMessage message = this.createMessage(body);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) throws JMSException {
+        BytesMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public TextMessage sendMessageWithProperties(String destinationName, String body, Map<String, Object> properties) throws JMSException {
+        TextMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public MapMessage sendMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) throws JMSException {
+        MapMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+
+    public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) throws JMSException {
+        ObjectMessage message = this.createMessage(body, properties);
+        sendMessage(destinationName, message);
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java
new file mode 100644
index 0000000..a974836
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQQueueReceiverResource extends AbstractActiveMQConsumerResource {
+    public ActiveMQQueueReceiverResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public ActiveMQQueueReceiverResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.QUEUE_TYPE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java
new file mode 100644
index 0000000..fa3cdca
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQQueueSenderResource extends AbstractActiveMQProducerResource {
+    public ActiveMQQueueSenderResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public ActiveMQQueueSenderResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public ActiveMQQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQQueueSenderResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.QUEUE_TYPE;
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        producer = session.createProducer(destination);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java
new file mode 100644
index 0000000..58bb6a5
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQTopicDurableSubscriberResource extends AbstractActiveMQConsumerResource {
+    String clientId = "test-client-id";
+    String subscriberName = "test-subscriber";
+
+    public ActiveMQTopicDurableSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public ActiveMQTopicDurableSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.TOPIC_TYPE;
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        consumer = session.createDurableSubscriber((Topic) destination, subscriberName);
+    }
+
+    @Override
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getSubscriberName() {
+        return subscriberName;
+    }
+
+    public void setSubscriberName(String subscriberName) {
+        this.subscriberName = subscriberName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java
new file mode 100644
index 0000000..0def0e9
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQTopicPublisherResource extends AbstractActiveMQProducerResource {
+    public ActiveMQTopicPublisherResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public ActiveMQTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public String getDestinationName() {
+        try {
+            if (producer != null && producer.getDestination() != null) {
+                return producer.getDestination().toString();
+            }
+        } catch (JMSException e) {
+            // Deliberately ignored: a destination that cannot be read is reported as null below.
+        }
+
+        return null;
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.TOPIC_TYPE;
+    }
+
+    @Override
+    protected void createClient() throws JMSException {
+        producer = session.createProducer(destination);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java
new file mode 100644
index 0000000..547ff11
--- /dev/null
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.junit;
+
+import java.net.URI;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ActiveMQTopicSubscriberResource extends AbstractActiveMQConsumerResource {
+    public ActiveMQTopicSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
+        super(destinationName, connectionFactory);
+    }
+
+    public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI) {
+        super(destinationName, brokerURI);
+    }
+
+    public ActiveMQTopicSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
+        super(destinationName, embeddedActiveMQBroker);
+    }
+
+    public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI, String userName, String password) {
+        super(destinationName, brokerURI, userName, password);
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.TOPIC_TYPE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bab4a92d/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java
index 3e328e8..d1877ba 100644
--- a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java
+++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java
@@ -16,23 +16,37 @@
  */
 package org.apache.activemq.junit;
 
+import java.io.Serializable;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.plugin.StatisticsBrokerPlugin;
 import org.apache.activemq.pool.PooledConnectionFactory;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
+
 /**
  * A JUnit Rule that embeds an ActiveMQ broker into a test.
  */
@@ -40,15 +54,15 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
     Logger log = LoggerFactory.getLogger(this.getClass());
 
     BrokerService brokerService;
+    InternalClient internalClient;
 
     /**
      * Create an embedded ActiveMQ broker using defaults
-     *
+     * <p>
      * The defaults are:
-     *  - the broker name is 'embedded-broker'
-     *  - JMX is disabled
-     *  - Persistence is disabled
-     *
+     * - the broker name is 'embedded-broker'
+     * - JMX is disabled
+     * - Persistence is disabled
      */
     public EmbeddedActiveMQBroker() {
         brokerService = new BrokerService();
@@ -61,7 +75,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
     /**
      * Create an embedded ActiveMQ broker using a configuration URI
      */
-    public EmbeddedActiveMQBroker(String configurationURI ) {
+    public EmbeddedActiveMQBroker(String configurationURI) {
         try {
             brokerService = BrokerFactory.createBroker(configurationURI);
         } catch (Exception ex) {
@@ -72,7 +86,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
     /**
      * Create an embedded ActiveMQ broker using a configuration URI
      */
-    public EmbeddedActiveMQBroker(URI configurationURI ) {
+    public EmbeddedActiveMQBroker(URI configurationURI) {
         try {
             brokerService = BrokerFactory.createBroker(configurationURI);
         } catch (Exception ex) {
@@ -80,13 +94,26 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
         }
     }
 
+    public static void setMessageProperties(Message message, Map<String, Object> properties) {
+        if (properties != null && !properties.isEmpty()) {
+            for (Map.Entry<String, Object> property : properties.entrySet()) {
+                try {
+                    message.setObjectProperty(property.getKey(), property.getValue());
+                } catch (JMSException jmsEx) { // %s renders a null value as "null"; calling toString() on it would throw NPE and hide jmsEx
+                    throw new EmbeddedActiveMQBrokerException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue()), jmsEx);
+                }
+            }
+        }
+    }
+
     /**
      * Customize the configuration of the embedded ActiveMQ broker
-     *
+     * <p>
      * This method is called before the embedded ActiveMQ broker is started, and can
      * be overridden to this method to customize the broker configuration.
      */
-    protected void configure() {}
+    protected void configure() {
+    }
 
     /**
      * Start the embedded ActiveMQ broker, blocking until the broker has successfully started.
@@ -98,6 +125,8 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
         try {
             this.configure();
             brokerService.start();
+            internalClient = new InternalClient();
+            internalClient.start();
         } catch (Exception ex) {
             throw new RuntimeException("Exception encountered starting embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
         }
@@ -112,6 +141,10 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
      * be stopped manually to support advanced testing scenarios.
      */
     public void stop() {
+        if (internalClient != null) {
+            internalClient.stop();
+            internalClient = null;
+        }
         if (!brokerService.isStopped()) {
             try {
                 brokerService.stop();
@@ -158,7 +191,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
      */
     public ActiveMQConnectionFactory createConnectionFactory() {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
-        connectionFactory.setBrokerURL(brokerService.getVmConnectorURI().toString());
+        connectionFactory.setBrokerURL(getVmURL());
         return connectionFactory;
     }
 
@@ -187,15 +220,64 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
     }
 
     /**
-     * Get the VM URL for the embedded ActiveMQ Broker
+     * Get the failover VM URL for the embedded ActiveMQ Broker
      * <p/>
-     * NOTE:  The option is precreate=false option is appended to the URL to avoid the automatic creation of brokers
+     * NOTE:  The create=false option is appended to the URL to avoid the automatic creation of brokers
      * and the resulting duplicate broker errors
      *
      * @return the VM URL for the embedded broker
      */
     public String getVmURL() {
-        return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
+        return getVmURL(true);
+    }
+
+    /**
+     * Get the VM URL for the embedded ActiveMQ Broker
+     * <p/>
+     * NOTE:  The create=false option is appended to the URL to avoid the automatic creation of brokers
+     * and the resulting duplicate broker errors
+     *
+     * @param failoverURL if true a failover URL will be returned
+     * @return the VM URL for the embedded broker
+     */
+    public String getVmURL(boolean failoverURL) {
+        if (failoverURL) {
+            return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
+        }
+
+        return brokerService.getVmConnectorURI().toString() + "?create=false";
+    }
+
+    /**
+     * Get the failover VM URI for the embedded ActiveMQ Broker
+     * <p/>
+     * NOTE:  The create=false option is appended to the URI to avoid the automatic creation of brokers
+     * and the resulting duplicate broker errors
+     *
+     * @return the VM URI for the embedded broker
+     */
+    public URI getVmURI() {
+        return getVmURI(true);
+    }
+
+    /**
+     * Get the VM URI for the embedded ActiveMQ Broker
+     * <p/>
+     * NOTE:  The create=false option is appended to the URI to avoid the automatic creation of brokers
+     * and the resulting duplicate broker errors
+     *
+     * @param failoverURI if true a failover URI will be returned
+     * @return the VM URI for the embedded broker
+     */
+    public URI getVmURI(boolean failoverURI) {
+        URI result;
+        try {
+            result = new URI(getVmURL(failoverURI));
+        } catch (URISyntaxException uriEx) { // report the kind of URI that was actually requested
+            throw new RuntimeException(failoverURI ? "Unable to create failover URI" : "Unable to create VM URI", uriEx);
+        }
+
+        return result;
     }
 
     /**
@@ -326,64 +408,53 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
     /**
      * Get the number of messages in a specific JMS Destination.
      * <p/>
-     * The full name of the JMS destination including the prefix should be provided - i.e. queue:myQueue
-     * or topic:myTopic.  If the destination type prefix is not included in the destination name, a prefix
-     * of "queue:" is assumed.
+     * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
+     * or topic://myTopic.  If the destination type prefix is not included in the destination name, a prefix
+     * of "queue://" is assumed.
      *
-     * @param fullDestinationName the full name of the JMS Destination
+     * @param destinationName the full name of the JMS Destination
      * @return the number of messages in the JMS Destination
      */
-    public int getMessageCount(String fullDestinationName) throws Exception {
-        final int QUEUE_TYPE = 1;
-        final int TOPIC_TYPE = 2;
-
+    public long getMessageCount(String destinationName) {
         if (null == brokerService) {
             throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
         }
 
-        int destinationType = QUEUE_TYPE;
-        String destinationName = fullDestinationName;
-
-        if (fullDestinationName.startsWith("queue:")) {
-            destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1);
-        } else if (fullDestinationName.startsWith("topic:")) {
-            destinationType = TOPIC_TYPE;
-            destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1);
-        }
-
-        int messageCount = -1;
-        boolean foundDestination = false;
-        for (Destination destination : brokerService.getBroker().getDestinationMap().values()) {
-            String tmpName = destination.getName();
-            if (tmpName.equalsIgnoreCase(destinationName)) {
-                switch (destinationType) {
-                    case QUEUE_TYPE:
-                        if (destination instanceof Queue) {
-                            messageCount = destination.getMessageStore().getMessageCount();
-                            foundDestination = true;
-                        }
-                        break;
-                    case TOPIC_TYPE:
-                        if (destination instanceof Topic) {
-                            messageCount = destination.getMessageStore().getMessageCount();
-                            foundDestination = true;
-                        }
-                        break;
-                    default:
-                        // Should never see this
-                        log.error("Type didn't match: {}", destination.getClass().getName());
-                }
-            }
-            if (foundDestination) {
-                break;
-            }
+        // TODO: Figure out how to do this for Topics
+        Destination destination = getDestination(destinationName);
+        if (destination == null) {
+            throw new RuntimeException("Failed to find destination: " + destinationName);
         }
 
-        if (!foundDestination) {
-            log.warn("Didn't find destination {} in broker {}", fullDestinationName, getBrokerName());
+        // return destination.getMessageStore().getMessageCount();
+        return destination.getDestinationStatistics().getMessages().getCount();
+    }
+
+    /**
+     * Get the ActiveMQ destination
+     * <p/>
+     * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
+     * or topic://myTopic.  If the destination type prefix is not included in the destination name, a prefix
+     * of "queue://" is assumed.
+     *
+     * @param destinationName the full name of the JMS Destination
+     * @return the ActiveMQ destination, null if not found
+     */
+    public Destination getDestination(String destinationName) {
+        if (null == brokerService) {
+            throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
         }
 
-        return messageCount;
+        Destination destination = null;
+        try {
+            destination = brokerService.getDestination(ActiveMQDestination.createDestination(destinationName, QUEUE_TYPE));
+        } catch (RuntimeException runtimeEx) {
+            throw runtimeEx;
+        } catch (Exception ex) {
+            throw new EmbeddedActiveMQBrokerException("Unexpected exception getting destination from broker", ex);
+        }
+
+        return destination;
     }
 
     private PolicyEntry getDefaultPolicyEntry() {
@@ -401,4 +472,323 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
 
         return defaultEntry;
     }
+
+    /**
+     * Create a BytesMessage by delegating to the internal client.
+     *
+     * @return the BytesMessage produced by the internal client
+     */
+    public BytesMessage createBytesMessage() {
+        final BytesMessage bytesMessage = internalClient.createBytesMessage();
+        return bytesMessage;
+    }
+
+    /**
+     * Create a TextMessage by delegating to the internal client.
+     *
+     * @return the TextMessage produced by the internal client
+     */
+    public TextMessage createTextMessage() {
+        final TextMessage textMessage = internalClient.createTextMessage();
+        return textMessage;
+    }
+
+    /**
+     * Create a MapMessage by delegating to the internal client.
+     *
+     * @return the MapMessage produced by the internal client
+     */
+    public MapMessage createMapMessage() {
+        final MapMessage mapMessage = internalClient.createMapMessage();
+        return mapMessage;
+    }
+
+    /**
+     * Create an ObjectMessage by delegating to the internal client.
+     *
+     * @return the ObjectMessage produced by the internal client
+     */
+    public ObjectMessage createObjectMessage() {
+        final ObjectMessage objectMessage = internalClient.createObjectMessage();
+        return objectMessage;
+    }
+
+    /**
+     * Create a StreamMessage by delegating to the internal client.
+     *
+     * @return the StreamMessage produced by the internal client
+     */
+    public StreamMessage createStreamMessage() {
+        final StreamMessage streamMessage = internalClient.createStreamMessage();
+        return streamMessage;
+    }
+
+    /**
+     * Create a BytesMessage with the given body and no properties map.
+     *
+     * @param body the bytes to write to the message
+     * @return the result of the two-argument overload called with a null properties map
+     */
+    public BytesMessage createMessage(byte[] body) {
+        final Map<String, Object> noProperties = null;
+        return createMessage(body, noProperties);
+    }
+
+    /**
+     * Create a TextMessage with the given body and no properties map.
+     *
+     * @param body the text to set on the message
+     * @return the result of the two-argument overload called with a null properties map
+     */
+    public TextMessage createMessage(String body) {
+        final Map<String, Object> noProperties = null;
+        return createMessage(body, noProperties);
+    }
+
+    /**
+     * Create a MapMessage with the given body entries and no properties map.
+     *
+     * @param body the entries to set on the message
+     * @return the result of the two-argument overload called with a null properties map
+     */
+    public MapMessage createMessage(Map<String, Object> body) {
+        final Map<String, Object> noProperties = null;
+        return createMessage(body, noProperties);
+    }
+
+    /**
+     * Create an ObjectMessage with the given body and no properties map.
+     *
+     * @param body the object to set on the message
+     * @return the result of the two-argument overload called with a null properties map
+     */
+    public ObjectMessage createMessage(Serializable body) {
+        final Map<String, Object> noProperties = null;
+        return createMessage(body, noProperties);
+    }
+
+    /**
+     * Create a BytesMessage, write the given body to it and apply the given properties.
+     * <p/>
+     * The body is only written when it is not null.  The properties are handed to
+     * setMessageProperties (defined elsewhere in this class) unchanged.
+     *
+     * @param body the bytes to write to the message; skipped when null
+     * @param properties the properties passed to setMessageProperties
+     * @return the populated BytesMessage
+     * @throws EmbeddedActiveMQBrokerException if writing the body raises a JMSException
+     */
+    public BytesMessage createMessage(byte[] body, Map<String, Object> properties) {
+        BytesMessage message = this.createBytesMessage();
+        if (body != null) {
+            try {
+                message.writeBytes(body);
+            } catch (JMSException jmsEx) {
+                // Report the size only: the body is arbitrary binary data, so decoding it with the
+                // platform default charset would give unreadable, platform-dependent error text
+                throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%d bytes} on BytesMessage", body.length), jmsEx);
+            }
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+
+    /**
+     * Create a TextMessage, set the given text on it and apply the given properties.
+     * <p/>
+     * The text is only set when the body is not null.  The properties are handed to
+     * setMessageProperties (defined elsewhere in this class) unchanged.
+     *
+     * @param body the text to set on the message; skipped when null
+     * @param properties the properties passed to setMessageProperties
+     * @return the populated TextMessage
+     * @throws EmbeddedActiveMQBrokerException if setting the text raises a JMSException
+     */
+    public TextMessage createMessage(String body, Map<String, Object> properties) {
+        final TextMessage textMessage = this.createTextMessage();
+
+        try {
+            if (body != null) {
+                textMessage.setText(body);
+            }
+        } catch (JMSException jmsEx) {
+            throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx);
+        }
+
+        setMessageProperties(textMessage, properties);
+        return textMessage;
+    }
+
+    public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
+        MapMessage message = this.createMapMessage();
+
+        if (body != null) {
+            for (Map.Entry<String, Object> entry : body.entrySet()) {
+                try {
+                    message.setObject(entry.getKey(), entry.getValue());
+                } catch (JMSException jmsEx) {
+                    throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx);
+                }
+            }
+        }
+
+        setMessageProperties(message, properties);
+
+        return message;
+    }
+
+    /**
+     * Create an ObjectMessage, set the given object on it and apply the given properties.
+     * <p/>
+     * The object is only set when the body is not null.  The properties are handed to
+     * setMessageProperties (defined elsewhere in this class) unchanged.
+     *
+     * @param body the object to set on the message; skipped when null
+     * @param properties the properties passed to setMessageProperties
+     * @return the populated ObjectMessage
+     * @throws EmbeddedActiveMQBrokerException if setting the object raises a JMSException
+     */
+    public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) {
+        final ObjectMessage objectMessage = this.createObjectMessage();
+
+        try {
+            if (body != null) {
+                objectMessage.setObject(body);
+            }
+        } catch (JMSException jmsEx) {
+            // Only reachable after setObject was attempted, so body is not null here
+            throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx);
+        }
+
+        setMessageProperties(objectMessage, properties);
+        return objectMessage;
+    }
+
+    /**
+     * Send the given message to the named destination using the internal client.
+     * <p/>
+     * The destination name is resolved with ActiveMQDestination.createDestination, using the
+     * queue type as the default destination type.
+     *
+     * @param destinationName the name of the destination; must not be null
+     * @param message the message to send; must not be null
+     * @throws IllegalArgumentException if the destination name or the message is null
+     */
+    public void pushMessage(String destinationName, Message message) {
+        if (destinationName == null) {
+            throw new IllegalArgumentException("pushMessage failure - destination name is required");
+        }
+        if (message == null) {
+            throw new IllegalArgumentException("pushMessage failure - a Message is required");
+        }
+
+        final ActiveMQDestination target = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
+        internalClient.pushMessage(target, message);
+    }
+
+    /**
+     * Create a BytesMessage from the given body, without properties, and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the bytes for the message body
+     * @return the message that was sent
+     */
+    public BytesMessage pushMessage(String destinationName, byte[] body) {
+        final BytesMessage sent = createMessage(body, null);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create a TextMessage from the given body, without properties, and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the text for the message body
+     * @return the message that was sent
+     */
+    public TextMessage pushMessage(String destinationName, String body) {
+        final TextMessage sent = createMessage(body, null);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create a MapMessage from the given entries, without properties, and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the entries for the message body
+     * @return the message that was sent
+     */
+    public MapMessage pushMessage(String destinationName, Map<String, Object> body) {
+        final MapMessage sent = createMessage(body, null);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create an ObjectMessage from the given body, without properties, and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the object for the message body
+     * @return the message that was sent
+     */
+    public ObjectMessage pushMessage(String destinationName, Serializable body) {
+        final ObjectMessage sent = createMessage(body, null);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create a BytesMessage from the given body and properties and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the bytes for the message body
+     * @param properties the properties passed on to createMessage
+     * @return the message that was sent
+     */
+    public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) {
+        final BytesMessage sent = createMessage(body, properties);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create a TextMessage from the given body and properties and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the text for the message body
+     * @param properties the properties passed on to createMessage
+     * @return the message that was sent
+     */
+    public TextMessage pushMessageWithProperties(String destinationName, String body, Map<String, Object> properties) {
+        final TextMessage sent = createMessage(body, properties);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create a MapMessage from the given entries and properties and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the entries for the message body
+     * @param properties the properties passed on to createMessage
+     * @return the message that was sent
+     */
+    public MapMessage pushMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) {
+        final MapMessage sent = createMessage(body, properties);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+    /**
+     * Create an ObjectMessage from the given body and properties and send it to the named destination.
+     *
+     * @param destinationName the name of the destination
+     * @param body the object for the message body
+     * @param properties the properties passed on to createMessage
+     * @return the message that was sent
+     */
+    public ObjectMessage pushMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) {
+        final ObjectMessage sent = createMessage(body, properties);
+        pushMessage(destinationName, sent);
+        return sent;
+    }
+
+
+    /**
+     * Return the first message obtained by browsing the named destination.
+     * <p/>
+     * The destination name is resolved with ActiveMQDestination.createDestination, using the
+     * queue type as the default destination type.
+     *
+     * @param destinationName the name of the destination; must not be null
+     * @return the first browsed message, or null when browsing yields no messages
+     * @throws NullPointerException if the BrokerService is null
+     * @throws IllegalArgumentException if the destination name is null
+     * @throws IllegalStateException if the broker returns no destination for the name
+     * @throws EmbeddedActiveMQBrokerException if the destination lookup throws any exception
+     */
+    public Message peekMessage(String destinationName) {
+        if (null == brokerService) {
+            throw new NullPointerException("peekMessage failure  - BrokerService is null");
+        }
+        if (destinationName == null) {
+            throw new IllegalArgumentException("peekMessage failure - destination name is required");
+        }
+
+        final ActiveMQDestination target = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
+
+        final Destination resolved;
+        try {
+            resolved = brokerService.getDestination(target);
+        } catch (Exception ex) {
+            throw new EmbeddedActiveMQBrokerException("peekMessage failure - unexpected exception getting destination from BrokerService", ex);
+        }
+        if (resolved == null) {
+            throw new IllegalStateException(String.format("peekMessage failure - destination %s not found in broker %s", target.toString(), brokerService.getBrokerName()));
+        }
+
+        final org.apache.activemq.command.Message[] browsed = resolved.browse();
+        if (browsed == null || browsed.length == 0) {
+            return null;
+        }
+
+        // The broker-side command object is cast to the JMS Message type expected by callers
+        return (Message) browsed[0];
+    }
+
+    /**
+     * Peek at the named destination and cast the result to a BytesMessage.
+     *
+     * @param destinationName the name of the destination
+     * @return the result of peekMessage cast to BytesMessage
+     * @throws ClassCastException if the peeked message is not a BytesMessage
+     */
+    public BytesMessage peekBytesMessage(String destinationName) {
+        final Message peeked = peekMessage(destinationName);
+        return (BytesMessage) peeked;
+    }
+
+    /**
+     * Peek at the named destination and cast the result to a TextMessage.
+     *
+     * @param destinationName the name of the destination
+     * @return the result of peekMessage cast to TextMessage
+     * @throws ClassCastException if the peeked message is not a TextMessage
+     */
+    public TextMessage peekTextMessage(String destinationName) {
+        final Message peeked = peekMessage(destinationName);
+        return (TextMessage) peeked;
+    }
+
+    /**
+     * Peek at the named destination and cast the result to a MapMessage.
+     *
+     * @param destinationName the name of the destination
+     * @return the result of peekMessage cast to MapMessage
+     * @throws ClassCastException if the peeked message is not a MapMessage
+     */
+    public MapMessage peekMapMessage(String destinationName) {
+        final Message peeked = peekMessage(destinationName);
+        return (MapMessage) peeked;
+    }
+
+    /**
+     * Peek at the named destination and cast the result to an ObjectMessage.
+     *
+     * @param destinationName the name of the destination
+     * @return the result of peekMessage cast to ObjectMessage
+     * @throws ClassCastException if the peeked message is not an ObjectMessage
+     */
+    public ObjectMessage peekObjectMessage(String destinationName) {
+        final Message peeked = peekMessage(destinationName);
+        return (ObjectMessage) peeked;
+    }
+
+    /**
+     * Peek at the named destination and cast the result to a StreamMessage.
+     *
+     * @param destinationName the name of the destination
+     * @return the result of peekMessage cast to StreamMessage
+     * @throws ClassCastException if the peeked message is not a StreamMessage
+     */
+    public StreamMessage peekStreamMessage(String destinationName) {
+        final Message peeked = peekMessage(destinationName);
+        return (StreamMessage) peeked;
+    }
+
+    /**
+     * Unchecked exception used by this class to report failures, typically wrapping a
+     * checked exception raised by the broker or by the JMS API.
+     */
+    public static class EmbeddedActiveMQBrokerException extends RuntimeException {
+        // RuntimeException is Serializable; pin the version id explicitly
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param message the description of the failure
+         */
+        public EmbeddedActiveMQBrokerException(String message) {
+            super(message);
+        }
+
+        /**
+         * @param message the description of the failure
+         * @param cause the exception that triggered the failure
+         */
+        public EmbeddedActiveMQBrokerException(String message, Exception cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * Small JMS client owned by the enclosing broker resource.
+     * <p/>
+     * It holds one connection, one non-transacted auto-acknowledge session and one producer
+     * created without a default destination; the destination is supplied on each send.
+     * The connection factory comes from the enclosing instance's createConnectionFactory().
+     */
+    private class InternalClient {
+        ActiveMQConnectionFactory connectionFactory;
+        Connection connection;
+        Session session;
+        MessageProducer producer;
+
+        public InternalClient() {
+        }
+
+        /**
+         * Create the connection, session and producer, then start the connection.
+         *
+         * @throws EmbeddedActiveMQBrokerException if any JMS call fails; in that case the
+         *         connection is closed and the JMS fields are reset to null
+         */
+        void start() {
+            connectionFactory = createConnectionFactory();
+            try {
+                connection = connectionFactory.createConnection();
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producer = session.createProducer(null);
+                connection.start();
+            } catch (JMSException jmsEx) {
+                // Do not leave a half-initialised client behind: close whatever connection was
+                // opened and reset the fields so the null guards below report "not started"
+                stop();
+                producer = null;
+                session = null;
+                connection = null;
+                throw new EmbeddedActiveMQBrokerException("Internal Client creation failure", jmsEx);
+            }
+        }
+
+        /**
+         * Close the connection if one exists.  A JMSException raised while closing is
+         * logged at warn level and otherwise ignored.
+         */
+        void stop() {
+            if (null != connection) {
+                try {
+                    connection.close();
+                } catch (JMSException jmsEx) {
+                    log.warn("JMSException encounter closing InternalClient connection - ignoring", jmsEx);
+                }
+            }
+        }
+
+        /**
+         * @return a BytesMessage created by the session
+         * @throws IllegalStateException if the session is null
+         * @throws EmbeddedActiveMQBrokerException if the session raises a JMSException
+         */
+        public BytesMessage createBytesMessage() {
+            checkSession();
+
+            try {
+                return session.createBytesMessage();
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException("Failed to create BytesMessage", jmsEx);
+            }
+        }
+
+        /**
+         * @return a TextMessage created by the session
+         * @throws IllegalStateException if the session is null
+         * @throws EmbeddedActiveMQBrokerException if the session raises a JMSException
+         */
+        public TextMessage createTextMessage() {
+            checkSession();
+
+            try {
+                return session.createTextMessage();
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException("Failed to create TextMessage", jmsEx);
+            }
+        }
+
+        /**
+         * @return a MapMessage created by the session
+         * @throws IllegalStateException if the session is null
+         * @throws EmbeddedActiveMQBrokerException if the session raises a JMSException
+         */
+        public MapMessage createMapMessage() {
+            checkSession();
+
+            try {
+                return session.createMapMessage();
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException("Failed to create MapMessage", jmsEx);
+            }
+        }
+
+        /**
+         * @return an ObjectMessage created by the session
+         * @throws IllegalStateException if the session is null
+         * @throws EmbeddedActiveMQBrokerException if the session raises a JMSException
+         */
+        public ObjectMessage createObjectMessage() {
+            checkSession();
+
+            try {
+                return session.createObjectMessage();
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException("Failed to create ObjectMessage", jmsEx);
+            }
+        }
+
+        /**
+         * @return a StreamMessage created by the session
+         * @throws IllegalStateException if the session is null
+         * @throws EmbeddedActiveMQBrokerException if the session raises a JMSException
+         */
+        public StreamMessage createStreamMessage() {
+            checkSession();
+            try {
+                return session.createStreamMessage();
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException("Failed to create StreamMessage", jmsEx);
+            }
+        }
+
+        /**
+         * Send the message to the given destination with the shared producer.
+         *
+         * @param destination the destination to send to
+         * @param message the message to send
+         * @throws IllegalStateException if the producer is null
+         * @throws EmbeddedActiveMQBrokerException if the send raises a JMSException
+         */
+        public void pushMessage(ActiveMQDestination destination, Message message) {
+            if (producer == null) {
+                throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?");
+            }
+
+            try {
+                producer.send(destination, message);
+            } catch (JMSException jmsEx) {
+                throw new EmbeddedActiveMQBrokerException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx);
+            }
+        }
+
+        /**
+         * Guard used by the create methods.
+         *
+         * @throws IllegalStateException if the session is null
+         */
+        void checkSession() {
+            if (session == null) {
+                throw new IllegalStateException("JMS Session is null - has the InternalClient been started?");
+            }
+        }
+    }
 }


Mime
View raw message