rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject incubator-rocketmq git commit: Add openmessaging unit tests. [Forced Update!]
Date Sun, 23 Apr 2017 12:38:20 GMT
Repository: incubator-rocketmq
Updated Branches:
  refs/heads/openmessaging-impl 00a166e8f -> 6edeb8317 (forced update)


Add openmessaging unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6edeb831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6edeb831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6edeb831

Branch: refs/heads/openmessaging-impl
Commit: 6edeb8317331aa089b32148c4659663d4dfb0836
Parents: 9c1dc74
Author: yukon <yukon@apache.org>
Authored: Fri Apr 21 13:48:05 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Sun Apr 23 20:35:41 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/producer/ProducerImpl.java         |   2 +-
 .../rocketmq/producer/SequenceProducerImpl.java |   4 +
 .../rocketmq/promise/DefaultPromise.java        |   8 +-
 .../consumer/LocalMessageCacheTest.java         |  89 ++++++++++++
 .../rocketmq/consumer/PullConsumerImplTest.java |  96 +++++++++++++
 .../rocketmq/consumer/PushConsumerImplTest.java |  87 ++++++++++++
 .../rocketmq/producer/ProducerImplTest.java     | 101 ++++++++++++++
 .../producer/SequenceProducerImplTest.java      |  86 ++++++++++++
 .../rocketmq/promise/DefaultPromiseTest.java    | 136 +++++++++++++++++++
 .../rocketmq/utils/BeanUtilsTest.java           | 110 +++++++++++++++
 10 files changed, 714 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index 8b2ddd2..f644e7d 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -62,7 +62,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer
{
             org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage,
timeout);
             if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                 log.error(String.format("Send message to RocketMQ failed, %s", message));
-                throw new OMSRuntimeException("-1", "Send message to RocketMQ failed.");
+                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
             }
             message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
             return OMSUtil.sendResultConvert(rmqResult);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
index 58b1a12..f03826e 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -72,6 +72,10 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements
Sequenc
             rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
         }
 
