rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [4/6] incubator-rocketmq git commit: Add PullConsumer related implementation for OpenMessaging.
Date Wed, 19 Apr 2017 09:50:05 GMT
Add PullConsumer related implementation for OpenMessaging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/625ba077
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/625ba077
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/625ba077

Branch: refs/heads/openmessaging-impl
Commit: 625ba07728524d778a8faa73867a6bb39cb9976c
Parents: a5ea4e4
Author: yukon <yukon@apache.org>
Authored: Tue Apr 18 22:42:00 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Tue Apr 18 22:42:00 2017 +0800

----------------------------------------------------------------------
 .../openmessaging/SimplePullConsumer.java       |  56 ++++++++
 .../openmessaging/SimplePushConsumer.java       |   3 +-
 .../rocketmq/MessagingAccessPointImpl.java      |   4 +-
 .../java/io/openmessaging/rocketmq/OMSUtil.java |  50 ++++++-
 .../rocketmq/consumer/LocalMessageCache.java    | 134 +++++++++++++++++++
 .../rocketmq/consumer/PullConsumerImpl.java     | 115 ++++++++++++++--
 .../rocketmq/consumer/PushConsumerImpl.java     |   8 +-
 .../rocketmq/domain/ConsumeRequest.java         |  55 ++++++++
 .../rocketmq/domain/NonStandardKeys.java        |   2 +
 9 files changed, 413 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
