rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 37/43: feat(PullConsumer) add seek to begin/end support
Date Fri, 06 Dec 2019 04:22:59 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit 1e95020fec3548c56b9c08776d451a85d782f81f
Author: 翊名 <duheng.dh@alibaba-inc.com>
AuthorDate: Mon Nov 25 16:53:18 2019 +0800

    feat(PullConsumer) add seek to begin/end support
---
 .../apache/rocketmq/ons/api/PropertyKeyConst.java  |  1 +
 .../ons/api/impl/rocketmq/PullConsumerImpl.java    | 44 +++++++++-------------
 ons-core/pom.xml                                   |  2 +-
 3 files changed, 19 insertions(+), 28 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
index b3d2670..d968264 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
@@ -79,6 +79,7 @@ public class PropertyKeyConst implements OMSBuiltinKeys {
 
     public static final String MsgTraceSwitch = "msgTraceSwitch";
 
+    public static final String AUTO_COMMIT = "autoCommit";
 
 
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
index 4d625a0..37678bd 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -105,34 +105,14 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
             this.litePullConsumer.setPullThresholdSizeForQueue(maxCachedMessageSizeInMiB);
         }
 
-//        String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch);
-//        if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) {
-//            LOGGER.info("MQ Client Disable the Trace Hook!");
-//        } else {
-//            try {
-//                Properties tempProperties = new Properties();
-//                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
-//                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
-//                tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
-//                tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
-//                tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
-//                tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
-//                tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
-//                tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
-//                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
-//                dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
-//                traceDispatcher = dispatcher;
-//                this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
-//                    new OnsConsumeMessageHookImpl(traceDispatcher));
-//            } catch (Throwable e) {
-//                LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
-//            }
-//        }
-
+        String autoCommit = properties.getProperty(PropertyKeyConst.AUTO_COMMIT);
+        if (!UtilAll.isBlank(autoCommit)) {
+            this.litePullConsumer.setAutoCommit(Boolean.valueOf(autoCommit));
+        }
     }
 
     @Override protected void updateNameServerAddr(String nameServerAddresses) {
-        //TODO
+        this.litePullConsumer.updateNameServerAddress(nameServerAddresses);
     }
 
     private Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue> messageQueues) {
@@ -233,11 +213,21 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
     }
 
     @Override public void seekToBeginning(TopicPartition topicPartition) {
-        //TODO
+        try {
+            this.litePullConsumer.seekToBegin(convertToMessageQueue(topicPartition));
+        } catch (MQClientException ex) {
+            LOGGER.warn("Topic partition: {} seek to beginning error", topicPartition, ex);
+            throw new ONSClientException("Seek offset to beginning failed");
+        }
     }
 
     @Override public void seekToEnd(TopicPartition topicPartition) {
-        //TODO
+        try {
+            this.litePullConsumer.seekToEnd(convertToMessageQueue(topicPartition));
+        } catch (MQClientException ex) {
+            LOGGER.warn("Topic partition: {} seek to end error", topicPartition, ex);
+            throw new ONSClientException("Seek offset to end failed");
+        }
 
     }
 
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 62851ac..cd82803 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -44,7 +44,7 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <!-- Always use stable version of RocketMQ -->
-        <rocketmq.version>4.6.0</rocketmq.version>
+        <rocketmq.version>4.6.1-SNAPSHOT</rocketmq.version>
         <auth.version>${project.version}</auth.version>
         <spring.version>4.1.2.RELEASE</spring.version>
         <diamond.version>3.7.4</diamond.version>


Mime
View raw message