rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lizhan...@apache.org
Subject [rocketmq] branch develop_oms_0.3.0 updated: Run tests
Date Thu, 19 Apr 2018 08:44:46 GMT
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop_oms_0.3.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop_oms_0.3.0 by this push:
     new 7535889  Run tests
7535889 is described below

commit 753588947b782d1abfbde5998783895e6e7e3ddc
Author: shutian.lzh <shutian.lzh@alibaba-inc.com>
AuthorDate: Thu Apr 19 16:44:32 2018 +0800

    Run tests
---
 .../example/openmessaging/SimplePullConsumer.java  | 36 ++++++++++++++++------
 .../apache/rocketmq/example/simple/Producer.java   |  2 +-
 .../example/simple/PullScheduleService.java        |  2 +-
 .../rocketmq/consumer/PullConsumerImpl.java        |  2 +-
 4 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 4ddf50f..86aba41 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
 import io.openmessaging.OMSBuiltinKeys;
 import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
 
 public class SimplePullConsumer {
     public static void main(String[] args) {
         final MessagingAccessPoint messagingAccessPoint =
             OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
+        messagingAccessPoint.startup();
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
         final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
             OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
 
         messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                consumer.shutdown();
-                messagingAccessPoint.shutdown();
-            }
-        }));
+        final String queueName = "TopicTest";
 
-        consumer.attachQueue("OMS_HELLO_TOPIC");
+        producer.startup();
+        Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
+        SendResult sendResult = producer.send(msg);
+        System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
+        producer.shutdown();
+
+        consumer.attachQueue(queueName);
 
         consumer.startup();
         System.out.printf("Consumer startup OK%n");
 
-        while (true) {
+        // Keep running until we find the one that has just been sent
+        boolean stop = false;
+        while (!stop) {
             Message message = consumer.receive();
             if (message != null) {
                 String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
                 System.out.printf("Received one message: %s%n", msgId);
                 consumer.ack(msgId);
+
+                if (!stop) {
+                    stop = msgId.equalsIgnoreCase(sendResult.messageId());
+                }
+
+            } else {
+                System.out.printf("Return without any message%n");
             }
         }
+
+        consumer.shutdown();
+        messagingAccessPoint.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 5751d22..7b504dd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -29,7 +29,7 @@ public class Producer {
 
         producer.start();
 
-        for (int i = 0; i < 10000000; i++)
+        for (int i = 0; i < 128; i++)
             try {
                 {
                     Message msg = new Message("TopicTest",
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
index 151628f..8cfdd9b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -32,7 +32,7 @@ public class PullScheduleService {
         final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
 
         scheduleService.setMessageModel(MessageModel.CLUSTERING);
-        scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
+        scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
 
             @Override
             public void doPullTask(MessageQueue mq, PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 225b09e..2e22509 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
 
 public class PullConsumerImpl implements PullConsumer {
     private final DefaultMQPullConsumer rocketmqPullConsumer;

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

Mime
View raw message