This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3530b76e7bd1a493ac302948378b7d080430e2ec
Author: maowei.ymw <maowei.ymw@alibaba-inc.com>
AuthorDate: Fri Nov 2 17:03:04 2018 +0800
MQPullConsumer support MessageSelector
---
.../client/consumer/DefaultMQPullConsumer.java | 26 ++++++
.../rocketmq/client/consumer/MQPullConsumer.java | 47 +++++++++++
.../impl/consumer/DefaultMQPullConsumerImpl.java | 95 ++++++++++++++++------
.../client/impl/consumer/PullAPIWrapper.java | 29 +------
.../client/impl/factory/MQClientInstance.java | 13 +++
5 files changed, 157 insertions(+), 53 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd70670..6befbf3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -258,6 +258,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
@Override
+ public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+ return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
+ }
+
+ @Override
+ public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+ return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums,
timeout);
+ }
+
+ @Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
@@ -271,6 +283,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
@Override
+ public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+ PullCallback pullCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
+ }
+
+ @Override
+ public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+ PullCallback pullCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback,
timeout);
+ }
+
+ @Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset,
int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset,
maxNums);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 33002c9..28b807c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -67,6 +67,39 @@ public interface MQPullConsumer extends MQConsumer {
MQBrokerException, InterruptedException;
/**
+ * Pulling the messages, not blocking
+ * <p>
+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+ * </p>
+ *
+ * @param mq from which message queue
+ * @param selector message selector({@link MessageSelector}), can be null.
+ * @param offset from where to pull
+ * @param maxNums max pulling numbers
+ * @return The resulting {@code PullResult}
+ */
+ PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+ final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+
+ /**
+ * Pulling the messages in the specified timeout
+ * <p>
+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+ * </p>
+ *
+ * @param mq from which message queue
+ * @param selector message selector({@link MessageSelector}), can be null.
+ * @param offset from where to pull
+ * @param maxNums max pulling numbers
+ * @param timeout maximum time to wait for the pull to complete, in milliseconds
+ * @return The resulting {@code PullResult}
+ */
+ PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+ final int maxNums, final long timeout) throws MQClientException, RemotingException,
MQBrokerException,
+ InterruptedException;
+
+ /**
* Pulling the messages in a async. way
*/
void pull(final MessageQueue mq, final String subExpression, final long offset, final
int maxNums,
@@ -81,6 +114,20 @@ public interface MQPullConsumer extends MQConsumer {
InterruptedException;
/**
+ * Pulling the messages asynchronously. Supports message selection via {@link MessageSelector}.
+ */
+ void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final
int maxNums,
+ final PullCallback pullCallback) throws MQClientException, RemotingException,
+ InterruptedException;
+
+ /**
+ * Pulling the messages asynchronously within the specified timeout. Supports message selection via {@link MessageSelector}.
+ */
+ void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final
int maxNums,
+ final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+ InterruptedException;
+
+ /**
* Pulling the messages,if no message arrival,blocking some time
*
* @return The resulting {@code PullRequest}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b..1804df2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -158,17 +159,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
- return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
+ SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
}
- private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int
maxNums, boolean block,
+ public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+ return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+ }
+
+ public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+ SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+ }
+
+ private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+ throws MQClientException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ try {
+ return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
+ mq.getTopic(), subExpression);
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+ }
+
+ private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+ throws MQClientException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ try {
+ return FilterAPI.build(mq.getTopic(),
+ messageSelector.getExpression(), messageSelector.getExpressionType());
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+ }
+
+ private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long
offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
this.makeSureStateOK();
if (null == mq) {
throw new MQClientException("mq is null", null);
-
}
if (offset < 0) {
@@ -183,20 +225,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
- SubscriptionData subscriptionData;
- try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
- mq.getTopic(), subExpression);
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
-
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend()
: timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
- 0L,
+ subscriptionData.getExpressionType(),
+ subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,
@@ -369,12 +404,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
- this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
+ SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+ this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
+ }
+
+ public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+ PullCallback pullCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+ }
+
+ public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+ PullCallback pullCallback,
+ long timeout)
+ throws MQClientException, RemotingException, InterruptedException {
+ SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+ this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
}
private void pullAsyncImpl(
final MessageQueue mq,
- final String subExpression,
+ final SubscriptionData subscriptionData,
final long offset,
final int maxNums,
final PullCallback pullCallback,
@@ -403,20 +453,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
try {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
- final SubscriptionData subscriptionData;
- try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
- mq.getTopic(), subExpression);
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
-
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend()
: timeout;
this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
- 0L,
+ subscriptionData.getExpressionType(),
+ subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,
@@ -444,7 +487,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset,
int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
- return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+ SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
public DefaultMQPullConsumer getDefaultMQPullConsumer() {
@@ -454,7 +498,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int
maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
- this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
+ SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+ this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index b650e35..ac44df4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -163,6 +163,7 @@ public class PullAPIWrapper {
this.recalculatePullFromWhichNode(mq), false);
}
+
if (findBrokerResult != null) {
{
// check version
@@ -209,34 +210,6 @@ public class PullAPIWrapper {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
- public PullResult pullKernelImpl(
- final MessageQueue mq,
- final String subExpression,
- final long subVersion,
- final long offset,
- final int maxNums,
- final int sysFlag,
- final long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
- return pullKernelImpl(
- mq,
- subExpression,
- ExpressionType.TAG,
- subVersion, offset,
- maxNums,
- sysFlag,
- commitOffset,
- brokerSuspendMaxTimeMillis,
- timeoutMillis,
- communicationMode,
- pullCallback
- );
- }
-
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0..5172211 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1046,6 +1046,19 @@ public class MQClientInstance {
if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
return this.brokerVersionTable.get(brokerName).get(brokerAddr);
}
+ }else{
+ HeartbeatData heartbeatData = prepareHeartbeatData();
+ try {
+ int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData,
3000);
+ return version;
+ } catch (Exception e) {
+ if (this.isBrokerInNameServer(brokerAddr)) {
+ log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
+ } else {
+ log.info("send heart beat to broker[{} {}] exception, because the broker
not up, forget it", brokerName,
+ brokerAddr);
+ }
+ }
}
return 0;
}
|