rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tiger...@apache.org
Subject [rocketmq] branch develop updated: production level pull api demo
Date Thu, 23 Sep 2021 16:46:53 GMT
This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 401dc8e  production level pull api demo
     new 1e8e728  Merge pull request #3295 from lwclover/develop
401dc8e is described below

commit 401dc8eaf11b24110786884f28a8406117ef2224
Author: sunhangda <lwclover@126.com>
AuthorDate: Fri Aug 27 11:17:46 2021 +0800

    production level pull api demo
    
    production level pull api demo
---
 .../rocketmq/example/simple/PullConsumer.java      | 154 +++++++++++++++------
 1 file changed, 111 insertions(+), 43 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index 8aec7e3..c6c706b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -16,63 +16,131 @@
  */
 package org.apache.rocketmq.example.simple;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 
+@SuppressWarnings("deprecation")
 public class PullConsumer {
-    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
     public static void main(String[] args) throws MQClientException {
+    	
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
         consumer.setNamesrvAddr("127.0.0.1:9876");
+        Set<String> topics = new HashSet<>();
+        //It is better to register topics up front; they are used for rebalancing at startup
+        topics.add("TopicTest");
+        consumer.setRegisterTopics(topics);
         consumer.start();
 
-        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
-        for (MessageQueue mq : mqs) {
-            System.out.printf("Consume from the queue: %s%n", mq);
-            SINGLE_MQ:
-            while (true) {
-                try {
-                    PullResult pullResult =
-                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
-                    System.out.printf("%s%n", pullResult);
-                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
-                    switch (pullResult.getPullStatus()) {
-                        case FOUND:
-                            break;
-                        case NO_MATCHED_MSG:
-                            break;
-                        case NO_NEW_MSG:
-                            break SINGLE_MQ;
-                        case OFFSET_ILLEGAL:
-                            break;
-                        default:
-                            break;
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+        ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
+			@Override
+			public Thread newThread(Runnable r) {
+                return new Thread(r, "PullConsumerThread");
             }
+		});
+        for(String topic : consumer.getRegisterTopics()){
+        	
+        	executors.execute(new Runnable() {
+        		
+        		public void doSomething(List<MessageExt> msgs){
+        			//do your business
+        			System.out.println(msgs);
+        		}
+				@Override
+				public void run() {
+					while(true){
+						try {
+							Set<MessageQueue> messageQueues =  consumer.fetchMessageQueuesInBalance(topic);
+							if(messageQueues == null || messageQueues.isEmpty()){
+								Thread.sleep(1000);
+								continue;
+							}
+							PullResult pullResult = null;
+							for(MessageQueue messageQueue : messageQueues){
+								try {
+									long offset = this.consumeFromOffset(messageQueue);
+									pullResult = consumer.pull(messageQueue, "*", offset, 32);
+									switch (pullResult.getPullStatus()) {
+			                        case FOUND:
+			                        	List<MessageExt> msgs = pullResult.getMsgFoundList();
+			                        	
+			                        	if(msgs != null && !msgs.isEmpty()){
+			                        		this.doSomething(msgs);
+			                        		//update offset to broker
+			                        		consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+			                        		//print pull tps
+											this.incPullTPS(topic, pullResult.getMsgFoundList().size());
+			                        	}
+			                        	break;
+			                        case OFFSET_ILLEGAL:
+			                        	consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+			                        	break;
+			                        case NO_NEW_MSG:
+			                        	Thread.sleep(1);
+			                        	consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+			                        	break;
+			                        case NO_MATCHED_MSG:
+			                        	consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+			                        	break;
+			                        default:
+								}
+								} catch (RemotingException e) {
+									e.printStackTrace();
+								} catch (MQBrokerException e) {
+									e.printStackTrace();
+								} catch (Exception e){
+									e.printStackTrace();
+								}
+							}
+						} catch (MQClientException e) {
+							//rebalance error
+							e.printStackTrace();
+						} catch (InterruptedException e) {
+							e.printStackTrace();
+						} catch (Exception e){
+							e.printStackTrace();
+						}
+					}
+				}
+				
+				public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException{
+					//-1 when started
+					long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+					if(offset < 0){
+						//query from broker
+						offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+					}
+                    if (offset < 0){
+                    	//first time start from last offset
+                    	offset = consumer.maxOffset(messageQueue);
+                    }
+                    //make sure
+                    if (offset < 0){
+                    	offset = 0;
+                    }
+					return offset;
+				}
+				public void incPullTPS(String topic, int pullSize) {
+					consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+							.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+				}
+        	});
+        	
         }
-
-        consumer.shutdown();
+//        executors.shutdown();
+//        consumer.shutdown();
     }
-
-    private static long getMessageQueueOffset(MessageQueue mq) {
-        Long offset = OFFSE_TABLE.get(mq);
-        if (offset != null)
-            return offset;
-
-        return 0;
-    }
-
-    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
-        OFFSE_TABLE.put(mq, offset);
-    }
-
 }

Mime
View raw message