storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [19/50] [abbrv] git commit: Rankings: make updateWith() thread-safe
Date Thu, 20 Mar 2014 21:22:42 GMT
Rankings: make updateWith() thread-safe

This fixes a ConcurrentModificationException that could be triggered when
using log4j to log Rankings#toString() from within a bolt's execute()
method.  Apparently a separate thread is used by log4j to perform the
logging, whose execution path could interweave with the bolt's own
thread.

Even though this is not necessarily a deficiency of the Rankings class
(which does not claim to be thread-safe), we still decided to make
Rankings#updateWith() thread-safe so that users do not run into errors
when using log4j & Co. for logging purposes.  In other words, we decided
to add this fix for the convenience of our users.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4c714d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4c714d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4c714d03

Branch: refs/heads/master
Commit: 4c714d03035fd6075392bd0df7577c68d993a366
Parents: 8e1e3cc
Author: Michael G. Noll <mnoll@verisign.com>
Authored: Thu Aug 29 12:36:48 2013 +0200
Committer: Michael G. Noll <mnoll@verisign.com>
Committed: Thu Aug 29 12:36:48 2013 +0200

----------------------------------------------------------------------
 m2-pom.xml                                     |  6 +++
 src/jvm/storm/starter/tools/Rankings.java      |  8 ++--
 test/jvm/storm/starter/tools/RankingsTest.java | 42 +++++++++++++++++++++
 3 files changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4c714d03/m2-pom.xml
----------------------------------------------------------------------
diff --git a/m2-pom.xml b/m2-pom.xml
index ba5c760..24293bc 100644
--- a/m2-pom.xml
+++ b/m2-pom.xml
@@ -54,6 +54,12 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.jmock</groupId>
+			<artifactId>jmock</artifactId>
+			<version>2.6.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
 			<groupId>storm</groupId>
 			<artifactId>storm</artifactId>
 			<version>0.8.2</version>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4c714d03/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankings.java b/src/jvm/storm/starter/tools/Rankings.java
index 4eee168..d6fd34c 100644
--- a/src/jvm/storm/starter/tools/Rankings.java
+++ b/src/jvm/storm/starter/tools/Rankings.java
@@ -50,9 +50,11 @@ public class Rankings implements Serializable {
     }
 
     public void updateWith(Rankable r) {
-        addOrReplace(r);
-        rerank();
-        shrinkRankingsIfNeeded();
+        synchronized(rankedItems) {
+            addOrReplace(r);
+            rerank();
+            shrinkRankingsIfNeeded();
+        }
     }
 
     private void addOrReplace(Rankable r) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4c714d03/test/jvm/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/tools/RankingsTest.java b/test/jvm/storm/starter/tools/RankingsTest.java
index 682b504..b95a86f 100644
--- a/test/jvm/storm/starter/tools/RankingsTest.java
+++ b/test/jvm/storm/starter/tools/RankingsTest.java
@@ -4,6 +4,9 @@ import static org.fest.assertions.api.Assertions.assertThat;
 
 import java.util.List;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.jmock.lib.concurrent.Blitzer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -203,4 +206,43 @@ public class RankingsTest {
         // then
         assertThat(rankings.size()).isEqualTo(1);
     }
+
+    @Test
+    public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException
{
+        // given
+        final List<Rankable> entries = ImmutableList.of(A, B, C, D);
+        final Rankings rankings = new Rankings(entries.size());
+
+        // We are capturing exceptions thrown in Blitzer's child threads into this data structure
so that we can properly
+        // pass/fail this test.  The reason is that Blitzer doesn't report exceptions, which
is a known bug in Blitzer
+        // (JMOCK-263).  See https://github.com/jmock-developers/jmock-library/issues/22
for more information.
+        final List<Exception> exceptions = Lists.newArrayList();
+        Blitzer blitzer = new Blitzer(1000);
+
+        // when
+        blitzer.blitz(new Runnable() {
+            public void run() {
+                for (Rankable r : entries) {
+                    try {
+                        rankings.updateWith(r);
+                    }
+                    catch (RuntimeException e) {
+                        synchronized(exceptions) {
+                            exceptions.add(e);
+                        }
+                    }
+                }
+            }
+        });
+        blitzer.shutdown();
+
+        // then
+        //
+        if (!exceptions.isEmpty()) {
+            for (Exception e : exceptions) {
+                System.err.println(Throwables.getStackTraceAsString(e));
+            }
+        }
+        assertThat(exceptions).isEmpty();
+    }
 }


Mime
View raw message