rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 33/43: feat(consumer) add pull consumer support
Date Fri, 06 Dec 2019 04:22:55 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit efeaebd7050d10800e463d1167a0940078df7f6a
Author: 翊名 <duheng.dh@alibaba-inc.com>
AuthorDate: Thu Nov 21 11:47:31 2019 +0800

    feat(consumer) add pull consumer support
---
 .../rocketmq/MessagingAccessPointImpl.java         |   5 +-
 .../org/apache/rocketmq/ons/api/ONSFactory.java    |  26 ++
 .../org/apache/rocketmq/ons/api/ONSFactoryAPI.java |  26 +-
 .../rocketmq/ons/api/impl/ONSFactoryImpl.java      |   7 +
 .../ons/api/impl/rocketmq/PullConsumerImpl.java    | 292 +++++++++++++++++++++
 ons-core/pom.xml                                   |   2 +-
 .../ons/sample/consumer/SimplePullConsumer.java    |  63 +++++
 .../producer/LocalTransactionCheckerImpl.java      |   2 +-
 .../sample/producer/SimpleTransactionProducer.java |   4 +-
 pom.xml                                            |   4 +-
 10 files changed, 408 insertions(+), 23 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index bb90f64..c1b4a38 100644
--- a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -21,7 +21,6 @@ import io.openmessaging.api.Consumer;
 import io.openmessaging.api.Message;
 import io.openmessaging.api.MessagingAccessPoint;
 import io.openmessaging.api.OMSBuiltinKeys;
-import io.openmessaging.api.OMSResponseStatus;
 import io.openmessaging.api.Producer;
 import io.openmessaging.api.PullConsumer;
 import io.openmessaging.api.batch.BatchConsumer;
@@ -43,6 +42,7 @@ import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil;
 import org.apache.rocketmq.ons.api.impl.rocketmq.OrderConsumerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl;
+import org.apache.rocketmq.ons.api.impl.rocketmq.PullConsumerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
@@ -71,8 +71,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
 
     @Override public PullConsumer createPullConsumer(Properties properties) {
         injectNameServerAddress(properties);
-        properties.put(PropertyKeyConst.NAMESRV_ADDR, this.attributes.getProperty(OMSBuiltinKeys.ACCESS_POINTS));
-        throw OMSResponseStatus.generateException(OMSResponseStatus.STATUS_1101);
+        return new PullConsumerImpl(properties);
     }
 
     @Override
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java
index 847a6ae..fe2768f 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.ons.api;
 import io.openmessaging.api.Consumer;
 import io.openmessaging.api.OMS;
 import io.openmessaging.api.Producer;
+import io.openmessaging.api.PullConsumer;
 import io.openmessaging.api.batch.BatchConsumer;
 import io.openmessaging.api.order.OrderConsumer;
 import io.openmessaging.api.order.OrderProducer;
@@ -222,4 +223,29 @@ public class ONSFactory {
         return onsFactory.createOrderedConsumer(properties);
     }
 
+    /**
+     * Create Order Consumer
+     * <p>
+     * <code>properties</code>
+     * Requires:
+     * <ol>
+     * <li>{@link PropertyKeyConst#GROUP_ID}</li>
+     * <li>{@link PropertyKeyConst#AccessKey}</li>
+     * <li>{@link PropertyKeyConst#SecretKey}</li>
+     * <li>{@link PropertyKeyConst#ONSAddr}</li>
+     * </ol>
+     * Optional:
+     * <ul>
+     * <li>{@link PropertyKeyConst#ConsumeThreadNums}</li>
+     * <li>{@link PropertyKeyConst#OnsChannel}</li>
+     * </ul>
+     * </p>
+     *
+     * @param properties Consumer's configuration
+     * @return {@code PullConsumer} Thread safe {@link PullConsumer} instance
+     */
+    public static PullConsumer createPullConsumer(final Properties properties) {
+        return onsFactory.createPullConsumer(properties);
+    }
+
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java
index 67186b4..f9213a6 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java
@@ -16,15 +16,16 @@
  */
 package org.apache.rocketmq.ons.api;
 