+        if (rmqMessages.size() == 0) {
+            return;
+        }
+
         try {
             SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
             String [] msgIdArray = sendResult.getMsgId().split(",");

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index 43f96ce..3e4bd26 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -32,7 +32,7 @@ public class DefaultPromise<V> implements Promise<V> {
     private long timeout;
     private long createTime;
     private Throwable exception = null;
-    private List<PromiseListener> promiseListenerList;
+    private List<PromiseListener<V>> promiseListenerList;
 
     public DefaultPromise() {
         createTime = System.currentTimeMillis();
@@ -120,7 +120,7 @@ public class DefaultPromise<V> implements Promise<V> {
     }
 
     @Override
-    public void addListener(final PromiseListener listener) {
+    public void addListener(final PromiseListener<V> listener) {
         if (listener == null) {
             throw new NullPointerException("FutureListener is null");
         }
@@ -149,7 +149,7 @@ public class DefaultPromise<V> implements Promise<V> {
 
     private void notifyListeners() {
         if (promiseListenerList != null) {
-            for (PromiseListener listener : promiseListenerList) {
+            for (PromiseListener<V> listener : promiseListenerList) {
                 notifyListener(listener);
             }
         }
@@ -165,7 +165,7 @@ public class DefaultPromise<V> implements Promise<V> {
                 return;
             }
             state = FutureState.CANCELLED;
-            exception = new RuntimeException("get request result is timeout or interrupted");
+            exception = new RuntimeException("Get request result is timeout or interrupted");
             lock.notifyAll();
         }
         notifyListeners();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
new file mode 100644
index 0000000..ae4d3ed
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalMessageCacheTest {
+    private LocalMessageCache localMessageCache;
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsume;
+    @Mock
+    private ConsumeRequest consumeRequest;
+
+    @Before
+    public void init() {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setRmqPullMessageBatchNums(512);
+        clientConfig.setRmqPullMessageCacheCapacity(1024);
+        localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
+    }
+
+    @Test
+    public void testNextPullBatchNums() throws Exception {
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
+        for (int i = 0; i < 513; i++) {
+            localMessageCache.submitConsumeRequest(consumeRequest);
+        }
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511);
+    }
+
+    @Test
+    public void testNextPullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
+            .thenReturn(123L);
+        assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L);
+    }
+
+    @Test
+    public void testUpdatePullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        localMessageCache.updatePullOffset(messageQueue, 124L);
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
+    }
+
+    @Test
+    public void testSubmitConsumeRequest() throws Exception {
+        byte [] body = new byte[]{'1', '2', '3'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(body);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+
+        when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
+        localMessageCache.submitConsumeRequest(consumeRequest);
+        assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
new file mode 100644
index 0000000..277a5c6
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullConsumerImplTest {
+    private PullConsumer consumer;
+    private String queueName = "HELLO_QUEUE";
+
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsumer;
+    private LocalMessageCache localMessageCache =
+        spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPullConsumer(queueName,
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
+        field.setAccessible(true);
+        field.set(consumer, rocketmqPullConsumer); //Replace
+
+        field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
+        field.setAccessible(true);
+        field.set(consumer, localMessageCache);
+
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testPoll() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic(queueName);
+
+        when(localMessageCache.poll()).thenReturn(consumedMsg);
+
+        Message message = consumer.poll();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+    }
+
+    @Test
+    public void testPoll_WithTimeout() {
+        //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
+        Message message = consumer.poll();
+        assertThat(message).isNull();
+
+        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        assertThat(message).isNull();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
new file mode 100644
index 0000000..882e57e
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PushConsumerImplTest {
+    private PushConsumer consumer;
+
+    @Mock
+    private DefaultMQPushConsumer rocketmqPushConsumer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPushConsumer(
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
+        field.setAccessible(true);
+        DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
+        field.set(consumer, rocketmqPushConsumer); //Replace
+
+        when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testConsumeMessage() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+        consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context)
{
+                assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+                assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+                context.ack();
+            }
+        });
+        ((MessageListenerConcurrently) rocketmqPushConsumer
+            .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg),
null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
new file mode 100644
index 0000000..1db80c3
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerImplTest {
+    private Producer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    @Test
+    public void testSend_OK() throws InterruptedException, RemotingException, MQClientException,
MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        io.openmessaging.SendResult omsResult =
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+
+        assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
+    }
+
+    @Test
+    public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException,
MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
+
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (Exception e) {
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+    @Test
+    public void testSend_WithException() throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (Exception e) {
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
new file mode 100644
index 0000000..823fe01
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.SequenceProducer;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SequenceProducerImplTest {
+
+    private SequenceProducer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createSequenceProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    @Test
+    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException,
MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new
byte[] {'a'});
+        producer.send(message);
+        producer.commit();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
+    }
+
+    @Test
+    public void testRollback() {
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new
byte[] {'a'});
+        producer.send(message);
+        producer.rollback();
+        producer.commit(); //Commit nothing.
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
new file mode 100644
index 0000000..2240ff2
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+public class DefaultPromiseTest {
+    private Promise<String> promise;
+
+    @Before
+    public void init() {
+        promise = new DefaultPromise<>();
+    }
+
+    @Test
+    public void testIsCancelled() throws Exception {
+        assertThat(promise.isCancelled()).isEqualTo(false);
+    }
+
+    @Test
+    public void testIsDone() throws Exception {
+        assertThat(promise.isDone()).isEqualTo(false);
+        promise.set("Done");
+        assertThat(promise.isDone()).isEqualTo(true);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        promise.set("Done");
+        assertThat(promise.get()).isEqualTo("Done");
+    }
+
+    @Test
+    public void testGet_WithTimeout() throws Exception {
+        try {
+            promise.get(100);
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (OMSRuntimeException e) {
+            assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
+        }
+    }
+
+    @Test
+    public void testAddListener() throws Exception {
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+        promise.set("Done");
+    }
+
+    @Test
+    public void testAddListener_ListenerAfterSet() throws Exception {
+        promise.set("Done");
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+    }
+
+    @Test
+    public void testAddListener_WithException_ListenerAfterSet() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+    }
+
+    @Test
+    public void testAddListener_WithException() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+        promise.setFailure(exception);
+    }
+
+    @Test
+    public void getThrowable() throws Exception {
+        assertThat(promise.getThrowable()).isNull();
+        Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        assertThat(promise.getThrowable()).isEqualTo(exception);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6edeb831/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
new file mode 100644
index 0000000..71ca11c
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BeanUtilsTest {
+    private KeyValue properties = OMS.newKeyValue();
+
+    public static class CustomizedConfig extends ClientConfig {
+        final static String STRING_TEST = "string.test";
+        String stringTest = "foobar";
+
+        final static String DOUBLE_TEST = "double.test";
+        double doubleTest = 123.0;
+
+        final static String LONG_TEST = "long.test";
+        long longTest = 123L;
+
+        String getStringTest() {
+            return stringTest;
+        }
+
+        public void setStringTest(String stringTest) {
+            this.stringTest = stringTest;
+        }
+
+        double getDoubleTest() {
+            return doubleTest;
+        }
+
+        public void setDoubleTest(final double doubleTest) {
+            this.doubleTest = doubleTest;
+        }
+
+        long getLongTest() {
+            return longTest;
+        }
+
+        public void setLongTest(final long longTest) {
+            this.longTest = longTest;
+        }
+
+        CustomizedConfig() {
+        }
+    }
+
+    @Before
+    public void init() {
+        properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
+        properties.put(CustomizedConfig.STRING_TEST, "kaka");
+        properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
+        properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
+
+        properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
+        properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
+    }
+
+    @Test
+    public void testPopulate() {
+        CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
+
+        //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
+        Assert.assertEquals(config.getStringTest(), "kaka");
+        Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
+        Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
+        Assert.assertEquals(config.getLongTest(), 1234567890L);
+        Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
+    }
+
+    @Test
+    public void testPopulate_ExistObj() {
+        CustomizedConfig config = new CustomizedConfig();
+        config.setOmsConsumerId("NewConsumerId");
+
+        Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
+
+        config = BeanUtils.populate(properties, config);
+
+        //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
+        Assert.assertEquals(config.getStringTest(), "kaka");
+        Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
+        Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
+        Assert.assertEquals(config.getLongTest(), 1234567890L);
+        Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
+    }
+
+}
\ No newline at end of file



Mime
View raw message