rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq] branch feature_oms_1.0.0 updated: [ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)
Date Thu, 20 Jun 2019 02:06:09 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch feature_oms_1.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/feature_oms_1.0.0 by this push:
     new 246e34e  [ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)
246e34e is described below

commit 246e34eb01cdf46db20dccea9f04dc4fae279715
Author: zhoubo <877036922@qq.com>
AuthorDate: Thu Jun 20 10:06:04 2019 +0800

    [ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)
    
    * Adapt to the new consumer API
    
    * Optimize the code of the new consumer API implementation
    
    * Adjust consumer example
    
    * 1. Optimize consumer code implementation
    2. Fix bug
    
    * 1. Optimize consumer code implementation
    2. Fix bug
    
    * Add unit tests for the new API
    
    * Rename OMSUtil to OMSClientUtil
    
    * Add more verification to the unit tests
---
 .../example/openmessaging/SimplePullConsumer.java  |  39 +++--
 .../example/openmessaging/SimplePushConsumer.java  |  25 +--
 .../rocketmq/MessagingAccessPointImpl.java         |  93 +++--------
 .../rocketmq/config/DefaultQueueMetaData.java      |  65 --------
 .../rocketmq/consumer/LocalMessageCache.java       |  62 +++----
 .../rocketmq/consumer/PullConsumerImpl.java        | 164 ++++++-------------
 .../rocketmq/consumer/PushConsumerImpl.java        | 179 +++++++--------------
 .../rocketmq/domain/BytesMessageImpl.java          |  24 ++-
 .../rocketmq/domain/DefaultMessageFactory.java     |  29 ++++
 .../rocketmq/domain/DefaultMessageReceipt.java     |  66 ++++++++
 .../rocketmq/domain/DefaultQueueMetaData.java      |  72 +++++++++
 .../rocketmq/domain/MessageExtension.java          |  34 ++++
 .../rocketmq/domain/MessageExtensionHeader.java    | 146 +++++++++++++++++
 .../rocketmq/domain/MessageHeader.java             |  13 ++
 .../rocketmq/domain/NonStandardKeys.java           |   4 +-
 .../rocketmq/producer/AbstractOMSProducer.java     |   2 +-
 .../rocketmq/producer/ProducerImpl.java            |  38 +++--
 .../utils/{OMSUtil.java => OMSClientUtil.java}     |  28 +++-
 .../rocketmq/consumer/LocalMessageCacheTest.java   |  90 +++++++++++
 .../rocketmq/consumer/PullConsumerImplTest.java    | 155 ++++++++++++++++--
 .../rocketmq/consumer/PushConsumerImplTest.java    | 101 ++++++++++--
 21 files changed, 949 insertions(+), 480 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 86aba41..7d82dc8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -16,40 +16,44 @@
  */
 package org.apache.rocketmq.example.openmessaging;
 
-import io.openmessaging.Message;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
-import io.openmessaging.OMSBuiltinKeys;
 import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.message.Message;
 import io.openmessaging.producer.Producer;
 import io.openmessaging.producer.SendResult;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.Set;
 
 public class SimplePullConsumer {
     public static void main(String[] args) {
         final MessagingAccessPoint messagingAccessPoint =
             OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
-        messagingAccessPoint.startup();
-
         final Producer producer = messagingAccessPoint.createProducer();
 
         final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
-            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
 
-        messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
 
-        final String queueName = "TopicTest";
-
-        producer.startup();
-        Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
+        final String queueName = "OMS_HELLO_TOPIC";
+        producer.start();
+        Message msg = producer.createMessage(queueName, "Hello Open Messaging".getBytes());
         SendResult sendResult = producer.send(msg);
         System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
-        producer.shutdown();
+        producer.stop();
 
-        consumer.attachQueue(queueName);
+        Set<String> queueNames = new HashSet<String>(8) {
+            {
+                add(queueName);
+            }
+        };
+        consumer.bindQueue(queueNames);
 
-        consumer.startup();
+        consumer.start();
         System.out.printf("Consumer startup OK%n");
 
         // Keep running until we find the one that has just been sent
@@ -57,9 +61,11 @@ public class SimplePullConsumer {
         while (!stop) {
             Message message = consumer.receive();
             if (message != null) {
-                String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
+                String msgId = message.header().getMessageId();
                 System.out.printf("Received one message: %s%n", msgId);
-                consumer.ack(msgId);
+                DefaultMessageReceipt defaultMessageReceipt = new DefaultMessageReceipt();
+                defaultMessageReceipt.setMessageId(msgId);
+                consumer.ack(defaultMessageReceipt);
 
                 if (!stop) {
                     stop = msgId.equalsIgnoreCase(sendResult.messageId());
@@ -70,7 +76,6 @@ public class SimplePullConsumer {
             }
         }
 
-        consumer.shutdown();
-        messagingAccessPoint.shutdown();
+        consumer.stop();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 220c132..7ac905a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -16,12 +16,14 @@
  */
 package org.apache.rocketmq.example.openmessaging;
 
-import io.openmessaging.Message;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
-import io.openmessaging.OMSBuiltinKeys;
 import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.consumer.PushConsumer;
+import io.openmessaging.message.Message;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.Set;
 
 public class SimplePushConsumer {
     public static void main(String[] args) {
@@ -29,28 +31,29 @@ public class SimplePushConsumer {
             .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
         final PushConsumer consumer = messagingAccessPoint.
-            createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
-
-        messagingAccessPoint.startup();
-        System.out.printf("MessagingAccessPoint startup OK%n");
+            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
 
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                consumer.shutdown();
-                messagingAccessPoint.shutdown();
+                consumer.stop();
             }
         }));
 
-        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+        Set<String> queueNames = new HashSet<String>(8) {
+            {
+                add("OMS_HELLO_TOPIC");
+            }
+        };
+        consumer.bindQueue(queueNames, new MessageListener() {
             @Override
             public void onReceived(Message message, Context context) {
-                System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
+                System.out.printf("Received one message: %s%n", message.header().getMessageId());
                 context.ack();
             }
         });
 
-        consumer.startup();
+        consumer.start();
         System.out.printf("Consumer startup OK%n");
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index f045a41..715adb4 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -18,24 +18,26 @@ package io.openmessaging.rocketmq;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.consumer.Consumer;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.manager.ResourceManager;
 import io.openmessaging.message.MessageFactory;
 import io.openmessaging.producer.Producer;
 import io.openmessaging.producer.TransactionStateCheckListener;
 import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
 import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.DefaultMessageFactory;
 import io.openmessaging.rocketmq.producer.ProducerImpl;
-import java.util.HashSet;
-import java.util.Set;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
 
     private final KeyValue accessPointProperties;
 
+    private final MessageFactory messageFactory;
+
     public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
         this.accessPointProperties = accessPointProperties;
+        this.messageFactory = new DefaultMessageFactory();
     }
 
     @Override
@@ -57,77 +59,34 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
         return null;
     }
 
-    @Override public Consumer createConsumer() {
-        String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
-        String[] nsStrArr = consumerId.split("_");
-        if (nsStrArr.length < 2) {
-            return new PushConsumerImpl(accessPointProperties);
-        }
-        if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
-            return new PullConsumerImpl(accessPointProperties);
-        }
+    @Override public PushConsumer createPushConsumer() {
         return new PushConsumerImpl(accessPointProperties);
     }
 
-    @Override
-    public ResourceManager resourceManager() {
-        DefaultResourceManager resourceManager = new DefaultResourceManager();
-        return resourceManager;
-    }
-
-    @Override public MessageFactory messageFactory() {
-        return null;
+    @Override public PullConsumer createPullConsumer() {
+        return new PullConsumerImpl(accessPointProperties);
     }
 
-    class DefaultResourceManager implements ResourceManager {
-
-        @Override
-        public void createNamespace(String nsName) {
-            accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
-        }
-
-        @Override
-        public void deleteNamespace(String nsName) {
-            accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
-        }
-
-        @Override
-        public void switchNamespace(String targetNamespace) {
-            accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace);
+    @Override public PushConsumer createPushConsumer(KeyValue attributes) {
+        for (String key : attributes.keySet()) {
+            accessPointProperties.put(key, attributes.getString(key));
         }
+        return new PushConsumerImpl(accessPointProperties);
+    }
 
-        @Override
-        public Set<String> listNamespaces() {
-            return new HashSet<String>() {
-                {
-                    add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
-                }
-            };
-        }
-
-        @Override
-        public void createQueue(String queueName) {
-
-        }
-
-        @Override
-        public void deleteQueue(String queueName) {
-
-        }
-
-        @Override
-        public Set<String> listQueues(String nsName) {
-            return null;
-        }
-
-        @Override
-        public void filter(String queueName, String filterString) {
-
+    @Override public PullConsumer createPullConsumer(KeyValue attributes) {
+        for (String key : attributes.keySet()) {
+            accessPointProperties.put(key, attributes.getString(key));
         }
+        return new PullConsumerImpl(accessPointProperties);
+    }
 
-        @Override
-        public void routing(String sourceQueue, String targetQueue) {
+    @Override
+    public ResourceManager resourceManager() {
+        return null;
+    }
 
-        }
-    };
+    @Override public MessageFactory messageFactory() {
+        return messageFactory;
+    }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java
deleted file mode 100644
index b2695bf..0000000
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.config;
-
-import io.openmessaging.extension.QueueMetaData;
-
-import java.util.List;
-
-public class DefaultQueueMetaData implements QueueMetaData {
-
-    private String queueName;
-
-    private List<QueueMetaData.Partition> partitions;
-
-    public DefaultQueueMetaData(String queueName, List<QueueMetaData.Partition> partitions) {
-        this.queueName = queueName;
-        this.partitions = partitions;
-    }
-
-    @Override
-    public String queueName() {
-        return queueName;
-    }
-
-    @Override
-    public List<QueueMetaData.Partition> partitions() {
-        return partitions;
-    }
-
-    public static class DefaultPartition implements Partition {
-
-        public DefaultPartition(int partitionId, String partitonHost) {
-            this.partitionId = partitionId;
-            this.partitonHost = partitonHost;
-        }
-
-        private int partitionId;
-
-        private String partitonHost;
-
-        @Override
-        public int partitionId() {
-            return partitionId;
-        }
-
-        @Override
-        public String partitonHost() {
-            return partitonHost;
-        }
-    }
-}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index c0f498b..1838983 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -21,9 +21,9 @@ import io.openmessaging.ServiceLifeState;
 import io.openmessaging.ServiceLifecycle;
 import io.openmessaging.extension.QueueMetaData;
 import io.openmessaging.rocketmq.config.ClientConfig;
-import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -67,7 +68,7 @@ class LocalMessageCache implements ServiceLifecycle {
         this.rocketmqPullConsumer = rocketmqPullConsumer;
         this.clientConfig = clientConfig;
         this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-                "OMS_CleanExpireMsgScheduledThread_"));
+            "OMS_CleanExpireMsgScheduledThread_"));
         this.currentState = ServiceLifeState.INITIALIZED;
     }
 