-import io.openmessaging.api.Consumer;
-import io.openmessaging.api.MessagingAccessPoint;
-import io.openmessaging.api.Producer;
-import io.openmessaging.api.batch.BatchConsumer;
-import io.openmessaging.api.order.OrderConsumer;
-import io.openmessaging.api.order.OrderProducer;
-import io.openmessaging.api.transaction.LocalTransactionChecker;
-import io.openmessaging.api.transaction.TransactionProducer;
-import java.util.Properties;
+    import io.openmessaging.api.Consumer;
+    import io.openmessaging.api.MessagingAccessPoint;
+    import io.openmessaging.api.Producer;
+    import io.openmessaging.api.PullConsumer;
+    import io.openmessaging.api.batch.BatchConsumer;
+    import io.openmessaging.api.order.OrderConsumer;
+    import io.openmessaging.api.order.OrderProducer;
+    import io.openmessaging.api.transaction.LocalTransactionChecker;
+    import io.openmessaging.api.transaction.TransactionProducer;
+    import java.util.Properties;
 
 /**
  * {@link MessagingAccessPoint} is recommended.
@@ -34,19 +35,16 @@ public interface ONSFactoryAPI {
 
     Producer createProducer(final Properties properties);
 
-
     Consumer createConsumer(final Properties properties);
 
-
     BatchConsumer createBatchConsumer(final Properties properties);
 
-
     OrderProducer createOrderProducer(final Properties properties);
 
-
     OrderConsumer createOrderedConsumer(final Properties properties);
 
-
     TransactionProducer createTransactionProducer(final Properties properties,
         final LocalTransactionChecker checker);
+
+    PullConsumer createPullConsumer(final Properties properties);
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java
index 18ac9e4..5cded3a 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java
@@ -20,6 +20,7 @@ import io.openmessaging.api.Consumer;
 import io.openmessaging.api.Message;
 import io.openmessaging.api.OMS;
 import io.openmessaging.api.Producer;
+import io.openmessaging.api.PullConsumer;
 import io.openmessaging.api.batch.BatchConsumer;
 import io.openmessaging.api.order.OrderConsumer;
 import io.openmessaging.api.order.OrderProducer;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil;
 import org.apache.rocketmq.ons.api.impl.rocketmq.OrderConsumerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl;
+import org.apache.rocketmq.ons.api.impl.rocketmq.PullConsumerImpl;
 import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl;
 
 /**
@@ -89,4 +91,9 @@ public class ONSFactoryImpl implements ONSFactoryAPI {
             }
         });
     }
+
+    @Override
+    public PullConsumer createPullConsumer(Properties properties) {
+        return new PullConsumerImpl(properties);
+    }
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
new file mode 100644
index 0000000..c3a5c4d
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.impl.rocketmq;
+
+import io.openmessaging.api.Message;
+import io.openmessaging.api.PullConsumer;
+import io.openmessaging.api.TopicPartition;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.api.PropertyValueConst;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer {
+    private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
+    private final static int MAX_CACHED_MESSAGE_SIZE_IN_MIB = 1024;
+    private final static int MIN_CACHED_MESSAGE_SIZE_IN_MIB = 16;
+    private final static int MAX_CACHED_MESSAGE_AMOUNT = 50000;
+    private final static int MIN_CACHED_MESSAGE_AMOUNT = 100;
+
+    private DefaultLitePullConsumer litePullConsumer;
+
+    private final String TOPIC_PARTITION_SPLITER = "#";
+
+    private int maxCachedMessageSizeInMiB = 512;
+
+    private int maxCachedMessageAmount = 5000;
+
+    public PullConsumerImpl(Properties properties) {
+        super(properties);
+        String consumerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID));
+        if (StringUtils.isEmpty(consumerGroup)) {
+            throw new ONSClientException("Unable to get GROUP_ID property");
+        }
+
+        this.litePullConsumer =
+            new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials));
+
+        String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
+        this.litePullConsumer.setMessageModel(MessageModel.valueOf(messageModel));
+
+        String maxBatchMessageCount = properties.getProperty(PropertyKeyConst.MAX_BATCH_MESSAGE_COUNT);
+        if (!UtilAll.isBlank(maxBatchMessageCount)) {
+            this.litePullConsumer.setPullBatchSize(Integer.valueOf(maxBatchMessageCount));
+        }
+
+        boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled,
"false"));
+        this.litePullConsumer.setVipChannelEnabled(isVipChannelEnabled);
+        if (properties.containsKey(PropertyKeyConst.LANGUAGE_IDENTIFIER)) {
+            int language = Integer.valueOf(properties.get(PropertyKeyConst.LANGUAGE_IDENTIFIER).toString());
+            byte languageByte = (byte) language;
+            this.litePullConsumer.setLanguage(LanguageCode.valueOf(languageByte));
+        }
+        String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName());
+        this.litePullConsumer.setInstanceName(instanceName);
+        this.litePullConsumer.setNamesrvAddr(this.getNameServerAddr());
+
+        String consumeThreadNums = properties.getProperty(PropertyKeyConst.ConsumeThreadNums);
+        if (!UtilAll.isBlank(consumeThreadNums)) {
+            this.litePullConsumer.setPullThreadNums(Integer.valueOf(consumeThreadNums));
+        }
+
+        String configuredCachedMessageAmount = properties.getProperty(PropertyKeyConst.MaxCachedMessageAmount);
+        if (!UtilAll.isBlank(configuredCachedMessageAmount)) {
+            maxCachedMessageAmount = Math.min(MAX_CACHED_MESSAGE_AMOUNT, Integer.valueOf(configuredCachedMessageAmount));
+            maxCachedMessageAmount = Math.max(MIN_CACHED_MESSAGE_AMOUNT, maxCachedMessageAmount);
+            this.litePullConsumer.setPullThresholdForAll(maxCachedMessageAmount);
+        }
+
+        String configuredCachedMessageSizeInMiB = properties.getProperty(PropertyKeyConst.MaxCachedMessageSizeInMiB);
+        if (!UtilAll.isBlank(configuredCachedMessageSizeInMiB)) {
+            maxCachedMessageSizeInMiB = Math.min(MAX_CACHED_MESSAGE_SIZE_IN_MIB, Integer.valueOf(configuredCachedMessageSizeInMiB));
+            maxCachedMessageSizeInMiB = Math.max(MIN_CACHED_MESSAGE_SIZE_IN_MIB, maxCachedMessageSizeInMiB);
+            this.litePullConsumer.setPullThresholdSizeForQueue(maxCachedMessageSizeInMiB);
+        }
+
+//        String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch);
+//        if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch)))
{
+//            LOGGER.info("MQ Client Disable the Trace Hook!");
+//        } else {
+//            try {
+//                Properties tempProperties = new Properties();
+//                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
+//                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
+//                tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
+//                tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
+//                tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
+//                tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
+//                tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
+//                tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
+//                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
sessionCredentials);
+//                dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
+//                traceDispatcher = dispatcher;
+//                this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
+//                    new OnsConsumeMessageHookImpl(traceDispatcher));
+//            } catch (Throwable e) {
+//                LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace
data", e);
+//            }
+//        }
+
+    }
+
+    @Override protected void updateNameServerAddr(String nameServerAddresses) {
+        //TODO
+    }
+
+    private Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue>
messageQueues) {
+        Set<TopicPartition> topicPartitions = new HashSet<>();
+        for (MessageQueue messageQueue : messageQueues) {
+            TopicPartition topicPartition = convertToTopicPartition(messageQueue);
+            topicPartitions.add(topicPartition);
+        }
+        return topicPartitions;
+    }
+
+    private Set<MessageQueue> convertToMessageQueues(Collection<TopicPartition>
topicPartitions) {
+        Set<MessageQueue> messageQueues = new HashSet<>();
+        for (TopicPartition topicPartition : topicPartitions) {
+            messageQueues.add(convertToMessageQueue(topicPartition));
+        }
+        return messageQueues;
+    }
+
+    private TopicPartition convertToTopicPartition(MessageQueue messageQueue) {
+        String topic = messageQueue.getTopic();
+        String partition = messageQueue.getBrokerName() + TOPIC_PARTITION_SPLITER + messageQueue.getQueueId();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        return topicPartition;
+    }
+
+    private MessageQueue convertToMessageQueue(TopicPartition topicPartition) {
+        String topic = topicPartition.getTopic();
+        String[] tmp = topicPartition.getPartition().split(TOPIC_PARTITION_SPLITER);
+        if (tmp.length != 2) {
+            LOGGER.warn("Failed to get message queue from TopicPartition: {}", topicPartition);
+            throw new ONSClientException("Failed to get message queue");
+        }
+        String brokerName = tmp[0];
+        int queueId = Integer.valueOf(tmp[1]);
+        return new MessageQueue(topic, brokerName, queueId);
+    }
+
+    @Override public Set<TopicPartition> topicPartitions(String topic) {
+        try {
+            Collection<MessageQueue> messageQueues = litePullConsumer.fetchMessageQueues(topic);
+            Set<TopicPartition> topicPartitions = new HashSet<>();
+            for (MessageQueue messageQueue : messageQueues) {
+                topicPartitions.add(convertToTopicPartition(messageQueue));
+            }
+            return topicPartitions;
+        } catch (MQClientException ex) {
+            throw new ONSClientException("defaultMQPushConsumer subscribe exception", ex);
+        }
+    }
+
+    @Override public void assign(Collection<TopicPartition> topicPartitions) {
+        Set<MessageQueue> messageQueues = new HashSet<>();
+        for (TopicPartition topicPartition : topicPartitions) {
+            messageQueues.add(convertToMessageQueue(topicPartition));
+        }
+        this.litePullConsumer.assign(messageQueues);
+    }
+
+    @Override public void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener
callback) {
+        TopicMessageQueueChangeListener listener = new TopicMessageQueueChangeListener()
{
+            @Override public void onChanged(String topic, Set<MessageQueue> messageQueues)
{
+                callback.onChanged(convertToTopicPartitions(messageQueues));
+            }
+        };
+        try {
+            this.litePullConsumer.registerTopicMessageQueueChangeListener(topic, listener);
+
+        } catch (MQClientException ex) {
+            LOGGER.warn("Register listener error", ex);
+            throw new ONSClientException("Failed to register topic partition listener");
+        }
+    }
+
+    @Override public List<Message> poll(long timeout) {
+        List<MessageExt> rmqMsgList = litePullConsumer.poll(timeout);
+        List<Message> msgList = new ArrayList<Message>();
+        for (MessageExt rmqMsg : rmqMsgList) {
+            Message msg = ONSUtil.msgConvert(rmqMsg);
+            Map<String, String> propertiesMap = rmqMsg.getProperties();
+            msg.setMsgID(rmqMsg.getMsgId());
+            if (propertiesMap != null && propertiesMap.get(Constants.TRANSACTION_ID)
!= null) {
+                msg.setMsgID(propertiesMap.get(Constants.TRANSACTION_ID));
+            }
+            msgList.add(msg);
+        }
+        return msgList;
+    }
+
+    @Override public void seek(TopicPartition topicPartition, long offset) {
+        MessageQueue messageQueue = convertToMessageQueue(topicPartition);
+        try {
+            litePullConsumer.seek(messageQueue, offset);
+        } catch (MQClientException ex) {
+            LOGGER.warn("Topic partition: {} seek to: {} error", topicPartition, offset,
ex);
+            throw new ONSClientException("Seek offset failed");
+        }
+    }
+
+    @Override public void seekToBeginning(TopicPartition topicPartition) {
+        //TODO
+    }
+
+    @Override public void seekToEnd(TopicPartition topicPartition) {
+        //TODO
+
+    }
+
+    @Override public void pause(Collection<TopicPartition> topicPartitions) {
+        this.litePullConsumer.pause(convertToMessageQueues(topicPartitions));
+    }
+
+    @Override public void resume(Collection<TopicPartition> topicPartitions) {
+        this.litePullConsumer.resume(convertToMessageQueues(topicPartitions));
+    }
+
+    @Override public Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp)
{
+        try {
+            return litePullConsumer.offsetForTimestamp(convertToMessageQueue(topicPartition),
timestamp);
+        } catch (MQClientException ex) {
+            LOGGER.warn("Get offset for topic partition:{} with timestamp:{} error", topicPartition,
timestamp, ex);
+            throw new ONSClientException("Failed to get offset");
+        }
+    }
+
+    @Override public Long committed(TopicPartition topicPartition) {
+        try {
+            return litePullConsumer.committed(convertToMessageQueue(topicPartition));
+        } catch (MQClientException ex) {
+            LOGGER.warn("Get committed offset for topic partition: {} error", topicPartition,
ex);
+            throw new ONSClientException("Failed to get committed offset");
+        }
+    }
+
+    @Override public void commitSync() {
+        litePullConsumer.commitSync();
+    }
+
+    @Override public void start() {
+        try {
+            if (this.started.compareAndSet(false, true)) {
+                this.litePullConsumer.start();
+                super.start();
+            }
+        } catch (Exception e) {
+            throw new ONSClientException(e.getMessage());
+        }
+    }
+
+    @Override public void shutdown() {
+        if (this.started.compareAndSet(true, false)) {
+            this.litePullConsumer.shutdown();
+        }
+        super.shutdown();
+    }
+}
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 7a3da6a..62851ac 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -44,7 +44,7 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <!-- Always use stable version of RocketMQ -->
-        <rocketmq.version>4.5.2</rocketmq.version>
+        <rocketmq.version>4.6.0</rocketmq.version>
         <auth.version>${project.version}</auth.version>
         <spring.version>4.1.2.RELEASE</spring.version>
         <diamond.version>3.7.4</diamond.version>
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
new file mode 100644
index 0000000..91bf280
--- /dev/null
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.sample.consumer;
+
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
+import io.openmessaging.api.PullConsumer;
+import io.openmessaging.api.TopicPartition;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.sample.MQConfig;
+
+public class SimplePullConsumer {
+    public static void main(String[] args) {
+
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID);
+        consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
+        consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
+
+        PullConsumer consumer = messagingAccessPoint.createPullConsumer(consumerProperties);
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * OrderConsumer consumer  = ONSFactory.createOrderedConsumer(consumerProperties);
+         * }
+         * </pre>
+         */
+
+        consumer.start();
+        Set<TopicPartition> topicPartitions = consumer.topicPartitions(MQConfig.TOPIC);
+        consumer.assign(topicPartitions);
+
+        while (true){
+            List<Message> messages = consumer.poll(3000);
+            System.out.printf("Received message: %s %n", messages);
+            consumer.commitSync();
+        }
+
+
+    }
+}
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java
index 973dc4d..bcfa454 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java
@@ -24,7 +24,7 @@ public class LocalTransactionCheckerImpl implements LocalTransactionChecker
{
 
     @Override
     public TransactionStatus check(Message msg) {
-        System.out.printf("Receive transaction check back request, MsgId: %s%n", msg.getMsgID());
+        System.out.printf("Receive transaction check back request, MsgId: %s%n", msg);
         return TransactionStatus.CommitTransaction;
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index 44da502..2e41e22 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -60,8 +60,8 @@ public class SimpleTransactionProducer {
                 SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter()
{
                     @Override
                     public TransactionStatus execute(Message msg, Object arg) {
-                        System.out.printf("Execute local transaction and return TransactionStatus.
%n");
-                        return TransactionStatus.CommitTransaction;
+                        System.out.printf("Execute local transaction and return TransactionStatus.
%s %n", msg);
+                        return TransactionStatus.Unknow;
                     }
                 }, null);
                 assert sendResult != null;
diff --git a/pom.xml b/pom.xml
index 71433c0..40f5660 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,8 +45,8 @@
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <file_encoding>UTF-8</file_encoding>
         <!-- Compiler settings properties -->
-        <maven.compiler.source>1.7</maven.compiler.source>
-        <maven.compiler.target>1.7</maven.compiler.target>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
         <eagleeye.core.version>1.4.8</eagleeye.core.version>
     </properties>
     <modules>


Mime
View raw message