rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject incubator-rocketmq git commit: [ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown, closes apache/incubator-rocketmq#68
Date Wed, 19 Apr 2017 03:59:07 GMT
Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop c183e0d40 -> f508f131f


[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown,
closes apache/incubator-rocketmq#68


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f508f131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f508f131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f508f131

Branch: refs/heads/develop
Commit: f508f131f7dee2bcb86e66a6beb7bbdedbe31bc6
Parents: c183e0d
Author: Jaskey <linjunjie1103@gmail.com>
Authored: Wed Apr 19 11:58:48 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Wed Apr 19 11:58:48 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPullConsumerImpl.java     | 21 +++++++++++++-------
 .../consumer/DefaultMQPushConsumerImpl.java     | 15 +++++++-------
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f508f131/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index b26d062..7d43b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
@@ -161,7 +161,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int
maxNums, boolean block, long timeout)
+    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int
maxNums, boolean block,
+        long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
         this.makeSureStateOK();
 
@@ -365,7 +366,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
     }
 
-    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback, long timeout)
+    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback
pullCallback,
+        long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
     }
@@ -449,7 +451,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return defaultMQPullConsumer;
     }
 
-    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int
maxNums, PullCallback pullCallback)
+    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int
maxNums,
+        PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
@@ -510,7 +513,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -528,7 +531,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
@@ -593,6 +596,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             default:
                 break;
         }
+
     }
 
     private void checkConfig() throws MQClientException {
@@ -662,7 +666,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         this.offsetStore.updateOffset(mq, offset, false);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException
{
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
@@ -692,6 +697,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return serviceState;
     }
 
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
     public void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f508f131/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732..67f3ebe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final RPCHook rpcHook;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private volatile boolean pause = false;
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -535,7 +535,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
this.defaultMQPushConsumer.getConsumerGroup(),
@@ -615,9 +615,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
 
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
         this.mQClientFactory.rebalanceImmediately();
     }
 
@@ -855,7 +853,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         this.consumeMessageService.updateCorePoolSize(corePoolSize);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException
{
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
 
@@ -1014,7 +1013,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         return serviceState;
     }
 
-    public void setServiceState(ServiceState serviceState) {
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
+    public synchronized void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }
 


Mime
View raw message