From commits-return-1437-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.incubator.apache.org Tue Jun 6 03:38:31 2017 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E3602191A3 for ; Tue, 6 Jun 2017 03:38:30 +0000 (UTC) Received: (qmail 8763 invoked by uid 500); 6 Jun 2017 03:38:30 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 8734 invoked by uid 500); 6 Jun 2017 03:38:30 -0000 Mailing-List: contact commits-help@rocketmq.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.incubator.apache.org Delivered-To: mailing list commits@rocketmq.incubator.apache.org Received: (qmail 8725 invoked by uid 99); 6 Jun 2017 03:38:30 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jun 2017 03:38:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 616D91A03FD for ; Tue, 6 Jun 2017 03:38:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.721 X-Spam-Level: X-Spam-Status: No, score=-3.721 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_NUMSUBJECT=0.5, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id k-ZQxgZjULqc for ; Tue, 6 Jun 2017 03:38:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id B206060F06 for ; Tue, 6 Jun 2017 03:38:24 +0000 (UTC) Received: (qmail 6243 invoked by uid 99); 6 Jun 2017 03:38:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jun 2017 03:38:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E470EE04AA; Tue, 6 Jun 2017 03:38:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dongeforever@apache.org To: commits@rocketmq.incubator.apache.org Date: Tue, 06 Jun 2017 03:39:05 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [45/51] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61 [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/031347db Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/031347db Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/031347db Branch: refs/heads/master Commit: 031347db7314b511ea7356ac892001ac1349489e Parents: 16c8d43 Author: Jaskey Authored: Sat May 27 11:21:09 2017 +0800 Committer: dongeforever Committed: Tue Jun 6 11:37:29 2017 +0800 ---------------------------------------------------------------------- .../apache/rocketmq/common/BrokerConfig.java | 3 ++ .../org/apache/rocketmq/store/CommitLog.java | 40 ++++-------------- .../apache/rocketmq/store/PutMessageLock.java | 25 ++++++++++++ .../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++ .../rocketmq/store/PutMessageSpinLock.java | 43 ++++++++++++++++++++ .../store/config/MessageStoreConfig.java | 4 ++ 6 files changed, 119 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f0a73bd..5bce013 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -47,6 +47,9 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; + /** + * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1. + */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int adminBrokerThreadPoolNums = 16; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7841feb..7b29263 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -63,12 +61,7 @@ public class CommitLog { private volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - - //true: Can lock, false : in lock. - private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); - - private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync - + private final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); @@ -88,6 +81,8 @@ public class CommitLog { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; + this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + } public boolean load() { @@ -577,7 +572,7 @@ public class CommitLog { MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - lockForPutMessage(); //spin... + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -626,7 +621,7 @@ public class CommitLog { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { @@ -861,7 +856,7 @@ public class CommitLog { } public boolean appendData(long startOffset, byte[] data) { - lockForPutMessage(); //spin... + putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); if (null == mappedFile) { @@ -871,7 +866,7 @@ public class CommitLog { return mappedFile.appendMessage(data); } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } } @@ -906,28 +901,7 @@ public class CommitLog { return diff; } - /** - * Spin util acquired the lock. - */ - private void lockForPutMessage() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.lock(); - } else { - boolean flag; - do { - flag = this.putMessageSpinLock.compareAndSet(true, false); - } - while (!flag); - } - } - private void releasePutMessageLock() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.unlock(); - } else { - this.putMessageSpinLock.compareAndSet(false, true); - } - } public static class GroupCommitRequest { private final long nextOffset; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java new file mode 100644 index 0000000..a03e41a --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * Used when trying to put message + */ +public interface PutMessageLock { + void lock(); + void unlock(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java new file mode 100644 index 0000000..9198f1c --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Exclusive lock implementation to put message + */ +public class PutMessageReentrantLock implements PutMessageLock { + private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync + + @Override + public void lock() { + putMessageNormalLock.lock(); + } + + @Override + public void unlock() { + putMessageNormalLock.unlock(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java new file mode 100644 index 0000000..baa809d --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Spin lock Implementation to put message, suggest using this witb low race conditions + * + */ +public class PutMessageSpinLock implements PutMessageLock { + //true: Can lock, false : in lock. + private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); + + @Override + public void lock() { + boolean flag; + do { + flag = this.putMessageSpinLock.compareAndSet(true, false); + } + while (!flag); + } + + @Override + public void unlock() { + this.putMessageSpinLock.compareAndSet(false, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 29f800c..19ed211 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -52,6 +52,10 @@ public class MessageStoreConfig { @ImportantField private int commitIntervalCommitLog = 200; + /** + * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.
+ * By default it is set to false indicating using spin lock when putting message. + */ private boolean useReentrantLockWhenPutMessage = false; // Whether schedule flush,default is real-time