rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lizhan...@apache.org
Subject [rocketmq] branch develop_oms_0.3.0 updated: Make unit tests compilable
Date Sun, 15 Apr 2018 09:00:25 GMT
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop_oms_0.3.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop_oms_0.3.0 by this push:
     new 2988a1e  Make unit tests compilable
2988a1e is described below

commit 2988a1e6caa8d21413ef6ff60cc04a8cfb3dae20
Author: shutian.lzh <shutian.lzh@alibaba-inc.com>
AuthorDate: Sun Apr 15 17:00:13 2018 +0800

    Make unit tests compilable
---
 .../example/openmessaging/SimpleProducer.java      |  4 +-
 .../rocketmq/consumer/PullConsumerImplTest.java    | 19 ++---
 .../rocketmq/consumer/PushConsumerImplTest.java    | 13 ++--
 .../rocketmq/producer/ProducerImplTest.java        | 14 ++--
 .../producer/SequenceProducerImplTest.java         | 86 ----------------------
 .../rocketmq/promise/DefaultPromiseTest.java       | 38 +++-------
 6 files changed, 33 insertions(+), 141 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 2884797..c785504 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -27,8 +27,8 @@ import java.nio.charset.Charset;
 
 public class SimpleProducer {
     public static void main(String[] args) {
-        final MessagingAccessPoint messagingAccessPoint = OMS
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint =
+            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
         final Producer producer = messagingAccessPoint.createProducer();
 
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 843ddb7..7e81b40 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -18,12 +18,9 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.consumer.PullConsumer;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.lang.reflect.Field;
@@ -50,11 +47,11 @@ public class PullConsumerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+        final MessagingAccessPoint messagingAccessPoint = OMS
             .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
 
-        consumer = messagingAccessPoint.createPullConsumer(queueName,
-            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+        consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
+        consumer.attachQueue(queueName);
 
         Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
         field.setAccessible(true);
@@ -83,18 +80,18 @@ public class PullConsumerImplTest {
 
         when(localMessageCache.poll()).thenReturn(consumedMsg);
 
-        Message message = consumer.poll();
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        Message message = consumer.receive();
+        assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
         assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
     }
 
     @Test
     public void testPoll_WithTimeout() {
         //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
-        Message message = consumer.poll();
+        Message message = consumer.receive();
         assertThat(message).isNull();
 
-        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
         assertThat(message).isNull();
     }
 }
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 882e57e..5caa2b6 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -18,13 +18,10 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
+import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.lang.reflect.Field;
 import java.util.Collections;
@@ -49,7 +46,7 @@ public class PushConsumerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+        final MessagingAccessPoint messagingAccessPoint = OMS
             .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
         consumer = messagingAccessPoint.createPushConsumer(
             OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
@@ -75,8 +72,8 @@ public class PushConsumerImplTest {
         consumedMsg.setTopic("HELLO_QUEUE");
         consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
             @Override
-            public void onMessage(final Message message, final ReceivedMessageContext context)
{
-                assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+            public void onReceived(Message message, Context context) {
+                assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
                 assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
                 context.ack();
             }
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index 1db80c3..7b36179 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -17,9 +17,9 @@
 package io.openmessaging.rocketmq.producer;
 
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
+import io.openmessaging.OMS;
 import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.producer.Producer;
 import java.lang.reflect.Field;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -49,7 +49,7 @@ public class ProducerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+        final MessagingAccessPoint messagingAccessPoint = OMS
             .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
         producer = messagingAccessPoint.createProducer();
 
@@ -67,8 +67,8 @@ public class ProducerImplTest {
         sendResult.setMsgId("TestMsgID");
         sendResult.setSendStatus(SendStatus.SEND_OK);
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
-        io.openmessaging.SendResult omsResult =
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+        io.openmessaging.producer.SendResult omsResult =
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
 
         assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
     }
@@ -80,7 +80,7 @@ public class ProducerImplTest {
 
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
         try {
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
@@ -91,7 +91,7 @@ public class ProducerImplTest {
     public void testSend_WithException() throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
         try {
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
deleted file mode 100644
index 823fe01..0000000
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.SequenceProducer;
-import java.lang.reflect.Field;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class SequenceProducerImplTest {
-
-    private SequenceProducer producer;
-
-    @Mock
-    private DefaultMQProducer rocketmqProducer;
-
-    @Before
-    public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
-        producer = messagingAccessPoint.createSequenceProducer();
-
-        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
-        field.setAccessible(true);
-        field.set(producer, rocketmqProducer);
-
-        messagingAccessPoint.startup();
-        producer.startup();
-    }
-
-    @Test
-    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException,
MQBrokerException {
-        SendResult sendResult = new SendResult();
-        sendResult.setMsgId("TestMsgID");
-        sendResult.setSendStatus(SendStatus.SEND_OK);
-        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
-        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
-        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new
byte[] {'a'});
-        producer.send(message);
-        producer.commit();
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
-    }
-
-    @Test
-    public void testRollback() {
-        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
-        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new
byte[] {'a'});
-        producer.send(message);
-        producer.rollback();
-        producer.commit(); //Commit nothing.
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
-    }
-}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index 2240ff2..f226ede 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -16,8 +16,9 @@
  */
 package io.openmessaging.rocketmq.promise;
 
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
 import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
 import io.openmessaging.exception.OMSRuntimeException;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
 
     @Test
     public void testAddListener() throws Exception {
-        promise.addListener(new PromiseListener<String>() {
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationCompleted(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.get()).isEqualTo("Done");
-            }
-
-            @Override
-            public void operationFailed(final Promise<String> promise) {
 
             }
         });
@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
     @Test
     public void testAddListener_ListenerAfterSet() throws Exception {
         promise.set("Done");
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-                assertThat(promise.get()).isEqualTo("Done");
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
-
+            public void operationComplete(Future<String> future) {
+                assertThat(future.get()).isEqualTo("Done");
             }
         });
     }
@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
     public void testAddListener_WithException_ListenerAfterSet() throws Exception {
         final Throwable exception = new OMSRuntimeException("-1", "Test Error");
         promise.setFailure(exception);
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.getThrowable()).isEqualTo(exception);
             }
         });
@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
     @Test
     public void testAddListener_WithException() throws Exception {
         final Throwable exception = new OMSRuntimeException("-1", "Test Error");
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.getThrowable()).isEqualTo(exception);
             }
         });

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

Mime
View raw message