new file mode 100644
index 0000000..8dd7b23
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class SimplePullConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+
+        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.println("messagingAccessPoint startup OK");
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                consumer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+
+        consumer.startup();
+        System.out.println("consumer startup OK");
+
+        while (true) {
+            Message message = consumer.poll();
+            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+            System.out.println("Received one message: " + msgId);
+            consumer.ack(msgId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 6fc8e39..813e301 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.example.openmessaging;
 
 import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
 import io.openmessaging.MessageListener;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.MessagingAccessPointFactory;
@@ -47,7 +48,7 @@ public class SimplePushConsumer {
         consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
             @Override
             public void onMessage(final Message message, final ReceivedMessageContext context) {
-                System.out.println("Received one message: " + message);
+                System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
                 context.ack();
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index fecd69f..af1695b 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -76,12 +76,12 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
 
     @Override
     public PullConsumer createPullConsumer(String queueName) {
-        return new PullConsumerImpl(accessPointProperties);
+        return new PullConsumerImpl(queueName, accessPointProperties);
     }
 
     @Override
     public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
-        return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+        return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
index dd591a6..87037ee 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
@@ -25,7 +25,9 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.domain.SendResultImpl;
 import java.lang.reflect.Field;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.UtilAll;
@@ -88,7 +90,8 @@ public class OMSUtil {
         }
 
         omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
-        if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
+        if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
+            rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
             omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
         } else {
             omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
@@ -131,4 +134,49 @@ public class OMSUtil {
         }
         return keyValue;
     }
+
+    /**
+     * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
+     */
+    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+        return new Iterator<T>() {
+            Iterator<T> iterator = new Iterator<T>() {
+                @Override
+                public synchronized boolean hasNext() {
+                    return false;
+                }
+
+                @Override
+                public synchronized T next() {
+                    throw new NoSuchElementException();
+                }
+
+                @Override
+                public synchronized void remove() {
+                    //Ignore
+                }
+            };
+
+            @Override
+            public synchronized boolean hasNext() {
+                return iterator.hasNext() || iterable.iterator().hasNext();
+            }
+
+            @Override
+            public synchronized T next() {
+                if (!iterator.hasNext()) {
+                    iterator = iterable.iterator();
+                    if (!iterator.hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                }
+                return iterator.next();
+            }
+
+            @Override
+            public synchronized void remove() {
+                iterator.remove();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
new file mode 100644
index 0000000..968229a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+class LocalMessageCache {
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache;
+    private final Map<String, ConsumeRequest> consumedRequest;
+    private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private int pullBatchNums = 32;
+    private int pollTimeout = -1;
+    private final static Logger log = ClientLogger.getLog();
+
+    LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) {
+        int cacheCapacity = 1000;
+        if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
+            cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
+        }
+        consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
+
+        if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
+            pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
+        }
+
+        if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+            pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+        }
+
+        this.consumedRequest = new ConcurrentHashMap<>();
+        this.pullOffsetTable = new ConcurrentHashMap<>();
+        this.rocketmqPullConsumer = rocketmqPullConsumer;
+    }
+
+    int nextPullBatchNums() {
+        return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity());
+    }
+
+    long nextPullOffset(MessageQueue remoteQueue) {
+        if (!pullOffsetTable.containsKey(remoteQueue)) {
+            try {
+                pullOffsetTable.putIfAbsent(remoteQueue,
+                    rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
+            } catch (MQClientException e) {
+                log.error("A error occurred in fetch consume offset process.", e);
+            }
+        }
+        return pullOffsetTable.get(remoteQueue);
+    }
+
+    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        pullOffsetTable.put(remoteQueue, nextPullOffset);
+    }
+
+    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+        } catch (InterruptedException ignore) {
+        }
+    }
+
+    MessageExt poll() {
+        try {
+            ConsumeRequest consumeRequest = consumeRequestCache.take();
+            consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+            consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
+            return consumeRequest.getMessageExt();
+        } catch (InterruptedException ignore) {
+        }
+        return null;
+    }
+
+    MessageExt poll(final KeyValue properties) {
+        int currentPollTimeout = pollTimeout;
+        if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+            currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+        }
+
+        if (currentPollTimeout == -1) {
+            return poll();
+        }
+
+        try {
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS);
+            consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+            consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
+            return consumeRequest.getMessageExt();
+        } catch (InterruptedException ignore) {
+        }
+        return null;
+    }
+
+    void ack(final String messageId) {
+        ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
+        if (consumeRequest != null) {
+            long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
+            try {
+                rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+            } catch (MQClientException e) {
+                log.error("A error occurred in update consume offset process.", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 6730b1f..bd33d78 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,45 +18,142 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
+import io.openmessaging.PropertyKeys;
 import io.openmessaging.PullConsumer;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
 
 public class PullConsumerImpl implements PullConsumer {
-    public PullConsumerImpl(final KeyValue properties) {
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final KeyValue properties;
+    private boolean started = false;
+    private String targetQueueName;
+    private final MQPullConsumerScheduleService pullConsumerScheduleService;
+    private final LocalMessageCache localMessageCache;
 
+    final static Logger log = ClientLogger.getLog();
+
+    public PullConsumerImpl(final String queueName, final KeyValue properties) {
+        this.properties = properties;
+        this.targetQueueName = queueName;
+
+        String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
+        if (null == consumerGroup || consumerGroup.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+        }
+        pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
+
+        this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+        this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
+
+        int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
+        if (maxReDeliveryTimes != 0) {
+            this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
+        }
+
+        String consumerId = OMSUtil.buildInstanceName();
+        this.rocketmqPullConsumer.setInstanceName(consumerId);
+        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+        this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties);
     }
 
     @Override
     public KeyValue properties() {
-        return null;
+        return properties;
     }
 
     @Override
     public Message poll() {
-        return null;
+        return OMSUtil.msgConvert(localMessageCache.poll());
     }
 
     @Override
     public Message poll(final KeyValue properties) {
-        return null;
+        return OMSUtil.msgConvert(localMessageCache.poll(properties));
     }
 
     @Override
     public void ack(final String messageId) {
-
+        localMessageCache.ack(messageId);
     }
 
     @Override
     public void ack(final String messageId, final KeyValue properties) {
-
+        localMessageCache.ack(messageId);
     }
 
     @Override
-    public void startup() {
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                registerPullTaskCallback();
+                this.pullConsumerScheduleService.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    private void registerPullTaskCallback() {
+        this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
+            @Override
+            public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
+                MQPullConsumer consumer = context.getPullConsumer();
+                try {
+                    long offset = localMessageCache.nextPullOffset(mq);
 
+                    PullResult pullResult = consumer.pull(mq, "*",
+                        offset, localMessageCache.nextPullBatchNums());
+                    ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                        .getProcessQueueTable().get(mq);
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            if (pq != null) {
+                                for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
+                                    localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
+                                }
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                    localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
+                } catch (Exception e) {
+                    log.error("A error occurred in pull message process.", e);
+                }
+            }
+        });
     }
 
     @Override
-    public void shutdown() {
-
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.pullConsumerScheduleService.shutdown();
+            this.rocketmqPullConsumer.shutdown();
+        }
+        this.started = false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index cd83212..9c3b6a9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -156,6 +156,8 @@ public class PushConsumerImpl implements PushConsumer {
             final KeyValue contextProperties = OMS.newKeyValue();
             final CountDownLatch sync = new CountDownLatch(1);
 
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
             ReceivedMessageContext context = new ReceivedMessageContext() {
                 @Override
                 public KeyValue properties() {
@@ -176,9 +178,13 @@ public class PushConsumerImpl implements PushConsumer {
                         properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
                 }
             };
+            long begin = System.currentTimeMillis();
             listener.onMessage(omsMsg, context);
+            long costs = System.currentTimeMillis() - begin;
+
             try {
-                sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(), TimeUnit.MILLISECONDS);
+                sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs)
+                    , TimeUnit.MILLISECONDS);
             } catch (InterruptedException ignore) {
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
new file mode 100644
index 0000000..7ce4a9b
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class ConsumeRequest {
+    private final MessageExt messageExt;
+    private final MessageQueue messageQueue;
+    private final ProcessQueue processQueue;
+    private long startConsumeTimeMillis;
+
+    public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
+        final ProcessQueue processQueue) {
+        this.messageExt = messageExt;
+        this.messageQueue = messageQueue;
+        this.processQueue = processQueue;
+    }
+
+    public MessageExt getMessageExt() {
+        return messageExt;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public ProcessQueue getProcessQueue() {
+        return processQueue;
+    }
+
+    public long getStartConsumeTimeMillis() {
+        return startConsumeTimeMillis;
+    }
+
+    public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+        this.startConsumeTimeMillis = startConsumeTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 566a17d..3639a3f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -25,4 +25,6 @@ public interface NonStandardKeys {
     String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
     String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
     String MESSAGE_DESTINATION = "rmq.message.destination";
+    String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
+    String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
 }


Mime
View raw message