rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] 01/03: MQPullConsumer support MessageSelector
Date Tue, 27 Nov 2018 02:27:49 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3530b76e7bd1a493ac302948378b7d080430e2ec
Author: maowei.ymw <maowei.ymw@alibaba-inc.com>
AuthorDate: Fri Nov 2 17:03:04 2018 +0800

    MQPullConsumer support MessageSelector
---
 .../client/consumer/DefaultMQPullConsumer.java     | 26 ++++++
 .../rocketmq/client/consumer/MQPullConsumer.java   | 47 +++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   | 95 ++++++++++++++++------
 .../client/impl/consumer/PullAPIWrapper.java       | 29 +------
 .../client/impl/factory/MQClientInstance.java      | 13 +++
 5 files changed, 157 insertions(+), 53 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd70670..6befbf3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -258,6 +258,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     }
 
     @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums,
timeout);
+    }
+
+    @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
@@ -271,6 +283,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     }
 
     @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback,
timeout);
+    }
+
+    @Override
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset,
int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
         return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset,
maxNums);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 33002c9..28b807c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -67,6 +67,39 @@ public interface MQPullConsumer extends MQConsumer {
         MQBrokerException, InterruptedException;
 
     /**
+     * Pulling the messages, not blocking
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in the specified timeout
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @param timeout maximum time to wait for the pull to complete, in milliseconds
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums, final long timeout) throws MQClientException, RemotingException,
MQBrokerException,
+        InterruptedException;
+
+    /**
      * Pulling the messages in a async. way
      */
     void pull(final MessageQueue mq, final String subExpression, final long offset, final
int maxNums,
@@ -81,6 +114,20 @@ public interface MQPullConsumer extends MQConsumer {
         InterruptedException;
 
     /**
+     * Pulling the messages in an async way. Supports message selection
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final
int maxNums,
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in an async way within the specified timeout. Supports message selection
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final
int maxNums,
+        final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
      * Pulling the messages,if no message arrival,blocking some time
      *
      * @return The resulting {@code PullRequest}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b..1804df2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -158,17 +159,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums,
long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int
maxNums, boolean block,
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+        return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset,
int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
+                mq.getTopic(), subExpression);
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.build(mq.getTopic(),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long
offset, int maxNums, boolean block,
         long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
         this.makeSureStateOK();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
-
         }
 
         if (offset < 0) {
@@ -183,20 +225,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-        SubscriptionData subscriptionData;
-        try {
-            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                mq.getTopic(), subExpression);
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend()
: timeout;
 
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
             mq,
             subscriptionData.getSubString(),
-            0L,
+            subscriptionData.getExpressionType(),
+            subscriptionData.getSubVersion(),
             offset,
             maxNums,
             sysFlag,
@@ -369,12 +404,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback,
         long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback,
+        long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
     }
 
     private void pullAsyncImpl(
         final MessageQueue mq,
-        final String subExpression,
+        final SubscriptionData subscriptionData,
         final long offset,
         final int maxNums,
         final PullCallback pullCallback,
@@ -403,20 +453,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         try {
             int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-            final SubscriptionData subscriptionData;
-            try {
-                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                    mq.getTopic(), subExpression);
-            } catch (Exception e) {
-                throw new MQClientException("parse subscription error", e);
-            }
-
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend()
: timeout;
 
             this.pullAPIWrapper.pullKernelImpl(
                 mq,
                 subscriptionData.getSubString(),
-                0L,
+                subscriptionData.getExpressionType(),
+                subscriptionData.getSubVersion(),
                 offset,
                 maxNums,
                 sysFlag,
@@ -444,7 +487,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset,
int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
     public DefaultMQPullConsumer getDefaultMQPullConsumer() {
@@ -454,7 +498,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int
maxNums,
         PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index b650e35..ac44df4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -163,6 +163,7 @@ public class PullAPIWrapper {
                     this.recalculatePullFromWhichNode(mq), false);
         }
 
+
         if (findBrokerResult != null) {
             {
                 // check version
@@ -209,34 +210,6 @@ public class PullAPIWrapper {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-    public PullResult pullKernelImpl(
-        final MessageQueue mq,
-        final String subExpression,
-        final long subVersion,
-        final long offset,
-        final int maxNums,
-        final int sysFlag,
-        final long commitOffset,
-        final long brokerSuspendMaxTimeMillis,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
-        return pullKernelImpl(
-            mq,
-            subExpression,
-            ExpressionType.TAG,
-            subVersion, offset,
-            maxNums,
-            sysFlag,
-            commitOffset,
-            brokerSuspendMaxTimeMillis,
-            timeoutMillis,
-            communicationMode,
-            pullCallback
-        );
-    }
-
     public long recalculatePullFromWhichNode(final MessageQueue mq) {
         if (this.isConnectBrokerByUser()) {
             return this.defaultBrokerId;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0..5172211 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1046,6 +1046,19 @@ public class MQClientInstance {
             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
             }
+        }else{
+            HeartbeatData heartbeatData = prepareHeartbeatData();
+            try {
+                int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData,
3000);
+                return version;
+            } catch (Exception e) {
+                if (this.isBrokerInNameServer(brokerAddr)) {
+                    log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
+                } else {
+                    log.info("send heart beat to broker[{} {}] exception, because the broker
not up, forget it", brokerName,
+                        brokerAddr);
+                }
+            }
         }
         return 0;
     }


Mime
View raw message