rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [45/51] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61
Date Tue, 06 Jun 2017 03:39:05 GMT
[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/031347db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/031347db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/031347db

Branch: refs/heads/master
Commit: 031347db7314b511ea7356ac892001ac1349489e
Parents: 16c8d43
Author: Jaskey <linjunjie1103@gmail.com>
Authored: Sat May 27 11:21:09 2017 +0800
Committer: dongeforever <zhendongliu92@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../apache/rocketmq/common/BrokerConfig.java    |  3 ++
 .../org/apache/rocketmq/store/CommitLog.java    | 40 ++++--------------
 .../apache/rocketmq/store/PutMessageLock.java   | 25 ++++++++++++
 .../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++
 .../rocketmq/store/PutMessageSpinLock.java      | 43 ++++++++++++++++++++
 .../store/config/MessageStoreConfig.java        |  4 ++
 6 files changed, 119 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f0a73bd..5bce013 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -47,6 +47,9 @@ public class BrokerConfig {
     private boolean autoCreateSubscriptionGroup = true;
     private String messageStorePlugIn = "";
 
+    /**
+     * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
+     */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int adminBrokerThreadPoolNums = 16;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7841feb..7b29263 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,12 +61,7 @@ public class CommitLog {
     private volatile long confirmOffset = -1L;
 
     private volatile long beginTimeInLock = 0;
-
-    //true: Can lock, false : in lock.
-    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
-    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
+    private final PutMessageLock putMessageLock;
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
         this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
             defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -88,6 +81,8 @@ public class CommitLog {
                 return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
+        this.putMessageLock =  defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
     }
 
     public boolean load() {
@@ -577,7 +572,7 @@ public class CommitLog {
         MappedFile unlockMappedFile = null;
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
-        lockForPutMessage(); //spin...
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         try {
             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
@@ -626,7 +621,7 @@ public class CommitLog {
             eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
             beginTimeInLock = 0;
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
 
         if (eclipseTimeInLock > 500) {
@@ -861,7 +856,7 @@ public class CommitLog {
     }
 
     public boolean appendData(long startOffset, byte[] data) {
-        lockForPutMessage(); //spin...
+        putMessageLock.lock();
         try {
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
             if (null == mappedFile) {
@@ -871,7 +866,7 @@ public class CommitLog {
 
             return mappedFile.appendMessage(data);
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
     }
 
@@ -906,28 +901,7 @@ public class CommitLog {
         return diff;
     }
 
-    /**
-     * Spin util acquired the lock.
-     */
-    private void lockForPutMessage() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.lock();
-        } else {
-            boolean flag;
-            do {
-                flag = this.putMessageSpinLock.compareAndSet(true, false);
-            }
-            while (!flag);
-        }
-    }
 
-    private void releasePutMessageLock() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.unlock();
-        } else {
-            this.putMessageSpinLock.compareAndSet(false, true);
-        }
-    }
 
     public static class GroupCommitRequest {
         private final long nextOffset;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
new file mode 100644
index 0000000..a03e41a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Used when trying to put message
+ */
+public interface PutMessageLock {
+    void lock();
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
new file mode 100644
index 0000000..9198f1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Exclusive lock implementation to put message
+ */
+public class PutMessageReentrantLock implements PutMessageLock {
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+    @Override
+    public void lock() {
+        putMessageNormalLock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        putMessageNormalLock.unlock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
new file mode 100644
index 0000000..baa809d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Spin lock Implementation to put message, suggest using this with low race conditions
+ *
+ */
+public class PutMessageSpinLock implements PutMessageLock {
+    //true: Can lock, false : in lock.
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    @Override
+    public void lock() {
+        boolean flag;
+        do {
+            flag = this.putMessageSpinLock.compareAndSet(true, false);
+        }
+        while (!flag);
+    }
+
+    @Override
+    public void unlock() {
+        this.putMessageSpinLock.compareAndSet(false, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 29f800c..19ed211 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -52,6 +52,10 @@ public class MessageStoreConfig {
     @ImportantField
     private int commitIntervalCommitLog = 200;
 
+    /**
+     * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
+     * By default it is set to false indicating using spin lock when putting message.
+     */
     private boolean useReentrantLockWhenPutMessage = false;
 
     // Whether schedule flush,default is real-time


Mime
View raw message