@@ -79,7 +80,7 @@ class LocalMessageCache implements ServiceLifecycle {
         if (!pullOffsetTable.containsKey(remoteQueue)) {
             try {
                 pullOffsetTable.putIfAbsent(remoteQueue,
-                        rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
+                    rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
             } catch (MQClientException e) {
                 log.error("A error occurred in fetch consume offset process.", e);
             }
@@ -125,19 +126,28 @@ class LocalMessageCache implements ServiceLifecycle {
         return null;
     }
 
-    List<MessageExt> batchPoll(final KeyValue properties) {
-        List<ConsumeRequest> consumeRequests = new ArrayList<>(16);
-        int n = consumeRequestCache.drainTo(consumeRequests);
-        if (n > 0) {
-            List<MessageExt> messageExts = new ArrayList<>(n);
-            for (ConsumeRequest consumeRequest : consumeRequests) {
-                MessageExt messageExt = consumeRequest.getMessageExt();
-                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
-                MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
-                consumedRequest.put(messageExt.getMsgId(), consumeRequest);
-                messageExts.add(messageExt);
+    List<MessageExt> batchPoll(KeyValue properties) {
+        List<ConsumeRequest> consumeRequests = new ArrayList<>(clientConfig.getRmqPullMessageBatchNums());
+        long timeout = properties.getLong(NonStandardKeys.TIMEOUT);
+        while (timeout >= 0) {
+            int n = consumeRequestCache.drainTo(consumeRequests, clientConfig.getRmqPullMessageBatchNums());
+            if (n > 0) {
+                List<MessageExt> messageExts = new ArrayList<>(n);
+                for (ConsumeRequest consumeRequest : consumeRequests) {
+                    MessageExt messageExt = consumeRequest.getMessageExt();
+                    consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+                    MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+                    consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+                    messageExts.add(messageExt);
+                }
+                return messageExts;
+            }
+            if (timeout > 0) {
+                LockSupport.parkNanos(timeout * 1000 * 1000);
+                timeout = 0;
+            } else {
+                timeout = -1;
             }
-            return messageExts;
         }
         return null;
     }
@@ -166,7 +176,7 @@ class LocalMessageCache implements ServiceLifecycle {
 
     private void cleanExpireMsg() {
         for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
-                .getRebalanceImpl().getProcessQueueTable().entrySet()) {
+            .getRebalanceImpl().getProcessQueueTable().entrySet()) {
             ProcessQueue pq = next.getValue();
             MessageQueue mq = next.getKey();
             ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
@@ -186,7 +196,7 @@ class LocalMessageCache implements ServiceLifecycle {
                         if (!msgTreeMap.isEmpty()) {
                             msg = msgTreeMap.firstEntry().getValue();
                             if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
-                                    > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
+                                > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
                                 //Expired, ack and remove it.
                             } else {
                                 break;
@@ -204,7 +214,7 @@ class LocalMessageCache implements ServiceLifecycle {
                 try {
                     rocketmqPullConsumer.sendMessageBack(msg, 3);
                     log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
-                            msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
                     ack(mq, pq, msg);
                 } catch (Exception e) {
                     log.error("Send back expired msg exception", e);
@@ -237,7 +247,7 @@ class LocalMessageCache implements ServiceLifecycle {
     public void stop() {
         this.currentState = ServiceLifeState.STOPPING;
         ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
-        this.currentState = ServiceLifeState.STARTED;
+        this.currentState = ServiceLifeState.STOPPED;
     }
 
     @Override
@@ -246,7 +256,7 @@ class LocalMessageCache implements ServiceLifecycle {
     }
 
     @Override
-    public QueueMetaData getQueueMetaData(String queueName) {
+    public Set<QueueMetaData> getQueueMetaData(String queueName) {
         Set<MessageQueue> messageQueues;
         try {
             messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
@@ -254,16 +264,6 @@ class LocalMessageCache implements ServiceLifecycle {
             log.error("A error occurred when get queue metadata.", e);
             return null;
         }
-        List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
-        if (null != messageQueues && !messageQueues.isEmpty()) {
-            for (MessageQueue messageQueue : messageQueues) {
-                QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
-                partitions.add(partition);
-            }
-        } else {
-            return null;
-        }
-        QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
-        return queueMetaData;
+        return OMSClientUtil.queueMetaDataConvert(messageQueues);
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 03ff901..f4efa92 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,10 +18,8 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.ServiceLifeState;
-import io.openmessaging.consumer.BatchMessageListener;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PullConsumer;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.extension.Extension;
 import io.openmessaging.extension.QueueMetaData;
@@ -31,10 +29,13 @@ import io.openmessaging.message.Message;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.MessageExtension;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.utils.BeanUtils;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -55,10 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
-public class PullConsumerImpl implements Consumer {
-
-    private static final int PULL_MAX_NUMS = 32;
-    private static final int PULL_MIN_NUMS = 1;
+public class PullConsumerImpl implements PullConsumer {
 
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final KeyValue properties;
@@ -67,7 +65,8 @@ public class PullConsumerImpl implements Consumer {
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
     private ServiceLifeState currentState;
-    private List<ConsumerInterceptor> consumerInterceptors;
+    private final List<ConsumerInterceptor> consumerInterceptors;
+    private final Extension extension;
 
     private final static InternalLogger log = ClientLogger.getLog();
 
@@ -96,15 +95,15 @@ public class PullConsumerImpl implements Consumer {
         int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
         this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
 
-        String consumerId = OMSUtil.buildInstanceName();
+        String consumerId = OMSClientUtil.buildInstanceName();
         this.rocketmqPullConsumer.setInstanceName(consumerId);
         properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
 
         this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
 
         this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
-
         consumerInterceptors = new ArrayList<>(16);
+        this.extension = new MessageExtension(this);
     }
 
     private void registerPullTaskCallback(final String targetQueueName) {
@@ -139,96 +138,36 @@ public class PullConsumerImpl implements Consumer {
         });
     }
 
-    @Override
-    public void resume() {
-        currentState = ServiceLifeState.STARTED;
-    }
-
-    @Override
-    public void suspend() {
-        currentState = ServiceLifeState.STOPPED;
+    @Override public Set<String> getBindQueues() {
+        return rocketmqPullConsumer.getRegisterTopics();
     }
 
     @Override
-    public void suspend(long timeout) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isSuspended() {
-        if (ServiceLifeState.STOPPED.equals(currentState)) {
-            return true;
-        }
-        return false;
+    public void addInterceptor(ConsumerInterceptor interceptor) {
+        consumerInterceptors.add(interceptor);
     }
 
     @Override
-    public void bindQueue(String queueName) {
-        registerPullTaskCallback(queueName);
+    public void removeInterceptor(ConsumerInterceptor interceptor) {
+        consumerInterceptors.remove(interceptor);
     }
 
-    @Override
-    public void bindQueue(List<String> queueNames) {
+    @Override public void bindQueue(Collection<String> queueNames) {
         for (String queueName : queueNames) {
-            bindQueue(queueName);
+            registerPullTaskCallback(queueName);
         }
     }
 
-    @Override
-    public void bindQueue(String queueName, MessageListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void bindQueues(List<String> queueNames, MessageListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void bindQueue(String queueName, BatchMessageListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void unbindQueue(String queueName) {
-        this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
-    }
-
-    @Override
-    public void unbindQueues(List<String> queueNames) {
+    @Override public void unbindQueue(Collection<String> queueNames) {
         for (String queueName : queueNames) {
             this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
         }
     }
 
-    @Override
-    public boolean isBindQueue() {
-        Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
-        if (null == registerTopics || registerTopics.isEmpty()) {
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public List<String> getBindQueues() {
-        Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
-        return new ArrayList<>(registerTopics);
-    }
-
-    @Override
-    public void addInterceptor(ConsumerInterceptor interceptor) {
-        consumerInterceptors.add(interceptor);
-    }
-
-    @Override
-    public void removeInterceptor(ConsumerInterceptor interceptor) {
-        consumerInterceptors.remove(interceptor);
+    @Override public Message receive() {
+        KeyValue properties = new DefaultKeyValue();
+        MessageExt rmqMsg = localMessageCache.poll(properties);
+        return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
     }
 
     @Override
@@ -236,23 +175,24 @@ public class PullConsumerImpl implements Consumer {
         KeyValue properties = new DefaultKeyValue();
         properties.put(NonStandardKeys.TIMEOUT, timeout);
         MessageExt rmqMsg = localMessageCache.poll(properties);
-        return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+        return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
     }
 
     @Override
-    public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
-        MessageQueue mq = null;
-        mq = getQueue(queueName, partitionId, mq);
-        PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS);
-        if (pullResult == null)
+    public Message receive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, long timeout) {
+        MessageQueue mq;
+        mq = getQueue(queueMetaData);
+        PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, NonStandardKeys.PULL_MIN_NUMS);
+        if (pullResult == null) {
             return null;
+        }
         PullStatus pullStatus = pullResult.getPullStatus();
         List<Message> messages = new ArrayList<>(16);
         if (PullStatus.FOUND.equals(pullStatus)) {
             List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
             if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
                 for (MessageExt messageExt : rmqMsgs) {
-                    BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+                    BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
                     messages.add(bytesMessage);
                 }
                 return messages.get(0);
@@ -261,10 +201,10 @@ public class PullConsumerImpl implements Consumer {
         return null;
     }
 
-    private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) {
+    private PullResult getResult(long offset, long timeout, MessageQueue mq, int maxNums) {
         PullResult pullResult;
         try {
-            pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout);
+            pullResult = rocketmqPullConsumer.pull(mq, "*", offset, maxNums, timeout);
         } catch (MQClientException e) {
             log.error("A error occurred when pull message.", e);
             return null;
@@ -278,17 +218,15 @@ public class PullConsumerImpl implements Consumer {
             log.error("A error occurred when pull message.", e);
             return null;
         }
-        if (null == pullResult) {
-            return null;
-        }
         return pullResult;
     }
 
-    private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) {
+    private MessageQueue getQueue(QueueMetaData queueMetaData) {
+        MessageQueue mq = null;
         try {
-            Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
+            Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueMetaData.queueName());
             for (MessageQueue messageQueue : messageQueues) {
-                if (messageQueue.getQueueId() == partitionId) {
+                if (messageQueue.getQueueId() == queueMetaData.partitionId()) {
                     mq = messageQueue;
                 }
             }
@@ -306,7 +244,7 @@ public class PullConsumerImpl implements Consumer {
         if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
             List<Message> messages = new ArrayList<>(rmqMsgs.size());
             for (MessageExt messageExt : rmqMsgs) {
-                BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+                BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
                 messages.add(bytesMessage);
             }
             return messages;
@@ -315,19 +253,21 @@ public class PullConsumerImpl implements Consumer {
     }
 
     @Override
-    public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
-        MessageQueue mq = null;
-        mq = getQueue(queueName, partitionId, mq);
-        PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS);
-        if (pullResult == null)
+    public List<Message> batchReceive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt,
+        long timeout) {
+        MessageQueue mq;
+        mq = getQueue(queueMetaData);
+        PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, clientConfig.getRmqPullMessageBatchNums());
+        if (pullResult == null) {
             return null;
+        }
         PullStatus pullStatus = pullResult.getPullStatus();
         List<Message> messages = new ArrayList<>(16);
         if (PullStatus.FOUND.equals(pullStatus)) {
             List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
             if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
                 for (MessageExt messageExt : rmqMsgs) {
-                    BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+                    BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
                     messages.add(bytesMessage);
                 }
                 return messages;
@@ -338,18 +278,12 @@ public class PullConsumerImpl implements Consumer {
 
     @Override
     public void ack(MessageReceipt receipt) {
-
+        localMessageCache.ack(((DefaultMessageReceipt) receipt).getMessageId());
     }
 
     @Override
     public Optional<Extension> getExtension() {
-
-        return Optional.of(new Extension() {
-            @Override
-            public QueueMetaData getQueueMetaData(String queueName) {
-                return getQueueMetaData(queueName);
-            }
-        });
+        return Optional.of(extension);
     }
 
     @Override
@@ -381,7 +315,7 @@ public class PullConsumerImpl implements Consumer {
     }
 
     @Override
-    public QueueMetaData getQueueMetaData(String queueName) {
+    public Set<QueueMetaData> getQueueMetaData(String queueName) {
         return localMessageCache.getQueueMetaData(queueName);
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 8d55b57..4576b28 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -20,27 +20,30 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.OMS;
 import io.openmessaging.ServiceLifeState;
 import io.openmessaging.consumer.BatchMessageListener;
-import io.openmessaging.consumer.Consumer;
 import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.extension.Extension;
 import io.openmessaging.extension.QueueMetaData;
 import io.openmessaging.interceptor.ConsumerInterceptor;
 import io.openmessaging.message.Message;
 import io.openmessaging.rocketmq.config.ClientConfig;
-import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.MessageExtension;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.utils.BeanUtils;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -48,12 +51,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
-public class PushConsumerImpl implements Consumer {
+public class PushConsumerImpl implements PushConsumer {
 
     private final static InternalLogger log = ClientLogger.getLog();
 
@@ -65,6 +69,8 @@ public class PushConsumerImpl implements Consumer {
     private final ClientConfig clientConfig;
     private ServiceLifeState currentState;
     private List<ConsumerInterceptor> consumerInterceptors;
+    private ScheduledExecutorService scheduledExecutorService;
+    private final Extension extension;
 
     public PushConsumerImpl(final KeyValue properties) {
         this.rocketmqPushConsumer = new DefaultMQPushConsumer();
@@ -89,7 +95,7 @@ public class PushConsumerImpl implements Consumer {
         this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
         this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
 
-        String consumerId = OMSUtil.buildInstanceName();
+        String consumerId = OMSClientUtil.buildInstanceName();
         this.rocketmqPushConsumer.setInstanceName(consumerId);
         properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
         this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
@@ -97,6 +103,9 @@ public class PushConsumerImpl implements Consumer {
         this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
 
         consumerInterceptors = new ArrayList<>(16);
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+            "OMS_SuspendTimeouThread_"));
+        extension = new MessageExtension(this);
         currentState = ServiceLifeState.INITIALIZED;
     }
 
@@ -112,7 +121,12 @@ public class PushConsumerImpl implements Consumer {
 
     @Override
     public void suspend(long timeout) {
-        throw new UnsupportedOperationException();
+        this.rocketmqPushConsumer.suspend();
+        scheduledExecutorService.schedule(new Runnable() {
+            @Override public void run() {
+                PushConsumerImpl.this.rocketmqPushConsumer.resume();
+            }
+        }, timeout, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -120,90 +134,49 @@ public class PushConsumerImpl implements Consumer {
         return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
     }
 
-    @Override
-    public void bindQueue(String queueName) {
-        try {
-            rocketmqPushConsumer.subscribe(queueName, "*");
-        } catch (MQClientException e) {
-            throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
-        }
-    }
-
-    @Override
-    public void bindQueue(List<String> queueNames) {
+    @Override public void bindQueue(Collection<String> queueNames, MessageListener listener) {
         for (String queueName : queueNames) {
-            bindQueue(queueName);
-        }
-    }
-
-    @Override
-    public void bindQueue(String queueName, MessageListener listener) {
-        this.subscribeTable.put(queueName, listener);
-        this.batchSubscribeTable.remove(queueName);
-        try {
-            this.rocketmqPushConsumer.subscribe(queueName, "*");
-        } catch (MQClientException e) {
-            throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
-        }
-    }
-
-    @Override
-    public void bindQueues(List<String> queueNames, MessageListener listener) {
-        for (String queueName : queueNames) {
-            bindQueue(queueName, listener);
-        }
-    }
-
-    @Override
-    public void bindQueue(String queueName, BatchMessageListener listener) {
-        this.batchSubscribeTable.put(queueName, listener);
-        this.subscribeTable.remove(queueName);
-        try {
-            this.rocketmqPushConsumer.subscribe(queueName, "*");
-        } catch (MQClientException e) {
-            throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+            this.subscribeTable.put(queueName, listener);
+            this.batchSubscribeTable.remove(queueName);
+            this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(NonStandardKeys.PULL_MIN_NUMS);
+            try {
+                this.rocketmqPushConsumer.subscribe(queueName, "*");
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+            }
         }
     }
 
-    @Override
-    public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
+    @Override public void bindQueue(Collection<String> queueNames, BatchMessageListener listener) {
         for (String queueName : queueNames) {
-            bindQueue(queueName, listener);
-        }
-    }
-
-    @Override
-    public void unbindQueue(String queueName) {
-        this.subscribeTable.remove(queueName);
-        this.batchSubscribeTable.remove(queueName);
-        try {
-            this.rocketmqPushConsumer.unsubscribe(queueName);
-        } catch (Exception e) {
-            throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
+            this.batchSubscribeTable.put(queueName, listener);
+            this.subscribeTable.remove(queueName);
+            this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(clientConfig.getRmqPullMessageBatchNums());
+            try {
+                this.rocketmqPushConsumer.subscribe(queueName, "*");
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+            }
         }
     }
 
-    @Override
-    public void unbindQueues(List<String> queueNames) {
+    @Override public void unbindQueue(Collection<String> queueNames) {
         for (String queueName : queueNames) {
-            unbindQueue(queueName);
-        }
-    }
-
-    @Override
-    public boolean isBindQueue() {
-        Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
-        if (null != subscription && subscription.size() > 0) {
-            return true;
+            this.subscribeTable.remove(queueName);
+            this.batchSubscribeTable.remove(queueName);
+            try {
+                this.rocketmqPushConsumer.unsubscribe(queueName);
+            } catch (Exception e) {
+                throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
+            }
         }
-        return false;
     }
 
     @Override
-    public List<String> getBindQueues() {
+    public Set<String> getBindQueues() {
         Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
         if (null != subscription && subscription.size() > 0) {
-            return new ArrayList<>(subscription.keySet());
+            return subscription.keySet();
         }
         return null;
     }
@@ -219,39 +192,13 @@ public class PushConsumerImpl implements Consumer {
     }
 
     @Override
-    public Message receive(long timeout) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<Message> batchReceive(long timeout) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void ack(MessageReceipt receipt) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public Optional<Extension> getExtension() {
-        return Optional.of(new Extension() {
-
-            @Override
-            public QueueMetaData getQueueMetaData(String queueName) {
-                return getQueueMetaData(queueName);
-            }
-        });
+        return Optional.of(extension);
     }
 
     @Override
@@ -284,7 +231,7 @@ public class PushConsumerImpl implements Consumer {
     }
 
     @Override
-    public QueueMetaData getQueueMetaData(String queueName) {
+    public Set<QueueMetaData> getQueueMetaData(String queueName) {
         Set<MessageQueue> messageQueues;
         try {
             messageQueues = rocketmqPushConsumer.fetchSubscribeMessageQueues(queueName);
@@ -292,24 +239,14 @@ public class PushConsumerImpl implements Consumer {
             log.error("A error occurred when get queue metadata.", e);
             return null;
         }
-        List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
-        if (null != messageQueues && !messageQueues.isEmpty()) {
-            for (MessageQueue messageQueue : messageQueues) {
-                QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
-                partitions.add(partition);
-            }
-        } else {
-            return null;
-        }
-        QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
-        return queueMetaData;
+        return OMSClientUtil.queueMetaDataConvert(messageQueues);
     }
 
     class MessageListenerImpl implements MessageListenerConcurrently {
 
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
-                                                        ConsumeConcurrentlyContext contextRMQ) {
+            ConsumeConcurrentlyContext contextRMQ) {
             boolean batchFlag = true;
             MessageExt rmqMsg = rmqMsgList.get(0);
             BatchMessageListener batchMessageListener = PushConsumerImpl.this.batchSubscribeTable.get(rmqMsg.getTopic());
@@ -319,14 +256,14 @@ public class PushConsumerImpl implements Consumer {
             }
             if (listener == null && batchMessageListener == null) {
                 throw new OMSRuntimeException(-1,
-                        String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+                    String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
             }
             final KeyValue contextProperties = OMS.newKeyValue();
 
             if (batchFlag) {
-                List<Message> messages = new ArrayList<>(16);
+                List<Message> messages = new ArrayList<>(32);
                 for (MessageExt messageExt : rmqMsgList) {
-                    BytesMessageImpl omsMsg = OMSUtil.msgConvert(messageExt);
+                    BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(messageExt);
                     messages.add(omsMsg);
                 }
                 final CountDownLatch sync = new CountDownLatch(1);
@@ -344,7 +281,7 @@ public class PushConsumerImpl implements Consumer {
                     public void ack() {
                         sync.countDown();
                         contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                            ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                     }
                 };
                 long begin = System.currentTimeMillis();
@@ -356,7 +293,7 @@ public class PushConsumerImpl implements Consumer {
                 } catch (InterruptedException ignore) {
                 }
             } else {
-                BytesMessageImpl omsMsg = OMSUtil.msgConvert(rmqMsg);
+                BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(rmqMsg);
 
                 final CountDownLatch sync = new CountDownLatch(1);
 
@@ -368,7 +305,7 @@ public class PushConsumerImpl implements Consumer {
                     public void ack() {
                         sync.countDown();
                         contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                            ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                     }
                 };
                 long begin = System.currentTimeMillis();
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index f1405b2..b5da5ce 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -17,22 +17,26 @@
 package io.openmessaging.rocketmq.domain;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
 import io.openmessaging.consumer.MessageReceipt;
 import io.openmessaging.extension.ExtensionHeader;
 import io.openmessaging.message.Header;
 import io.openmessaging.message.Message;
-import io.openmessaging.OMS;
-import java.util.Optional;
+import java.util.Arrays;
 
 public class BytesMessageImpl implements Message {
 
     private Header sysHeaders;
+    private ExtensionHeader extensionHeader;
+    private MessageReceipt messageReceipt;
     private KeyValue userProperties;
     private byte[] data;
 
     public BytesMessageImpl() {
         this.sysHeaders = new MessageHeader();
         this.userProperties = OMS.newKeyValue();
+        this.extensionHeader = new MessageExtensionHeader();
+        this.messageReceipt = new DefaultMessageReceipt();
     }
 
     @Override
@@ -41,8 +45,8 @@ public class BytesMessageImpl implements Message {
     }
 
     @Override
-    public Optional<ExtensionHeader> extensionHeader() {
-        return null;
+    public ExtensionHeader extensionHeader() {
+        return extensionHeader;
     }
 
     @Override
@@ -62,6 +66,16 @@ public class BytesMessageImpl implements Message {
 
     @Override
     public MessageReceipt getMessageReceipt() {
-        return null;
+        return messageReceipt;
+    }
+
+    @Override public String toString() {
+        return "BytesMessageImpl{" +
+            "sysHeaders=" + sysHeaders +
+            ", extensionHeader=" + extensionHeader +
+            ", messageReceipt=" + messageReceipt +
+            ", userProperties=" + userProperties +
+            ", data=" + Arrays.toString(data) +
+            '}';
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java
new file mode 100644
index 0000000..b3baeb2
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.message.Message;
+import io.openmessaging.message.MessageFactory;
+
+public class DefaultMessageFactory implements MessageFactory {
+    @Override public Message createMessage(String queueName, byte[] body) {
+        Message message = new BytesMessageImpl();
+        message.setData(body);
+        message.header().setDestination(queueName);
+        return message;
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java
new file mode 100644
index 0000000..9339006
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.consumer.MessageReceipt;
+import java.util.Objects;
+
+public class DefaultMessageReceipt implements MessageReceipt {
+
+    private long offset;
+
+    private String messageId;
+
+    public DefaultMessageReceipt() {
+
+    }
+
+    public DefaultMessageReceipt(String messageId, long offset) {
+        this.messageId = messageId;
+        this.offset = offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        DefaultMessageReceipt receipt = (DefaultMessageReceipt) o;
+        return offset == receipt.offset &&
+            Objects.equals(messageId, receipt.messageId);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(offset, messageId);
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java
new file mode 100644
index 0000000..2958f96
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.QueueMetaData;
+import java.util.Objects;
+
+public class DefaultQueueMetaData implements QueueMetaData {
+
+    private String queueName;
+
+    private int partitionId;
+
+    public DefaultQueueMetaData(String queueName, int partitionId) {
+        this.queueName = queueName;
+        this.partitionId = partitionId;
+    }
+
+    @Override public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    @Override public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    @Override public int partitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public String queueName() {
+        return queueName;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultQueueMetaData data = (DefaultQueueMetaData) o;
+        return partitionId == data.partitionId &&
+            Objects.equals(queueName, data.queueName); // null-safe, consistent with hashCode(): queueName is never validated
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(queueName, partitionId);
+    }
+
+    @Override public String toString() {
+        return "DefaultQueueMetaData{" +
+            "queueName='" + queueName + '\'' +
+            ", partitionId=" + partitionId +
+            '}';
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java
new file mode 100644
index 0000000..7449827
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.Extension;
+import io.openmessaging.extension.QueueMetaData;
+import java.util.Set;
+
+public class MessageExtension implements Extension {
+
+    private Extension extension;
+
+    public MessageExtension(Extension extension) {
+        this.extension = extension;
+    }
+
+    @Override public Set<QueueMetaData> getQueueMetaData(String queueName) {
+        return extension.getQueueMetaData(queueName);
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java
new file mode 100644
index 0000000..3f103a4
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.ExtensionHeader;
+
+public class MessageExtensionHeader implements ExtensionHeader {
+    private int partition;
+
+    private long offset;
+
+    private String correlationId;
+
+    private String transactionId;
+
+    private long storeTimestamp;
+
+    private String storeHost;
+
+    private String messageKey;
+
+    private String traceId;
+
+    private long delayTime;
+
+    private long expireTime;
+
+    @Override public ExtensionHeader setPartition(int partition) {
+        this.partition = partition;
+        return this;
+    }
+
+    @Override public ExtensionHeader setOffset(long offset) {
+        this.offset = offset;
+        return this;
+    }
+
+    @Override public ExtensionHeader setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+        return this;
+    }
+
+    @Override public ExtensionHeader setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+        return this;
+    }
+
+    @Override public ExtensionHeader setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+        return this;
+    }
+
+    @Override public ExtensionHeader setStoreHost(String storeHost) {
+        this.storeHost = storeHost;
+        return this;
+    }
+
+    @Override public ExtensionHeader setMessageKey(String messageKey) {
+        this.messageKey = messageKey;
+        return this;
+    }
+
+    @Override public ExtensionHeader setTraceId(String traceId) {
+        this.traceId = traceId;
+        return this;
+    }
+
+    @Override public ExtensionHeader setDelayTime(long delayTime) {
+        this.delayTime = delayTime;
+        return this;
+    }
+
+    @Override public ExtensionHeader setExpireTime(long expireTime) {
+        this.expireTime = expireTime;
+        return this;
+    }
+
+    @Override public int getPartiton() {
+        return partition;
+    }
+
+    @Override public long getOffset() {
+        return offset;
+    }
+
+    @Override public String getCorrelationId() {
+        return correlationId;
+    }
+
+    @Override public String getTransactionId() {
+        return transactionId;
+    }
+
+    @Override public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+    @Override public String getStoreHost() {
+        return storeHost;
+    }
+
+    @Override public long getDelayTime() {
+        return delayTime;
+    }
+
+    @Override public long getExpireTime() {
+        return expireTime;
+    }
+
+    @Override public String getMessageKey() {
+        return messageKey;
+    }
+
+    @Override public String getTraceId() {
+        return traceId;
+    }
+
+    @Override public String toString() {
+        return "MessageExtensionHeader{" +
+            "partition=" + partition +
+            ", offset=" + offset +
+            ", correlationId='" + correlationId + '\'' +
+            ", transactionId='" + transactionId + '\'' +
+            ", storeTimestamp=" + storeTimestamp +
+            ", storeHost='" + storeHost + '\'' +
+            ", messageKey='" + messageKey + '\'' +
+            ", traceId='" + traceId + '\'' +
+            ", delayTime=" + delayTime +
+            ", expireTime=" + expireTime +
+            '}';
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
index a6e7585..495dc1a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
@@ -110,4 +110,17 @@ public class MessageHeader implements Header {
     @Override public short getCompression() {
         return this.compression;
     }
+
+    @Override public String toString() {
+        return "MessageHeader{" +
+            "destination='" + destination + '\'' +
+            ", messageId='" + messageId + '\'' +
+            ", bornTimestamp=" + bornTimestamp +
+            ", bornHost='" + bornHost + '\'' +
+            ", priority=" + priority +
+            ", deliveryCount=" + deliveryCount +
+            ", compression=" + compression +
+            ", durability=" + durability +
+            '}';
+    }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 16ecb0d..2d0d3e6 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -30,7 +30,5 @@ public interface NonStandardKeys {
     String PRODUCER_ID = "PRODUCER_ID";
     String CONSUMER_ID = "CONSUMER_ID";
     String TIMEOUT = "TIMEOUT";
-    String PULL_CONSUMER = "PULL";
-    String PUSH_CONSUMER = "PUSH";
-
+    int PULL_MIN_NUMS = 1;
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 63034e3..735bace 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
-import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
+import static io.openmessaging.rocketmq.utils.OMSClientUtil.buildInstanceName;
 
 abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
     final static InternalLogger log = ClientLogger.getLog();
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index d3acce2..a44a0f7 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -18,31 +18,38 @@ package io.openmessaging.rocketmq.producer;
 
 import io.openmessaging.Future;
 import io.openmessaging.KeyValue;
+import io.openmessaging.Promise;
 import io.openmessaging.ServiceLifeState;
 import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.extension.Extension;
 import io.openmessaging.extension.QueueMetaData;
-import io.openmessaging.message.Message;
-import io.openmessaging.Promise;
-import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.message.Message;
 import io.openmessaging.producer.Producer;
 import io.openmessaging.producer.SendResult;
 import io.openmessaging.producer.TransactionalResult;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.MessageExtension;
 import io.openmessaging.rocketmq.promise.DefaultPromise;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.MessageQueue;
 
-import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
+import static io.openmessaging.rocketmq.utils.OMSClientUtil.msgConvert;
 
 public class ProducerImpl extends AbstractOMSProducer implements Producer {
 
+    private final Extension extension;
+
     public ProducerImpl(final KeyValue properties) {
         super(properties);
+        extension = new MessageExtension(this);
     }
 
     @Override
@@ -60,7 +67,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
                 throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
             }
             message.header().setMessageId(rmqResult.getMsgId());
-            return OMSUtil.sendResultConvert(rmqResult);
+            return OMSClientUtil.sendResultConvert(rmqResult);
         } catch (Exception e) {
             log.error(String.format("Send message to RocketMQ failed, %s", message), e);
             throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
@@ -81,7 +88,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
                 @Override
                 public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                     message.header().setMessageId(rmqResult.getMsgId());
-                    promise.set(OMSUtil.sendResultConvert(rmqResult));
+                    promise.set(OMSClientUtil.sendResultConvert(rmqResult));
                 }
 
                 @Override
@@ -112,7 +119,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         }
 
         for (Message message : messages) {
-            sendOneway(messages);
+            sendOneway(message);
         }
     }
 
@@ -128,7 +135,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         }
 
         for (Message message : messages) {
-            sendOneway(messages);
+            sendOneway(message);
         }
     }
 
@@ -152,12 +159,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
 
     @Override
     public Optional<Extension> getExtension() {
-        return null;
+        return Optional.of(extension);
     }
 
     @Override
-    public QueueMetaData getQueueMetaData(String queueName) {
-        return null;
+    public Set<QueueMetaData> getQueueMetaData(String queueName) {
+        List<MessageQueue> messageQueues;
+        try {
+            messageQueues = this.rocketmqProducer.fetchPublishMessageQueues(queueName);
+        } catch (MQClientException e) {
+            log.error("A error occurred when get queue metadata.", e);
+            return null;
+        }
+        return OMSClientUtil.queueMetaDataConvert(messageQueues);
     }
 
     @Override
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
similarity index 77%
rename from openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
rename to openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
index 5b095ee..ab3ff34 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
@@ -18,19 +18,24 @@ package io.openmessaging.rocketmq.utils;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.OMS;
+import io.openmessaging.extension.QueueMetaData;
 import io.openmessaging.message.Header;
 import io.openmessaging.producer.SendResult;
+import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
 import io.openmessaging.rocketmq.domain.RocketMQConstants;
 import io.openmessaging.rocketmq.domain.SendResultImpl;
 import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageQueue;
 
-public class OMSUtil {
+public class OMSClientUtil {
 
     /**
      * Builds a OMS client instance name.
@@ -56,7 +61,6 @@ public class OMSUtil {
             rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
         }
 
-
         for (String key : userHeaders.keySet()) {
             MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
         }
@@ -82,6 +86,13 @@ public class OMSUtil {
         omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
         omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
         omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
+        omsMsg.extensionHeader().setPartition(rmqMsg.getQueueId());
+        omsMsg.extensionHeader().setOffset(rmqMsg.getQueueOffset());
+        omsMsg.extensionHeader().setDelayTime(rmqMsg.getDelayTimeLevel());
+        omsMsg.extensionHeader().setMessageKey(rmqMsg.getKeys());
+        omsMsg.extensionHeader().setStoreHost(rmqMsg.getStoreHost().toString());
+        omsMsg.extensionHeader().setStoreTimestamp(rmqMsg.getStoreTimestamp());
+        omsMsg.extensionHeader().setTransactionId(rmqMsg.getTransactionId());
 
         return omsMsg;
     }
@@ -116,4 +127,17 @@ public class OMSUtil {
         }
         return keyValue;
     }
+
+    public static Set<QueueMetaData> queueMetaDataConvert(Collection<MessageQueue> messageQueues) {
+        Set<QueueMetaData> queueMetaDatas = new HashSet<>(32);
+        if (null != messageQueues && !messageQueues.isEmpty()) {
+            for (MessageQueue messageQueue : messageQueues) {
+                QueueMetaData queueMetaData = new DefaultQueueMetaData(messageQueue.getTopic(), messageQueue.getQueueId());
+                queueMetaDatas.add(queueMetaData);
+            }
+        } else {
+            return null;
+        }
+        return queueMetaDatas;
+    }
 }
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
index 851c283..2a27076 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -16,10 +16,17 @@
  */
 package io.openmessaging.rocketmq.consumer;
 
+import io.openmessaging.KeyValue;
+import io.openmessaging.extension.QueueMetaData;
+import io.openmessaging.internal.DefaultKeyValue;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.junit.Before;
@@ -40,6 +47,8 @@ public class LocalMessageCacheTest {
     private DefaultMQPullConsumer rocketmqPullConsume;
     @Mock
     private ConsumeRequest consumeRequest;
+    @Mock
+    private ConsumeRequest consumeRequest1;
 
     @Before
     public void init() {
@@ -86,4 +95,85 @@ public class LocalMessageCacheTest {
         localMessageCache.submitConsumeRequest(consumeRequest);
         assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
     }
+
+    @Test
+    public void testBatchPollMessage() throws Exception {
+        byte[] body = new byte[] {'1', '2', '3'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(body);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+
+        byte[] body1 = new byte[] {'4', '5', '6'};
+        MessageExt consumedMsg1 = new MessageExt();
+        consumedMsg1.setMsgId("NewMsgId1");
+        consumedMsg1.setBody(body1);
+        consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg1.setTopic("HELLO_QUEUE1");
+
+        when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
+        when(consumeRequest1.getMessageExt()).thenReturn(consumedMsg1);
+        localMessageCache.submitConsumeRequest(consumeRequest);
+        localMessageCache.submitConsumeRequest(consumeRequest1);
+        KeyValue properties = new DefaultKeyValue();
+        properties.put(NonStandardKeys.TIMEOUT, 3000);
+        List<MessageExt> messageExts = localMessageCache.batchPoll(properties);
+        assertThat(messageExts.size()).isEqualTo(2);
+        MessageExt messageExt1 = null;
+        MessageExt messageExt2 = null;
+        for (MessageExt messageExt : messageExts) {
+            if (messageExt.getMsgId().equals("NewMsgId")) {
+                messageExt1 = messageExt;
+            }
+            if (messageExt.getMsgId().equals("NewMsgId1")) {
+                messageExt2 = messageExt;
+            }
+        }
+        assertThat(messageExt1).isNotNull();
+        assertThat(messageExt1.getBody()).isEqualTo(body);
+        assertThat(messageExt1.getTopic()).isEqualTo("HELLO_QUEUE");
+        assertThat(messageExt2).isNotNull();
+        assertThat(messageExt2.getBody()).isEqualTo(body1);
+        assertThat(messageExt2.getTopic()).isEqualTo("HELLO_QUEUE1");
+
+    }
+
+    @Test
+    public void getQueueMetaData() throws MQClientException {
+        MessageQueue messageQueue1 = new MessageQueue("topic1", "brockerName1", 0);
+        MessageQueue messageQueue2 = new MessageQueue("topic1", "brockerName2", 1);
+        MessageQueue messageQueue3 = new MessageQueue("topic1", "brockerName3", 2);
+        Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
+            {
+                add(messageQueue1);
+                add(messageQueue2);
+                add(messageQueue3);
+            }
+        };
+
+        when(rocketmqPullConsume.fetchSubscribeMessageQueues("topic1")).thenReturn(messageQueues);
+        Set<QueueMetaData> queueMetaDatas = localMessageCache.getQueueMetaData("topic1");
+        assertThat(queueMetaDatas.size()).isEqualTo(3);
+        QueueMetaData queueMetaData1 = null;
+        QueueMetaData queueMetaData2 = null;
+        QueueMetaData queueMetaData3 = null;
+        for (QueueMetaData queueMetaData : queueMetaDatas) {
+            if (queueMetaData.partitionId() == 0) {
+                queueMetaData1 = queueMetaData;
+            }
+            if (queueMetaData.partitionId() == 1) {
+                queueMetaData2 = queueMetaData;
+            }
+            if (queueMetaData.partitionId() == 2) {
+                queueMetaData3 = queueMetaData;
+            }
+        }
+        assertThat(queueMetaData1).isNotNull();
+        assertThat(queueMetaData1.queueName()).isEqualTo("topic1");
+        assertThat(queueMetaData2).isNotNull();
+        assertThat(queueMetaData2.queueName()).isEqualTo("topic1");
+        assertThat(queueMetaData3).isNotNull();
+        assertThat(queueMetaData3.queueName()).isEqualTo("topic1");
+    }
 }
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 7cb5030..1f61b34 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -19,14 +19,28 @@ package io.openmessaging.rocketmq.consumer;
 import io.openmessaging.KeyValue;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.extension.QueueMetaData;
 import io.openmessaging.message.Message;
 import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,12 +49,15 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 @RunWith(MockitoJUnitRunner.class)
 public class PullConsumerImplTest {
-    private Consumer consumer;
+    private PullConsumer pullConsumer;
     private String queueName = "HELLO_QUEUE";
 
     @Mock
@@ -50,15 +67,17 @@ public class PullConsumerImplTest {
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
         final MessagingAccessPoint messagingAccessPoint = OMS
-                .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
-        final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
-        resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup");
-        consumer = messagingAccessPoint.createConsumer();
-        consumer.bindQueue(queueName);
+            .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
+        final KeyValue attributes = messagingAccessPoint.attributes();
+        attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
+        pullConsumer = messagingAccessPoint.createPullConsumer();
+        Set<String> queueNames = new HashSet<>(8);
+        queueNames.add(queueName);
+        pullConsumer.bindQueue(queueNames);
 
         Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
         field.setAccessible(true);
-        field.set(consumer, rocketmqPullConsumer); //Replace
+        field.set(pullConsumer, rocketmqPullConsumer); //Replace
 
         ClientConfig clientConfig = new ClientConfig();
         clientConfig.setOperationTimeout(200);
@@ -66,27 +85,133 @@ public class PullConsumerImplTest {
 
         field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
         field.setAccessible(true);
-        field.set(consumer, localMessageCache);
-        consumer.start();
+        field.set(pullConsumer, localMessageCache);
+        pullConsumer.start();
     }
 
     @Test
-    public void testPoll() {
-        final byte[] testBody = new byte[]{'a', 'b'};
+    public void testPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        final byte[] testBody = new byte[] {'a', 'b'};
         MessageExt consumedMsg = new MessageExt();
         consumedMsg.setMsgId("NewMsgId");
         consumedMsg.setBody(testBody);
         consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
         consumedMsg.setTopic(queueName);
+        consumedMsg.setQueueId(0);
+        consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
         doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class));
-        Message message = consumer.receive(3 * 1000);
+        Message message = pullConsumer.receive(3 * 1000);
         assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
         assertThat(message.getData()).isEqualTo(testBody);
+
+        List<MessageExt> messageExts = new ArrayList<MessageExt>() {
+            {
+                add(consumedMsg);
+            }
+        };
+        PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
+        doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
+        MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0);
+        Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
+            {
+                add(messageQueue);
+            }
+        };
+        doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
+        QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
+        MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
+        long timeout = 3000L;
+        Message message1 = pullConsumer.receive(queueName, queueMetaData, messageReceipt, timeout);
+        assertThat(message1.header().getMessageId()).isEqualTo("NewMsgId");
+        assertThat(message1.getData()).isEqualTo(testBody);
+        assertThat(message1.header().getDestination()).isEqualTo(queueName);
+        assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+    }
+
+    @Test
+    public void testBatchPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        final byte[] testBody = new byte[] {'a', 'b'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic(queueName);
+        consumedMsg.setQueueId(0);
+        consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+        final byte[] testBody1 = new byte[] {'c', 'd'};
+        MessageExt consumedMsg1 = new MessageExt();
+        consumedMsg1.setMsgId("NewMsgId1");
+        consumedMsg1.setBody(testBody1);
+        consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg1.setTopic(queueName);
+        consumedMsg1.setQueueId(0);
+        consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+        List<MessageExt> messageExts = new ArrayList<MessageExt>() {
+            {
+                add(consumedMsg);
+                add(consumedMsg1);
+            }
+        };
+        doReturn(messageExts).when(localMessageCache).batchPoll(any(KeyValue.class));
+        List<Message> messages = pullConsumer.batchReceive(3 * 1000);
+
+        Message message1 = null;
+        Message message2 = null;
+        assertThat(messages.size()).isEqualTo(2);
+        for (Message message : messages) {
+            if (message.header().getMessageId().equals("NewMsgId")) {
+                message1 = message;
+            }
+            if (message.header().getMessageId().equals("NewMsgId1")) {
+                message2 = message;
+            }
+        }
+        assertThat(message1).isNotNull();
+        assertThat(message1.getData()).isEqualTo(testBody);
+        assertThat(message1.header().getDestination()).isEqualTo(queueName);
+        assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+        assertThat(message2).isNotNull();
+        assertThat(message2.getData()).isEqualTo(testBody1);
+        assertThat(message2.header().getDestination()).isEqualTo(queueName);
+        assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
+
+        PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
+        doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
+        MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0);
+        Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
+            {
+                add(messageQueue);
+            }
+        };
+        doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
+        QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
+        MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
+        long timeout = 3000L;
+        List<Message> message1s = pullConsumer.batchReceive(queueName, queueMetaData, messageReceipt, timeout);
+        assertThat(message1s.size()).isEqualTo(2);
+        Message message3 = null;
+        Message message4 = null;
+        for (Message message : message1s) {
+            if (message.header().getMessageId().equals("NewMsgId")) {
+                message3 = message;
+            }
+            if (message.header().getMessageId().equals("NewMsgId1")) {
+                message4 = message;
+            }
+        }
+        assertThat(message3).isNotNull();
+        assertThat(message3.getData()).isEqualTo(testBody);
+        assertThat(message3.header().getDestination()).isEqualTo(queueName);
+        assertThat(message3.extensionHeader().getPartiton()).isEqualTo(0);
+        assertThat(message4).isNotNull();
+        assertThat(message4.getData()).isEqualTo(testBody1);
+        assertThat(message4.header().getDestination()).isEqualTo(queueName);
+        assertThat(message4.extensionHeader().getPartiton()).isEqualTo(0);
     }
 
     @Test
     public void testPoll_WithTimeout() {
-        Message message = consumer.receive(3 * 1000);
+        Message message = pullConsumer.receive(3 * 1000);
         assertThat(message).isNull();
     }
 }
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 51167e8..39af3c1 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -16,12 +16,21 @@
  */
 package io.openmessaging.rocketmq.consumer;
 
-import io.openmessaging.*;
-import io.openmessaging.consumer.Consumer;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.OMS;
+import io.openmessaging.consumer.BatchMessageListener;
 import io.openmessaging.consumer.MessageListener;
-import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.message.Message;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -31,15 +40,12 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
-
-import static org.mockito.Mockito.when;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class PushConsumerImplTest {
-    private Consumer consumer;
+    private PushConsumer pushConsumer;
 
     @Mock
     private DefaultMQPushConsumer rocketmqPushConsumer;
@@ -48,17 +54,17 @@ public class PushConsumerImplTest {
     public void init() throws NoSuchFieldException, IllegalAccessException {
         final MessagingAccessPoint messagingAccessPoint = OMS
                 .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
-        final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
-        resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup");
-        consumer = messagingAccessPoint.createConsumer();
+        final KeyValue attributes = messagingAccessPoint.attributes();
+        attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
+        pushConsumer = messagingAccessPoint.createPushConsumer();
 
         Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
         field.setAccessible(true);
-        DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
-        field.set(consumer, rocketmqPushConsumer); //Replace
+        DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(pushConsumer);
+        field.set(pushConsumer, rocketmqPushConsumer); //Replace
 
         when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
-        consumer.start();
+        pushConsumer.start();
     }
 
     @Test
@@ -70,7 +76,11 @@ public class PushConsumerImplTest {
         consumedMsg.setBody(testBody);
         consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
         consumedMsg.setTopic("HELLO_QUEUE");
-        consumer.bindQueue("HELLO_QUEUE", new MessageListener() {
+        consumedMsg.setQueueId(0);
+        consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+        Set<String> queueNames = new HashSet<>(8);
+        queueNames.add("HELLO_QUEUE");
+        pushConsumer.bindQueue(queueNames, new MessageListener() {
             @Override
             public void onReceived(Message message, Context context) {
                 assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
@@ -81,4 +91,65 @@ public class PushConsumerImplTest {
         ((MessageListenerConcurrently) rocketmqPushConsumer
                 .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
     }
+
+    @Test
+    public void testBatchConsumeMessage() {
+        final byte[] testBody = new byte[]{'a', 'b'};
+
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+        consumedMsg.setQueueId(0);
+        consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+
+        final byte[] testBody1 = new byte[]{'c', 'd'};
+        MessageExt consumedMsg1 = new MessageExt();
+        consumedMsg1.setMsgId("NewMsgId1");
+        consumedMsg1.setBody(testBody1);
+        consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg1.setTopic("HELLO_QUEUE");
+        consumedMsg1.setQueueId(0);
+        consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+        List<MessageExt> messageExts = new ArrayList<MessageExt>() {
+            {
+                add(consumedMsg);
+                add(consumedMsg1);
+            }
+        };
+
+        Set<String> queueNames = new HashSet<>(8);
+        queueNames.add("HELLO_QUEUE");
+        pushConsumer.bindQueue(queueNames, new BatchMessageListener() {
+            @Override public void onReceived(List<Message> batchMessage, Context context) {
+                assertThat(batchMessage.size()).isEqualTo(2);
+                Message message1 = null;
+                Message message2 = null;
+                for (Message message : batchMessage) {
+                    if (message.header().getMessageId().equals("NewMsgId")) {
+                        message1 = message;
+                    }
+                    if (message.header().getMessageId().equals("NewMsgId1")) {
+                        message2 = message;
+                    }
+                }
+                assertThat(message1).isNotNull();
+                assertThat(message1.getData()).isEqualTo(testBody);
+                assertThat(message1.header().getDestination()).isEqualTo("HELLO_QUEUE");
+                assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+                assertThat(message2).isNotNull();
+                assertThat(message2.getData()).isEqualTo(testBody1);
+                assertThat(message2.header().getDestination()).isEqualTo("HELLO_QUEUE");
+                assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
+
+                context.ack();
+            }
+        });
+
+        ((MessageListenerConcurrently) rocketmqPushConsumer
+            .getMessageListener()).consumeMessage(messageExts, null);
+
+    }
+
 }
\ No newline at end of file


Mime
View raw message