rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch develop updated: [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem that tx check may loss when channel is busy (#1627)
Date Fri, 10 Jan 2020 12:51:17 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 958eb74  [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem that tx check may loss when channel is busy (#1627)
958eb74 is described below

commit 958eb7498112429aecb19785feff117080c01465
Author: huangli <areyouok@gmail.com>
AuthorDate: Fri Jan 10 20:50:59 2020 +0800

    [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem that tx check may loss when channel is busy (#1627)
---
 .../rocketmq/broker/client/ProducerManager.java    | 262 ++++++++-------------
 .../broker/processor/AdminBrokerProcessor.java     |   2 +-
 .../broker/client/ProducerManagerTest.java         |  30 ++-
 .../processor/ClientManageProcessorTest.java       |   3 +-
 4 files changed, 127 insertions(+), 170 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 12f632b..860b349 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -18,15 +18,11 @@ package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -36,205 +32,145 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 public class ProducerManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
-    private final Lock groupChannelLock = new ReentrantLock();
-    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>>
groupChannelTable =
-        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+    private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel,
ClientChannelInfo>> groupChannelTable =
+        new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
     private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
 
     public ProducerManager() {
     }
 
-    public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable()
{
-        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>>
newGroupChannelTable =
-            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
-                try {
-                    Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>>
iter = groupChannelTable.entrySet().iterator();
-                    while (iter.hasNext()) {
-                        Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry = iter.next();
-                        String key = entry.getKey();
-                        HashMap<Channel, ClientChannelInfo> val = entry.getValue();
-                        HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel,
ClientChannelInfo>();
-                        tmp.putAll(val);
-                        newGroupChannelTable.put(key, tmp);
-                    }
-                } finally {
-                    groupChannelLock.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-        return newGroupChannelTable;
+    public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
getGroupChannelTable() {
+        return groupChannelTable;
     }
 
     public void scanNotActiveChannel() {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
-                try {
-                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
-                        .entrySet()) {
-                        final String group = entry.getKey();
-                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
-
-                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
-                        while (it.hasNext()) {
-                            Entry<Channel, ClientChannelInfo> item = it.next();
-                            // final Integer id = item.getKey();
-                            final ClientChannelInfo info = item.getValue();
-
-                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
-                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
-                                it.remove();
-                                clientChannelTable.remove(info.getClientId());
-                                log.warn(
-                                    "SCAN: remove expired channel[{}] from ProducerManager
groupChannelTable, producer group name: {}",
-                                    RemotingHelper.parseChannelRemoteAddr(info.getChannel()),
group);
-                                RemotingUtil.closeChannel(info.getChannel());
-                            }
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
+        for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
+                .entrySet()) {
+            final String group = entry.getKey();
+            final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+            Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Channel, ClientChannelInfo> item = it.next();
+                // final Integer id = item.getKey();
+                final ClientChannelInfo info = item.getValue();
+
+                long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                    it.remove();
+                    clientChannelTable.remove(info.getClientId());
+                    log.warn(
+                            "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable,
producer group name: {}",
+                            RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                    RemotingUtil.closeChannel(info.getChannel());
                 }
-            } else {
-                log.warn("ProducerManager scanNotActiveChannel lock timeout");
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+    public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel)
{
         if (channel != null) {
-            try {
-                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
-                    try {
-                        for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
-                            .entrySet()) {
-                            final String group = entry.getKey();
-                            final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable
=
-                                entry.getValue();
-                            final ClientChannelInfo clientChannelInfo =
-                                clientChannelInfoTable.remove(channel);
-                            if (clientChannelInfo != null) {
-                                clientChannelTable.remove(clientChannelInfo.getClientId());
-                                log.info(
-                                    "NETTY EVENT: remove channel[{}][{}] from ProducerManager
groupChannelTable, producer group: {}",
-                                    clientChannelInfo.toString(), remoteAddr, group);
-                            }
-
-                        }
-                    } finally {
-                        this.groupChannelLock.unlock();
-                    }
-                } else {
-                    log.warn("ProducerManager doChannelCloseEvent lock timeout");
+            for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
+                    .entrySet()) {
+                final String group = entry.getKey();
+                final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable
=
+                        entry.getValue();
+                final ClientChannelInfo clientChannelInfo =
+                        clientChannelInfoTable.remove(channel);
+                if (clientChannelInfo != null) {
+                    clientChannelTable.remove(clientChannelInfo.getClientId());
+                    log.info(
+                            "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable,
producer group: {}",
+                            clientChannelInfo.toString(), remoteAddr, group);
                 }
-            } catch (InterruptedException e) {
-                log.error("", e);
+
             }
         }
     }
 
-    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo)
{
-        try {
-            ClientChannelInfo clientChannelInfoFound = null;
-
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null == channelTable) {
-                        channelTable = new HashMap<>();
-                        this.groupChannelTable.put(group, channelTable);
-                    }
-
-                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
-                    if (null == clientChannelInfoFound) {
-                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
-                        clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
-                        log.info("new producer connected, group: {} channel: {}", group,
-                            clientChannelInfo.toString());
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
+    public synchronized void registerProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+        ClientChannelInfo clientChannelInfoFound = null;
 
-                if (clientChannelInfoFound != null) {
-                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
-                }
-            } else {
-                log.warn("ProducerManager registerProducer lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+        if (null == channelTable) {
+            channelTable = new ConcurrentHashMap<>();
+            this.groupChannelTable.put(group, channelTable);
+        }
+
+        clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+        if (null == clientChannelInfoFound) {
+            channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+            clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
+            log.info("new producer connected, group: {} channel: {}", group,
+                    clientChannelInfo.toString());
+        }
+
+
+        if (clientChannelInfoFound != null) {
+            clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
         }
     }
 
-    public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo)
{
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null != channelTable && !channelTable.isEmpty()) {
-                        ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
-                        clientChannelTable.remove(clientChannelInfo.getClientId());
-                        if (old != null) {
-                            log.info("unregister a producer[{}] from groupChannelTable {}",
group,
-                                clientChannelInfo.toString());
-                        }
-
-                        if (channelTable.isEmpty()) {
-                            this.groupChannelTable.remove(group);
-                            log.info("unregister a producer group[{}] from groupChannelTable",
group);
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-            } else {
-                log.warn("ProducerManager unregisterProducer lock timeout");
+    public synchronized void unregisterProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+        if (null != channelTable && !channelTable.isEmpty()) {
+            ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+            clientChannelTable.remove(clientChannelInfo.getClientId());
+            if (old != null) {
+                log.info("unregister a producer[{}] from groupChannelTable {}", group,
+                        clientChannelInfo.toString());
+            }
+
+            if (channelTable.isEmpty()) {
+                this.groupChannelTable.remove(group);
+                log.info("unregister a producer group[{}] from groupChannelTable", group);
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
     public Channel getAvaliableChannel(String groupId) {
-        HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+        if (groupId == null) {
+            return null;
+        }
         List<Channel> channelList = new ArrayList<Channel>();
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap
= groupChannelTable.get(groupId);
         if (channelClientChannelInfoHashMap != null) {
             for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
                 channelList.add(channel);
             }
-            int size = channelList.size();
-            if (0 == size) {
-                log.warn("Channel list is empty. groupId={}", groupId);
-                return null;
-            }
-
-            int index = positiveAtomicCounter.incrementAndGet() % size;
-            Channel channel = channelList.get(index);
-            int count = 0;
-            boolean isOk = channel.isActive() && channel.isWritable();
-            while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
-                if (isOk) {
-                    return channel;
-                }
-                index = (++index) % size;
-                channel = channelList.get(index);
-                isOk = channel.isActive() && channel.isWritable();
-            }
         } else {
             log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
             return null;
         }
-        return null;
+
+        int size = channelList.size();
+        if (0 == size) {
+            log.warn("Channel list is empty. groupId={}", groupId);
+            return null;
+        }
+
+        Channel lastActiveChannel = null;
+
+        int index = positiveAtomicCounter.incrementAndGet() % size;
+        Channel channel = channelList.get(index);
+        int count = 0;
+        boolean isOk = channel.isActive() && channel.isWritable();
+        while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+            if (isOk) {
+                return channel;
+            }
+            if (channel.isActive()) {
+                lastActiveChannel = channel;
+            }
+            index = (++index) % size;
+            channel = channelList.get(index);
+            isOk = channel.isActive() && channel.isWritable();
+        }
+
+        return lastActiveChannel;
     }
 
     public Channel findChannel(String clientId) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index df0ec90..b2edc1a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -815,7 +815,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
 
         ProducerConnection bodydata = new ProducerConnection();
-        HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
+        Map<Channel, ClientChannelInfo> channelInfoHashMap =
             this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
         if (channelInfoHashMap != null) {
             Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index d9539b6..4791ab1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -19,7 +19,8 @@ package org.apache.rocketmq.broker.client;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import java.lang.reflect.Field;
-import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,7 +75,7 @@ public class ProducerManagerTest {
     @Test
     public void testRegisterProducer() throws Exception {
         producerManager.registerProducer(group, clientInfo);
-        HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+        Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
         Channel channel1 = producerManager.findChannel("clientId");
         assertThat(channelMap).isNotNull();
         assertThat(channel1).isNotNull();
@@ -85,7 +86,7 @@ public class ProducerManagerTest {
     @Test
     public void unregisterProducer() throws Exception {
         producerManager.registerProducer(group, clientInfo);
-        HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+        Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
         assertThat(channelMap).isNotNull();
         assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
         Channel channel1 = producerManager.findChannel("clientId");
@@ -102,9 +103,28 @@ public class ProducerManagerTest {
     @Test
     public void testGetGroupChannelTable() throws Exception {
         producerManager.registerProducer(group, clientInfo);
-        HashMap<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
+        Map<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
         
         producerManager.unregisterProducer(group, clientInfo);
-        assertThat(oldMap.size()).isNotEqualTo(0);
+        assertThat(oldMap.size()).isEqualTo(0);
     }
+
+    @Test
+    public void testGetAvaliableChannel() {
+        producerManager.registerProducer(group, clientInfo);
+
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+        Channel c = producerManager.getAvaliableChannel(group);
+        assertThat(c).isSameAs(channel);
+
+        when(channel.isWritable()).thenReturn(false);
+        c = producerManager.getAvaliableChannel(group);
+        assertThat(c).isSameAs(channel);
+
+        when(channel.isActive()).thenReturn(false);
+        c = producerManager.getAvaliableChannel(group);
+        assertThat(c).isNull();
+    }
+
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 147c732..3d893ac 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -81,7 +82,7 @@ public class ClientManageProcessorTest {
     @Test
     public void processRequest_UnRegisterProducer() throws Exception {
         brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
-        HashMap<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
+        Map<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
         assertThat(channelMap).isNotNull();
         assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);
 


Mime
View raw message