Add PullConsumer related implementation for OpenMessaging.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/625ba077
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/625ba077
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/625ba077
Branch: refs/heads/openmessaging-impl
Commit: 625ba07728524d778a8faa73867a6bb39cb9976c
Parents: a5ea4e4
Author: yukon <yukon@apache.org>
Authored: Tue Apr 18 22:42:00 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Tue Apr 18 22:42:00 2017 +0800
----------------------------------------------------------------------
.../openmessaging/SimplePullConsumer.java | 56 ++++++++
.../openmessaging/SimplePushConsumer.java | 3 +-
.../rocketmq/MessagingAccessPointImpl.java | 4 +-
.../java/io/openmessaging/rocketmq/OMSUtil.java | 50 ++++++-
.../rocketmq/consumer/LocalMessageCache.java | 134 +++++++++++++++++++
.../rocketmq/consumer/PullConsumerImpl.java | 115 ++++++++++++++--
.../rocketmq/consumer/PushConsumerImpl.java | 8 +-
.../rocketmq/domain/ConsumeRequest.java | 55 ++++++++
.../rocketmq/domain/NonStandardKeys.java | 2 +
9 files changed, 413 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
new file mode 100644
index 0000000..8dd7b23
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class SimplePullConsumer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+
+ final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+ OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+ messagingAccessPoint.startup();
+ System.out.println("messagingAccessPoint startup OK");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ consumer.startup();
+ System.out.println("consumer startup OK");
+
+ while (true) {
+ Message message = consumer.poll();
+ String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+ System.out.println("Received one message: " + msgId);
+ consumer.ack(msgId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 6fc8e39..813e301 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
@@ -47,7 +48,7 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context)
{
- System.out.println("Received one message: " + message);
+ System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index fecd69f..af1695b 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -76,12 +76,12 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint
{
@Override
public PullConsumer createPullConsumer(String queueName) {
- return new PullConsumerImpl(accessPointProperties);
+ return new PullConsumerImpl(queueName, accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
- return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties,
properties));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
index dd591a6..87037ee 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
@@ -25,7 +25,9 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
+import java.util.Iterator;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
@@ -88,7 +90,8 @@ public class OMSUtil {
}
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
- if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC"))
{
+ if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
+ rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC"))
{
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
} else {
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
@@ -131,4 +134,49 @@ public class OMSUtil {
}
return keyValue;
}
+
+ /**
+ * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
+ */
+ public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+ return new Iterator<T>() {
+ Iterator<T> iterator = new Iterator<T>() {
+ @Override
+ public synchronized boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public synchronized T next() {
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public synchronized void remove() {
+ //Ignore
+ }
+ };
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iterator.hasNext() || iterable.iterator().hasNext();
+ }
+
+ @Override
+ public synchronized T next() {
+ if (!iterator.hasNext()) {
+ iterator = iterable.iterator();
+ if (!iterator.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ }
+ return iterator.next();
+ }
+
+ @Override
+ public synchronized void remove() {
+ iterator.remove();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
new file mode 100644
index 0000000..968229a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+class LocalMessageCache {
+ private final BlockingQueue<ConsumeRequest> consumeRequestCache;
+ private final Map<String, ConsumeRequest> consumedRequest;
+ private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
+ private final DefaultMQPullConsumer rocketmqPullConsumer;
+ private int pullBatchNums = 32;
+ private int pollTimeout = -1;
+ private final static Logger log = ClientLogger.getLog();
+
+ LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties)
{
+ int cacheCapacity = 1000;
+ if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
+ cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
+ }
+ consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
+
+ if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
+ pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
+ }
+
+ if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+ pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+ }
+
+ this.consumedRequest = new ConcurrentHashMap<>();
+ this.pullOffsetTable = new ConcurrentHashMap<>();
+ this.rocketmqPullConsumer = rocketmqPullConsumer;
+ }
+
+ int nextPullBatchNums() {
+ return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity());
+ }
+
+ long nextPullOffset(MessageQueue remoteQueue) {
+ if (!pullOffsetTable.containsKey(remoteQueue)) {
+ try {
+ pullOffsetTable.putIfAbsent(remoteQueue,
+ rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
+ } catch (MQClientException e) {
+ log.error("A error occurred in fetch consume offset process.", e);
+ }
+ }
+ return pullOffsetTable.get(remoteQueue);
+ }
+
+ void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+ pullOffsetTable.put(remoteQueue, nextPullOffset);
+ }
+
+ void submitConsumeRequest(ConsumeRequest consumeRequest) {
+ try {
+ consumeRequestCache.put(consumeRequest);
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ MessageExt poll() {
+ try {
+ ConsumeRequest consumeRequest = consumeRequestCache.take();
+ consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+ consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
+ return consumeRequest.getMessageExt();
+ } catch (InterruptedException ignore) {
+ }
+ return null;
+ }
+
+ MessageExt poll(final KeyValue properties) {
+ int currentPollTimeout = pollTimeout;
+ if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+ currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+ }
+
+ if (currentPollTimeout == -1) {
+ return poll();
+ }
+
+ try {
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout,
TimeUnit.MILLISECONDS);
+ consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+ consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
+ return consumeRequest.getMessageExt();
+ } catch (InterruptedException ignore) {
+ }
+ return null;
+ }
+
+ void ack(final String messageId) {
+ ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
+ if (consumeRequest != null) {
+ long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
+ try {
+ rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(),
offset);
+ } catch (MQClientException e) {
+ log.error("A error occurred in update consume offset process.", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 6730b1f..bd33d78 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,45 +18,142 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
+import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
public class PullConsumerImpl implements PullConsumer {
- public PullConsumerImpl(final KeyValue properties) {
+ private final DefaultMQPullConsumer rocketmqPullConsumer;
+ private final KeyValue properties;
+ private boolean started = false;
+ private String targetQueueName;
+ private final MQPullConsumerScheduleService pullConsumerScheduleService;
+ private final LocalMessageCache localMessageCache;
+ final static Logger log = ClientLogger.getLog();
+
+ public PullConsumerImpl(final String queueName, final KeyValue properties) {
+ this.properties = properties;
+ this.targetQueueName = queueName;
+
+ String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
+ if (null == consumerGroup || consumerGroup.isEmpty()) {
+ throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ,
please set it.");
+ }
+ pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
+
+ this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+ String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+ if (accessPoints == null || accessPoints.isEmpty()) {
+ throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ }
+ this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+ this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
+
+ int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
+ if (maxReDeliveryTimes != 0) {
+ this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
+ }
+
+ String consumerId = OMSUtil.buildInstanceName();
+ this.rocketmqPullConsumer.setInstanceName(consumerId);
+ properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+ this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties);
}
@Override
public KeyValue properties() {
- return null;
+ return properties;
}
@Override
public Message poll() {
- return null;
+ return OMSUtil.msgConvert(localMessageCache.poll());
}
@Override
public Message poll(final KeyValue properties) {
- return null;
+ return OMSUtil.msgConvert(localMessageCache.poll(properties));
}
@Override
public void ack(final String messageId) {
-
+ localMessageCache.ack(messageId);
}
@Override
public void ack(final String messageId, final KeyValue properties) {
-
+ localMessageCache.ack(messageId);
}
@Override
- public void startup() {
+ public synchronized void startup() {
+ if (!started) {
+ try {
+ registerPullTaskCallback();
+ this.pullConsumerScheduleService.start();
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", e);
+ }
+ }
+ this.started = true;
+ }
+
+ private void registerPullTaskCallback() {
+ this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback()
{
+ @Override
+ public void doPullTask(final MessageQueue mq, final PullTaskContext context)
{
+ MQPullConsumer consumer = context.getPullConsumer();
+ try {
+ long offset = localMessageCache.nextPullOffset(mq);
+ PullResult pullResult = consumer.pull(mq, "*",
+ offset, localMessageCache.nextPullBatchNums());
+ ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+ .getProcessQueueTable().get(mq);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ if (pq != null) {
+ for (final MessageExt messageExt : pullResult.getMsgFoundList())
{
+ localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt,
mq, pq));
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
+ } catch (Exception e) {
+ log.error("A error occurred in pull message process.", e);
+ }
+ }
+ });
}
@Override
- public void shutdown() {
-
+ public synchronized void shutdown() {
+ if (this.started) {
+ this.pullConsumerScheduleService.shutdown();
+ this.rocketmqPullConsumer.shutdown();
+ }
+ this.started = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index cd83212..9c3b6a9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -156,6 +156,8 @@ public class PushConsumerImpl implements PushConsumer {
final KeyValue contextProperties = OMS.newKeyValue();
final CountDownLatch sync = new CountDownLatch(1);
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
ReceivedMessageContext context = new ReceivedMessageContext() {
@Override
public KeyValue properties() {
@@ -176,9 +178,13 @@ public class PushConsumerImpl implements PushConsumer {
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
};
+ long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context);
+ long costs = System.currentTimeMillis() - begin;
+
try {
- sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(),
TimeUnit.MILLISECONDS);
+ sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout()
- costs)
+ , TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
new file mode 100644
index 0000000..7ce4a9b
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class ConsumeRequest {
+ private final MessageExt messageExt;
+ private final MessageQueue messageQueue;
+ private final ProcessQueue processQueue;
+ private long startConsumeTimeMillis;
+
+ public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
+ final ProcessQueue processQueue) {
+ this.messageExt = messageExt;
+ this.messageQueue = messageQueue;
+ this.processQueue = processQueue;
+ }
+
+ public MessageExt getMessageExt() {
+ return messageExt;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public ProcessQueue getProcessQueue() {
+ return processQueue;
+ }
+
+ public long getStartConsumeTimeMillis() {
+ return startConsumeTimeMillis;
+ }
+
+ public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+ this.startConsumeTimeMillis = startConsumeTimeMillis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/625ba077/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 566a17d..3639a3f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -25,4 +25,6 @@ public interface NonStandardKeys {
String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
String MESSAGE_DESTINATION = "rmq.message.destination";
+ String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
+ String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
}
|