rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 38/43: feat(PullConsumer) add seek to begin/end support
Date Fri, 06 Dec 2019 04:23:00 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit 3736c07f3d9042a00d8e9ee779e134cfad8a4c26
Author: duhenglucky <duhengforever@apache.org>
AuthorDate: Mon Nov 25 20:20:31 2019 +0800

    feat(PullConsumer) add seek to begin/end support
---
 .../org/apache/rocketmq/ons/api/Constants.java     |  2 ++
 .../apache/rocketmq/ons/api/bean/ConsumerBean.java | 33 +++++-----------------
 .../ons/api/impl/rocketmq/PullConsumerImpl.java    |  6 ++--
 .../ons/sample/consumer/SimplePullConsumer.java    |  7 +++--
 .../ons/sample/producer/MQTimerProducer.java       |  1 +
 .../ons/sample/producer/SimpleMQProducer.java      |  1 -
 .../sample/producer/SimpleTransactionProducer.java |  2 +-
 7 files changed, 17 insertions(+), 35 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
index 48d8f67..a82c405 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
@@ -19,4 +19,6 @@ package org.apache.rocketmq.ons.api;
 
 public class Constants {
     public static final String TRANSACTION_ID = "__transactionId__";
+
+    public static final String TOPIC_PARTITION_SEPARATOR = "#";
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
index 1555be9..c95cd9f 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
@@ -21,8 +21,6 @@ import io.openmessaging.api.ExpressionType;
 import io.openmessaging.api.MessageListener;
 import io.openmessaging.api.MessageSelector;
 import io.openmessaging.api.bean.Subscription;
-import io.openmessaging.api.bean.SubscriptionExt;
-import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -69,36 +67,19 @@ public class ConsumerBean implements Consumer {
         Iterator<Entry<Subscription, MessageListener>> it = this.subscriptionTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<Subscription, MessageListener> next = it.next();
-            if ("com.aliyun.openservices.ons.api.impl.notify.ConsumerImpl".equals(this.consumer.getClass().getCanonicalName())
-                && (next.getKey() instanceof SubscriptionExt)) {
-                SubscriptionExt subscription = (SubscriptionExt) next.getKey();
-                for (Method method : this.consumer.getClass().getMethods()) {
-                    if ("subscribeNotify".equals(method.getName())) {
-                        try {
-                            method.invoke(consumer, subscription.getTopic(), subscription.getExpression(),
-                                subscription.isPersistence(), next.getValue());
-                        } catch (Exception e) {
-                            throw new ONSClientException("subscribeNotify invoke exception",
e);
-                        }
-                        break;
-                    }
-                }
 
-            } else {
-                Subscription subscription = next.getKey();
-                if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType()))
{
+            Subscription subscription = next.getKey();
+            if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType()))
{
 
-                    this.subscribe(subscription.getTopic(), subscription.getExpression(),
next.getValue());
+                this.subscribe(subscription.getTopic(), subscription.getExpression(), next.getValue());
 
-                } else if (ExpressionType.SQL92.name().equals(subscription.getType())) {
+            } else if (ExpressionType.SQL92.name().equals(subscription.getType())) {
 
-                    this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()),
next.getValue());
-                } else {
+                this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()),
next.getValue());
+            } else {
 
-                    throw new ONSClientException(String.format("Expression type %s is unknown!",
subscription.getType()));
-                }
+                throw new ONSClientException(String.format("Expression type %s is unknown!",
subscription.getType()));
             }
-
         }
 
         this.consumer.start();
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
index 37678bd..d364b88 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -51,8 +51,6 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
 
     private DefaultLitePullConsumer litePullConsumer;
 
-    private final String TOPIC_PARTITION_SPLITER = "#";
-
     private int maxCachedMessageSizeInMiB = 512;
 
     private int maxCachedMessageAmount = 5000;
@@ -134,14 +132,14 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
 
     private TopicPartition convertToTopicPartition(MessageQueue messageQueue) {
         String topic = messageQueue.getTopic();
-        String partition = messageQueue.getBrokerName() + TOPIC_PARTITION_SPLITER + messageQueue.getQueueId();
+        String partition = messageQueue.getBrokerName() + Constants.TOPIC_PARTITION_SEPARATOR
+ messageQueue.getQueueId();
         TopicPartition topicPartition = new TopicPartition(topic, partition);
         return topicPartition;
     }
 
     private MessageQueue convertToMessageQueue(TopicPartition topicPartition) {
         String topic = topicPartition.getTopic();
-        String[] tmp = topicPartition.getPartition().split(TOPIC_PARTITION_SPLITER);
+        String[] tmp = topicPartition.getPartition().split(Constants.TOPIC_PARTITION_SEPARATOR);
         if (tmp.length != 2) {
             LOGGER.warn("Failed to get message queue from TopicPartition: {}", topicPartition);
             throw new ONSClientException("Failed to get message queue");
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
index 91bf280..5037c24 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.sample.MQConfig;
 
 public class SimplePullConsumer {
+    public static volatile boolean running = true;
+
     public static void main(String[] args) {
 
         MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
@@ -52,12 +54,11 @@ public class SimplePullConsumer {
         Set<TopicPartition> topicPartitions = consumer.topicPartitions(MQConfig.TOPIC);
         consumer.assign(topicPartitions);
 
-        while (true){
+        while (running) {
             List<Message> messages = consumer.poll(3000);
             System.out.printf("Received message: %s %n", messages);
             consumer.commitSync();
         }
-
-
+        consumer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
index fcf7554..6c81c59 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
@@ -63,5 +63,6 @@ public class MQTimerProducer {
                 e.printStackTrace();
             }
         }
+        producer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
index 5bb7f9e..c07389c 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
@@ -26,7 +26,6 @@ import java.util.Properties;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.sample.MQConfig;
 
-//    io.openmessaging.api.xxx => com.aliyun.openservices.ons.api.xxxx
 
 public class SimpleMQProducer {
 
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index 2e41e22..c1d890b 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -70,7 +70,7 @@ public class SimpleTransactionProducer {
                 e.printStackTrace();
             }
         }
-
+        transactionProducer.shutdown();
         System.out.printf("Send transaction message success. %n");
     }
 }
\ No newline at end of file


Mime
View raw message