This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 958eb74 [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel;
Fix problem that tx check may loss when channel is busy (#1627)
958eb74 is described below
commit 958eb7498112429aecb19785feff117080c01465
Author: huangli <areyouok@gmail.com>
AuthorDate: Fri Jan 10 20:50:59 2020 +0800
[for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableChannel; Fix problem
that tx check may loss when channel is busy (#1627)
---
.../rocketmq/broker/client/ProducerManager.java | 262 ++++++++-------------
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../broker/client/ProducerManagerTest.java | 30 ++-
.../processor/ClientManageProcessorTest.java | 3 +-
4 files changed, 127 insertions(+), 170 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 12f632b..860b349 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -18,15 +18,11 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -36,205 +32,145 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
- private final Lock groupChannelLock = new ReentrantLock();
- private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>>
groupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel,
ClientChannelInfo>> groupChannelTable =
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}
- public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable()
{
- HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>>
newGroupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
- try {
- Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>>
iter = groupChannelTable.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry = iter.next();
- String key = entry.getKey();
- HashMap<Channel, ClientChannelInfo> val = entry.getValue();
- HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel,
ClientChannelInfo>();
- tmp.putAll(val);
- newGroupChannelTable.put(key, tmp);
- }
- } finally {
- groupChannelLock.unlock();
- }
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- return newGroupChannelTable;
+ public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
getGroupChannelTable() {
+ return groupChannelTable;
}
public void scanNotActiveChannel() {
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
- try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
- .entrySet()) {
- final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
-
- Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> item = it.next();
- // final Integer id = item.getKey();
- final ClientChannelInfo info = item.getValue();
-
- long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
- if (diff > CHANNEL_EXPIRED_TIMEOUT) {
- it.remove();
- clientChannelTable.remove(info.getClientId());
- log.warn(
- "SCAN: remove expired channel[{}] from ProducerManager
groupChannelTable, producer group name: {}",
- RemotingHelper.parseChannelRemoteAddr(info.getChannel()),
group);
- RemotingUtil.closeChannel(info.getChannel());
- }
- }
- }
- } finally {
- this.groupChannelLock.unlock();
+ for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> item = it.next();
+ // final Integer id = item.getKey();
+ final ClientChannelInfo info = item.getValue();
+
+ long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ it.remove();
+ clientChannelTable.remove(info.getClientId());
+ log.warn(
+ "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable,
producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ RemotingUtil.closeChannel(info.getChannel());
}
- } else {
- log.warn("ProducerManager scanNotActiveChannel lock timeout");
}
- } catch (InterruptedException e) {
- log.error("", e);
}
}
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel)
{
if (channel != null) {
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
- try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
- .entrySet()) {
- final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable
=
- entry.getValue();
- final ClientChannelInfo clientChannelInfo =
- clientChannelInfoTable.remove(channel);
- if (clientChannelInfo != null) {
- clientChannelTable.remove(clientChannelInfo.getClientId());
- log.info(
- "NETTY EVENT: remove channel[{}][{}] from ProducerManager
groupChannelTable, producer group: {}",
- clientChannelInfo.toString(), remoteAddr, group);
- }
-
- }
- } finally {
- this.groupChannelLock.unlock();
- }
- } else {
- log.warn("ProducerManager doChannelCloseEvent lock timeout");
+ for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable
=
+ entry.getValue();
+ final ClientChannelInfo clientChannelInfo =
+ clientChannelInfoTable.remove(channel);
+ if (clientChannelInfo != null) {
+ clientChannelTable.remove(clientChannelInfo.getClientId());
+ log.info(
+ "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable,
producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
}
- } catch (InterruptedException e) {
- log.error("", e);
+
}
}
}
- public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo)
{
- try {
- ClientChannelInfo clientChannelInfoFound = null;
-
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
- try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
- if (null == channelTable) {
- channelTable = new HashMap<>();
- this.groupChannelTable.put(group, channelTable);
- }
-
- clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
- if (null == clientChannelInfoFound) {
- channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
- clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
- log.info("new producer connected, group: {} channel: {}", group,
- clientChannelInfo.toString());
- }
- } finally {
- this.groupChannelLock.unlock();
- }
+ public synchronized void registerProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+ ClientChannelInfo clientChannelInfoFound = null;
- if (clientChannelInfoFound != null) {
- clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
- }
- } else {
- log.warn("ProducerManager registerProducer lock timeout");
- }
- } catch (InterruptedException e) {
- log.error("", e);
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null == channelTable) {
+ channelTable = new ConcurrentHashMap<>();
+ this.groupChannelTable.put(group, channelTable);
+ }
+
+ clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+ if (null == clientChannelInfoFound) {
+ channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
+ log.info("new producer connected, group: {} channel: {}", group,
+ clientChannelInfo.toString());
+ }
+
+
+ if (clientChannelInfoFound != null) {
+ clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
- public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo)
{
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
{
- try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
- if (null != channelTable && !channelTable.isEmpty()) {
- ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
- clientChannelTable.remove(clientChannelInfo.getClientId());
- if (old != null) {
- log.info("unregister a producer[{}] from groupChannelTable {}",
group,
- clientChannelInfo.toString());
- }
-
- if (channelTable.isEmpty()) {
- this.groupChannelTable.remove(group);
- log.info("unregister a producer group[{}] from groupChannelTable",
group);
- }
- }
- } finally {
- this.groupChannelLock.unlock();
- }
- } else {
- log.warn("ProducerManager unregisterProducer lock timeout");
+ public synchronized void unregisterProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null != channelTable && !channelTable.isEmpty()) {
+ ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ clientChannelTable.remove(clientChannelInfo.getClientId());
+ if (old != null) {
+ log.info("unregister a producer[{}] from groupChannelTable {}", group,
+ clientChannelInfo.toString());
+ }
+
+ if (channelTable.isEmpty()) {
+ this.groupChannelTable.remove(group);
+ log.info("unregister a producer group[{}] from groupChannelTable", group);
}
- } catch (InterruptedException e) {
- log.error("", e);
}
}
public Channel getAvaliableChannel(String groupId) {
- HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+ if (groupId == null) {
+ return null;
+ }
List<Channel> channelList = new ArrayList<Channel>();
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap
= groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
- int size = channelList.size();
- if (0 == size) {
- log.warn("Channel list is empty. groupId={}", groupId);
- return null;
- }
-
- int index = positiveAtomicCounter.incrementAndGet() % size;
- Channel channel = channelList.get(index);
- int count = 0;
- boolean isOk = channel.isActive() && channel.isWritable();
- while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
- if (isOk) {
- return channel;
- }
- index = (++index) % size;
- channel = channelList.get(index);
- isOk = channel.isActive() && channel.isWritable();
- }
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
- return null;
+
+ int size = channelList.size();
+ if (0 == size) {
+ log.warn("Channel list is empty. groupId={}", groupId);
+ return null;
+ }
+
+ Channel lastActiveChannel = null;
+
+ int index = positiveAtomicCounter.incrementAndGet() % size;
+ Channel channel = channelList.get(index);
+ int count = 0;
+ boolean isOk = channel.isActive() && channel.isWritable();
+ while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+ if (isOk) {
+ return channel;
+ }
+ if (channel.isActive()) {
+ lastActiveChannel = channel;
+ }
+ index = (++index) % size;
+ channel = channelList.get(index);
+ isOk = channel.isActive() && channel.isWritable();
+ }
+
+ return lastActiveChannel;
}
public Channel findChannel(String clientId) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index df0ec90..b2edc1a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -815,7 +815,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
ProducerConnection bodydata = new ProducerConnection();
- HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
+ Map<Channel, ClientChannelInfo> channelInfoHashMap =
this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
if (channelInfoHashMap != null) {
Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index d9539b6..4791ab1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -19,7 +19,8 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
-import java.util.HashMap;
+import java.util.Map;
+
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
@@ -74,7 +75,7 @@ public class ProducerManagerTest {
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+ Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
assertThat(channel1).isNotNull();
@@ -85,7 +86,7 @@ public class ProducerManagerTest {
@Test
public void unregisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+ Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
Channel channel1 = producerManager.findChannel("clientId");
@@ -102,9 +103,28 @@ public class ProducerManagerTest {
@Test
public void testGetGroupChannelTable() throws Exception {
producerManager.registerProducer(group, clientInfo);
- HashMap<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
+ Map<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
producerManager.unregisterProducer(group, clientInfo);
- assertThat(oldMap.size()).isNotEqualTo(0);
+ assertThat(oldMap.size()).isEqualTo(0);
}
+
+ @Test
+ public void testGetAvaliableChannel() {
+ producerManager.registerProducer(group, clientInfo);
+
+ when(channel.isActive()).thenReturn(true);
+ when(channel.isWritable()).thenReturn(true);
+ Channel c = producerManager.getAvaliableChannel(group);
+ assertThat(c).isSameAs(channel);
+
+ when(channel.isWritable()).thenReturn(false);
+ c = producerManager.getAvaliableChannel(group);
+ assertThat(c).isSameAs(channel);
+
+ when(channel.isActive()).thenReturn(false);
+ c = producerManager.getAvaliableChannel(group);
+ assertThat(c).isNull();
+ }
+
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 147c732..3d893ac 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -81,7 +82,7 @@ public class ClientManageProcessorTest {
@Test
public void processRequest_UnRegisterProducer() throws Exception {
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
+ Map<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);
|