rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lizhan...@apache.org
Subject [1/2] incubator-rocketmq-site git commit: Update example of scheduled message
Date Thu, 08 Jun 2017 03:05:27 GMT
Repository: incubator-rocketmq-site
Updated Branches:
  refs/heads/master a84f68daa -> 10f8abe52


Update example of scheduled message


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/3bb95024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/3bb95024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/3bb95024

Branch: refs/heads/master
Commit: 3bb950241e1e2d07aed7f7ed788d968a3dcece97
Parents: a84f68d
Author: Li Zhanhui <lizhanhui@gmail.com>
Authored: Wed Jun 7 11:57:11 2017 +0800
Committer: Li Zhanhui <lizhanhui@gmail.com>
Committed: Wed Jun 7 11:57:11 2017 +0800

----------------------------------------------------------------------
 _docs/17-rmq-schedule-example.md | 105 +++++++++++++++++++---------------
 1 file changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/3bb95024/_docs/17-rmq-schedule-example.md
----------------------------------------------------------------------
diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md
index e9610ee..0340f21 100644
--- a/_docs/17-rmq-schedule-example.md
+++ b/_docs/17-rmq-schedule-example.md
@@ -11,61 +11,72 @@ modified: 2017-04-24T15:01:43-04:00
 
 
 ### What is scheduled message?
-Scheduled messages differ from normal messages such that they won't be delivered until a
provided time later.
-If you use `DefaultMQPullConsumer` to consume message, you have to fetch message manually.
There are other options availible but  `MQPullConsumerScheduleService` is the easiest.
 
-#### DefaultMQPullConsumer use case
+Scheduled messages differ from normal messages in that they won't be delivered until a provided
time later.
 
-> First fetch subscribed queues of a topic
+### Application
+
+1. Start consumer to wait for incoming subscribed messages
 
 ```java
-Set<MessageQueue> testTopic = consumer.fetchSubscribeMessageQueues("testTopic");
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate message consumer
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+        // Subscribe topics
+        consumer.subscribe("TestTopic", "*");
+        // Register message listener
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
+                for (MessageExt message : messages) {
+                    // Print approximate delay time period
+                    System.out.println("Receive message[msgId=" + message.getMsgId() + "]
"
+                            + (System.currentTimeMillis() - message.getStoreTimestamp())
+ "ms later");
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // Launch consumer
+        consumer.start();
+    }
+}
 ```
 
-> Second chose a queue to fetch message,and save queue offset manually.
 
-#### Use MQPullConsumerScheduleService consume message
+2. Send scheduled messages
 
 ```java
-final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
-
-scheduleService.setMessageModel(MessageModel.CLUSTERING);
-scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
-
-    @Override
-    public void doPullTask(MessageQueue mq, PullTaskContext context) {
-        MQPullConsumer consumer = context.getPullConsumer();
-        try {
-
-            long offset = consumer.fetchConsumeOffset(mq, false);
-            if (offset < 0)
-                offset = 0;
-
-            PullResult pullResult = consumer.pull(mq, "*", offset, 32);
-            System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
-            switch (pullResult.getPullStatus()) {
-                case FOUND:
-                    break;
-                case NO_MATCHED_MSG:
-                    break;
-                case NO_NEW_MSG:
-                case OFFSET_ILLEGAL:
-                    break;
-                default:
-                    break;
-            }
-            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
-
-			//consume message auto
-            context.setPullNextDelayTimeMillis(100);
-        } catch (Exception e) {
-            e.printStackTrace();
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate a producer to send scheduled messages
+        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+        // Launch producer
+        producer.start();
+        int totalMessagesToSend = 100;
+        for (int i = 0; i < totalMessagesToSend; i++) {
+            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+            // This message will be delivered to consumer 10 seconds later.
+            message.setDelayTimeLevel(3);
+            // Send the message
+            producer.send(message);
         }
-    }
-});
-
-scheduleService.start();
-```
-
-#### Have fun with `MQPullConsumerScheduleService`.
 
+        // Shutdown producer after use.
+        producer.shutdown();
+    }
+    
+}
+```
\ No newline at end of file


Mime
View raw message