storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [2/7] storm git commit: STORM-1289: Port integration-test.clj to Java
Date Sat, 05 Jan 2019 15:02:52 GMT
STORM-1289: Port integration-test.clj to Java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6a1e730
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6a1e730
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6a1e730

Branch: refs/heads/master
Commit: d6a1e730ea200b36c42475f9ce45d11ee940c949
Parents: a9c2f3a
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Fri Dec 7 22:07:36 2018 +0100
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Wed Dec 19 16:48:03 2018 +0100

----------------------------------------------------------------------
 DEVELOPER.md                                    |    2 +-
 examples/storm-starter/pom.xml                  |    8 +
 external/storm-elasticsearch/pom.xml            |   11 +
 .../bolt/AbstractEsBoltIntegrationTest.java     |   19 +-
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |   16 +-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |    5 +-
 .../bolt/EsLookupBoltIntegrationTest.java       |   15 +-
 .../elasticsearch/bolt/EsLookupBoltTest.java    |   23 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |    9 +-
 .../elasticsearch/common/EsConfigTest.java      |    7 +-
 .../trident/EsStateFactoryTest.java             |    2 +-
 .../elasticsearch/trident/EsStateTest.java      |   24 +-
 external/storm-mqtt/pom.xml                     |    7 +
 .../storm/mqtt/StormMqttIntegrationTest.java    |   13 +-
 integration-test/pom.xml                        |    2 +
 pom.xml                                         |   40 +-
 .../apache/storm/testing/IntegrationTest.java   |   32 -
 .../apache/storm/testing/PerformanceTest.java   |   37 -
 .../apache/storm/testing/IntegrationTest.java   |   43 +
 .../apache/storm/testing/PerformanceTest.java   |   48 +
 .../apache/storm/utils/PredicateMatcher.java    |   49 +
 .../utils/ThrowableNestedCauseMatcher.java      |   10 +-
 storm-core/pom.xml                              |   13 +-
 .../org/apache/storm/integration_test.clj       |  651 -----------
 .../org/apache/storm/testing4j_test.clj         |   83 +-
 .../apache/storm/TopologyIntegrationTest.java   | 1011 ++++++++++++++++++
 storm-server/pom.xml                            |    7 +
 .../test/java/org/apache/storm/TestingTest.java |   16 +-
 .../resource/TestResourceAwareScheduler.java    |   32 +-
 29 files changed, 1380 insertions(+), 855 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/DEVELOPER.md
----------------------------------------------------------------------
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 20ac51f..2145c92 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -123,7 +123,7 @@ GitHub.
 
 Unit tests and Integration tests are an essential part of code contributions.
 
-To mark a Java test as a Java integration test, add the annotation `@Category(IntegrationTest.class)` to the test class definition as well as to its hierarchy of superclasses. Java integration tests can be in the same package as Java unit tests.
+To mark a Java test as a Java integration test, add the annotation `@IntegrationTest` to the test class definition or test method. Make sure the test is a JUnit 5 test. Java integration tests can be in the same package as Java unit tests.
  
 ```java
     @Category(IntegrationTest.class)

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index c2ca52f..2841e93 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -217,6 +217,14 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <configuration>
+              <!-- TestNG provider does not understand the JUnit 5 groups syntax used in the rest of the project, make sure to override it -->
+              <excludedGroups>none</excludedGroups>
+          </configuration>
+      </plugin>
         <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index 3df78cc..bbc9bf0 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -90,6 +90,10 @@
             <artifactId>mockito-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>java-hamcrest</artifactId>
         </dependency>
@@ -100,6 +104,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>18.0</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index 559dedf..afb75db 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -21,34 +21,33 @@ import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.testing.IntegrationTest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.node.Node;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 
-@Category(IntegrationTest.class)
+@IntegrationTest
 public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
 
     protected static Node node;
 
-    @BeforeClass
+    @BeforeAll
     public static void startElasticSearchNode() throws Exception {
         node = EsTestUtil.startEsNode();
         EsTestUtil.ensureEsGreen(node);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeElasticSearchNode() throws Exception {
         EsTestUtil.stopEsNode(node);
     }
 
-    @Before
+    @BeforeEach
     public void createIndex() {
         node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
     }
 
-    @After
+    @AfterEach
     public void clearIndex() throws Exception {
         EsTestUtil.clearIndex(node, index);
         EsTestUtil.clearIndex(node, "missing");

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index d6ef979..5944831 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -25,14 +25,14 @@ import java.util.UUID;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-@RunWith(MockitoJUnitRunner.class)
+@ExtendWith(MockitoExtension.class)
 public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
 
     protected static Config config = new Config();
@@ -46,7 +46,7 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
 
     protected Bolt bolt;
 
-    @Before
+    @BeforeEach
     public void createBolt() throws Exception {
         bolt = createBolt(esConfig());
         bolt.prepare(config, null, outputCollector);
@@ -58,7 +58,7 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
         return new EsConfig();
     }
 
-    @After
+    @AfterEach
     public void cleanupBolt() throws Exception {
         bolt.cleanup();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index ebed80e..42ff2ce 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -21,15 +21,12 @@ import static org.mockito.Mockito.verify;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.testing.IntegrationTest;
 import org.apache.storm.tuple.Tuple;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Test;
 
-@Category(IntegrationTest.class)
 public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index c266069..b5311a9 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -25,20 +25,17 @@ import static org.mockito.Mockito.verify;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.testing.IntegrationTest;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.elasticsearch.client.ResponseException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-@Category(IntegrationTest.class)
-@RunWith(MockitoJUnitRunner.class)
+@ExtendWith(MockitoExtension.class)
 public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> {
 
     @Captor
@@ -54,7 +51,7 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E
         return new EsLookupBolt(esConfig);
     }
 
-    @Before
+    @BeforeEach
     public void populateIndexWithTestData() throws Exception {
         node.client().prepareIndex(index, type, documentId).setSource(source).execute().actionGet();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
index 786bab1..7613a2d 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -19,13 +19,11 @@ package org.apache.storm.elasticsearch.bolt;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -37,20 +35,21 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
 
-@RunWith(MockitoJUnitRunner.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.WARN)
 public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
 
     static final Map<String, String> params = new HashMap<>();
@@ -82,12 +81,12 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
         return new EsLookupBolt(esConfig, tupleMapper, output);
     }
 
-    @After
+    @AfterEach
     public void replaceClientWithOriginal() throws Exception {
         EsLookupBolt.replaceClient(originalClient);
     }
 
-    @Before
+    @BeforeEach
     public void configureBoltDependencies() throws Exception {
         when(tupleMapper.getIndex(tuple)).thenReturn(index);
         when(tupleMapper.getType(tuple)).thenReturn(type);

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 135089e..c8f135f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -27,16 +27,13 @@ import static org.mockito.Mockito.verify;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.response.PercolateResponse;
-import org.apache.storm.testing.IntegrationTest;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.elasticsearch.client.ResponseException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
-@Category(IntegrationTest.class)
 public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
 
     private final String source = "{\"user\":\"user1\"}";
@@ -46,7 +43,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola
         return new EsPercolateBolt(esConfig);
     }
 
-    @Before
+    @BeforeEach
     public void populateIndexWithTestData() throws Exception {
         node.client().prepareIndex(index, ".percolator", documentId)
             .setSource("{\"query\":{\"match\":" + source + "}}")

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
index 96b4ffd..23f00ca 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
@@ -20,15 +20,16 @@ package org.apache.storm.elasticsearch.common;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.http.HttpHost;
-import org.junit.Test;
 
 import com.google.common.testing.NullPointerTester;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 public class EsConfigTest {
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void urlsCannotBeEmpty() throws Exception {
-        new EsConfig(new String[] {});
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {}));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
index cf296a8..a1b4fe8 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
@@ -20,7 +20,7 @@ package org.apache.storm.elasticsearch.trident;
 import com.google.common.testing.NullPointerTester;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class EsStateFactoryTest {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
index e5474c6..bcabe9e 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java
@@ -25,19 +25,17 @@ import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.elasticsearch.node.Node;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-@Category(IntegrationTest.class)
-@RunWith(MockitoJUnitRunner.class)
+@IntegrationTest
+@ExtendWith(MockitoExtension.class)
 public class EsStateTest {
     
     private static Node node;
@@ -49,18 +47,18 @@ public class EsStateTest {
 
     private EsState state = createEsState();
     
-    @BeforeClass
+    @BeforeAll
     public static void startElasticSearchNode() throws Exception {
         node = EsTestUtil.startEsNode();
         EsTestUtil.ensureEsGreen(node);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeElasticSearchNode() throws Exception {
         EsTestUtil.stopEsNode(node);
     }
     
-    @After
+    @AfterEach
     public void clearIndex() throws Exception {
         EsTestUtil.clearIndex(node, index);
         EsTestUtil.clearIndex(node, "missing");

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml
index 282a07c..3a9e5a9 100644
--- a/external/storm-mqtt/pom.xml
+++ b/external/storm-mqtt/pom.xml
@@ -80,6 +80,13 @@
             <artifactId>commons-lang</artifactId>
             <version>2.5</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
index ba214eb..f1e0a23 100644
--- a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -33,15 +33,14 @@ import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Category(IntegrationTest.class)
+@IntegrationTest
 public class StormMqttIntegrationTest implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
     private static final String TEST_TOPIC = "/mqtt-topology";
@@ -50,12 +49,12 @@ public class StormMqttIntegrationTest implements Serializable {
     static boolean spoutActivated = false;
     private static BrokerService broker;
 
-    @AfterClass
+    @AfterAll
     public static void cleanup() throws Exception {
         broker.stop();
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void start() throws Exception {
         LOG.warn("Starting broker...");
         broker = new BrokerService();

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 3753498..6d6de9d 100755
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -107,6 +107,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
+                    <!-- TestNG provider does not understand the JUnit 5 groups syntax used in the rest of the project, make sure to override it -->
+                    <excludedGroups>none</excludedGroups>
                     <skipTests>${skipTests}</skipTests>
                     <redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
                     <argLine>-Xmx1024m</argLine>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b44bb20..6f9e053 100644
--- a/pom.xml
+++ b/pom.xml
@@ -285,8 +285,8 @@
         <servlet.version>3.1.0</servlet.version>
         <joda-time.version>2.3</joda-time.version>
         <thrift.version>0.11.0</thrift.version>
-        <junit.jupiter.version>5.2.0</junit.jupiter.version>
-        <surefire.version>2.22.0</surefire.version>
+        <junit.jupiter.version>5.3.2</junit.jupiter.version>
+        <surefire.version>2.22.1</surefire.version>
         <awaitility.version>3.1.0</awaitility.version>
         <hdrhistogram.version>2.1.10</hdrhistogram.version>
         <hamcrest.version>2.0.0.0</hamcrest.version>
@@ -306,11 +306,10 @@
         <storm.kafka.client.version>0.11.0.0</storm.kafka.client.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
-        <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>
-        <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
-        <java.integration.test.include>no.tests</java.integration.test.include>
+        <java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups>
         <!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
         <clojure.test.set>!integration.*</clojure.test.set>
+        <skipITs>true</skipITs>
 
         <aetherVersion>1.1.0</aetherVersion>
         <mavenVersion>3.1.0</mavenVersion>
@@ -570,29 +569,41 @@
         <profile>
             <id>all-tests</id>
             <properties>
-                <java.integration.test.include>**/*.java</java.integration.test.include>
                 <!-- add perf tests back in -->
-                <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+                <java.unit.test.exclude.groups>nothing</java.unit.test.exclude.groups>
                 <clojure.test.set>*.*</clojure.test.set>
+                <skipITs>false</skipITs>
             </properties>
         </profile>
         <profile>
             <id>performance-tests</id>
             <properties>
                 <!-- add perf tests back in -->
-                <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+                <java.unit.test.exclude.groups>nothing</java.unit.test.exclude.groups>
             </properties>
         </profile> 
         <profile>
             <id>integration-tests-only</id>
             <properties>
                 <!--Java-->
-                <java.unit.test.include>no.unit.tests</java.unit.test.include>
-                <java.integration.test.include>**/*.java</java.integration.test.include>
+                <skipITs>false</skipITs>
                 <!--Clojure-->
                 <clojure.test.set>integration.*</clojure.test.set>
                 <clojure.test.declared.namespace.only>true</clojure.test.declared.namespace.only>
             </properties>
+            <build>
+                <pluginManagement>
+                    <plugins>
+                        <plugin>
+                            <groupId>org.apache.maven.plugins</groupId>
+                            <artifactId>maven-surefire-plugin</artifactId>
+                            <configuration>
+                                <skipTests>true</skipTests>
+                            </configuration>
+                        </plugin>
+                    </plugins>
+                </pluginManagement>
+            </build>
         </profile>
         <profile>
             <id>externals</id>
@@ -1098,10 +1109,7 @@
                     <version>${surefire.version}</version>
                     <configuration>
                         <redirectTestOutputToFile>true</redirectTestOutputToFile>
-                        <excludedGroups>${java.unit.test.exclude}</excludedGroups>
-                        <includes>
-                            <include>${java.unit.test.include}</include>
-                        </includes>
+                        <excludedGroups>IntegrationTest | ${java.unit.test.exclude.groups}</excludedGroups>
                         <argLine>-Xmx3g -XX:+HeapDumpOnOutOfMemoryError</argLine>
                         <trimStackTrace>false</trimStackTrace>
                         <forkCount>1.0C</forkCount>
@@ -1118,9 +1126,9 @@
                     <configuration>
                         <redirectTestOutputToFile>true</redirectTestOutputToFile>
                         <includes>
-                            <include>${java.integration.test.include}</include>
+                            <include>**/*.java</include>
                         </includes>
-                        <groups>org.apache.storm.testing.IntegrationTest</groups>
+                        <groups>IntegrationTest</groups>
                         <argLine>-Xmx1536m</argLine>
                     </configuration>
                     <executions>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/src/jvm/org/apache/storm/testing/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/IntegrationTest.java b/storm-client/src/jvm/org/apache/storm/testing/IntegrationTest.java
deleted file mode 100644
index 94bab9f..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/IntegrationTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-/**
- * Marker interface used to mark integration tests. Integration tests will be run during the Maven
- * <b><i>integration-test</i></b> phase, whereas unit tests will be run during the Maven <b><i>test</i></b> phase.
- * <p/>
- * Integration tests can be in the same package as unit tests. To mark a test as integration test, add the annotation
- *
- * @Category(IntegrationTest.class) to the class definition as well as to its hierarchy of superclasses. For example:
- *     <p/>
- * @ Category(IntegrationTest.class)<br/> public class MyIntegrationTest {<br/> ...<br/> }
- */
-public interface IntegrationTest {
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
deleted file mode 100644
index c5eba2f..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-/**
- * Marker interface used to mark performance tests. Performance tests will be run if the profile performance-tests or all-tests are enabled.
- * <p/>
- * Performance tests can be in the same package as unit tests. To mark a test as a performance test,
- * add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
- * For example:
- * <p/>
- * @ Category(PerformanceTest.class)<br/>
- * public class MyPerformanceTest {<br/>
- *  ...<br/>
- * }
- * <p/>
- *  In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
- *  for running on CI systems like travis ci, or the apache jenkins build.
- */
-public interface PerformanceTest {
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/test/jvm/org/apache/storm/testing/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/testing/IntegrationTest.java b/storm-client/test/jvm/org/apache/storm/testing/IntegrationTest.java
new file mode 100644
index 0000000..6ecb574
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/testing/IntegrationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.junit.jupiter.api.Tag;
+
+/**
+ * Annotation to mark integration tests. Integration tests will be run during the Maven
+ * <b><i>integration-test</i></b> phase, whereas unit tests will be run during the Maven <b><i>test</i></b> phase.
+ * <p>
+ * Integration tests can be in the same package as unit tests. To mark a test as an integration test, add the annotation
+ *
+ * {@literal @}IntegrationTest to the class definition, or to any methods you want to run during the integration test phase. For example:
+ *     <p>
+ * {@literal @}IntegrationTest <br> public class MyIntegrationTest {<br> ...<br> }
+ */
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Tag("IntegrationTest")
+public @interface IntegrationTest {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/test/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/test/jvm/org/apache/storm/testing/PerformanceTest.java
new file mode 100644
index 0000000..6e25018
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.junit.jupiter.api.Tag;
+
+/**
+ * Annotation to mark performance tests. Performance tests will be run if the profile performance-tests or all-tests is enabled.
+ * <p>
+ * Performance tests can be in the same package as unit tests. To mark a test as a performance test,
+ * add the annotation {@literal @}PerformanceTest to the class definition.
+ * For example:
+ * <p>
+ * {@literal @}PerformanceTest<br>
+ * public class MyPerformanceTest {<br>
+ *  ...<br>
+ * }
+ * <p>
+ *  In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
+ *  for running on CI systems like travis ci, or the apache jenkins build.
+ */
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Tag("PerformanceTest")
+public @interface PerformanceTest {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/test/jvm/org/apache/storm/utils/PredicateMatcher.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/PredicateMatcher.java b/storm-client/test/jvm/org/apache/storm/utils/PredicateMatcher.java
new file mode 100644
index 0000000..9621888
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/utils/PredicateMatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.function.Predicate;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+public class PredicateMatcher<T> extends TypeSafeMatcher<T> {
+
+    private final Predicate<T> predicate;
+    
+    private PredicateMatcher(Predicate<T> predicate) {
+        this.predicate = predicate;
+    }
+    
+    public static <T> PredicateMatcher<T> matchesPredicate(Predicate<T> predicate) {
+        return new PredicateMatcher<>(predicate);
+    }
+
+    @Override
+    protected boolean matchesSafely(T item) {
+        return predicate.test(item);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("matches predicate ").appendValue(predicate);
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description mismatchDescription) {
+        mismatchDescription.appendText("failed to match predicate ").appendValue(item);
+    }  
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-client/test/jvm/org/apache/storm/utils/ThrowableNestedCauseMatcher.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/ThrowableNestedCauseMatcher.java b/storm-client/test/jvm/org/apache/storm/utils/ThrowableNestedCauseMatcher.java
index d7a0d5d..7e15d30 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/ThrowableNestedCauseMatcher.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/ThrowableNestedCauseMatcher.java
@@ -18,12 +18,12 @@
 
 package org.apache.storm.utils;
 
-import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 
-public class ThrowableNestedCauseMatcher extends BaseMatcher<Throwable> {
+public class ThrowableNestedCauseMatcher extends TypeSafeMatcher<Throwable> {
 
-    private Class<? extends Throwable> exceptionCause;
+    private final Class<? extends Throwable> exceptionCause;
 
     public ThrowableNestedCauseMatcher(Class<? extends Throwable> exceptionCause) {
         this.exceptionCause = exceptionCause;
@@ -34,8 +34,8 @@ public class ThrowableNestedCauseMatcher extends BaseMatcher<Throwable> {
     }
 
     @Override
-    public boolean matches(Object throwable) {
-        return Utils.exceptionCauseIsInstanceOf(exceptionCause, (Throwable) throwable);
+    protected boolean matchesSafely(Throwable throwable) {
+        return Utils.exceptionCauseIsInstanceOf(exceptionCause, throwable);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 2977d06..4121f89 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -113,8 +113,6 @@
             <artifactId>log4j-slf4j-impl</artifactId>
         </dependency>
 
-
-
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
@@ -125,6 +123,17 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
deleted file mode 100644
index 4257d23..0000000
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ /dev/null
@@ -1,651 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns integration.org.apache.storm.integration-test
-  (:use [clojure test])
-  (:import [org.apache.storm Testing LocalCluster$Builder Thrift])
-  (:import [org.apache.storm.topology TopologyBuilder])
-  (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
-  (:import [org.apache.storm.testing TrackedTopology MockedSources TestWordCounter TestWordSpout TestGlobalCount FeederSpout CompleteTopologyParam
-            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
-  (:import [org.apache.storm.utils Time])
-  (:import [org.apache.storm.tuple Fields])
-  (:use [org.apache.storm clojure])
-  (:use [org.apache.storm config util])
-  (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.utils Utils]))
-
-(deftest test-basic-topology
-  (doseq [zmq-on? [true false]]
-    (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                  (.withSimulatedTime)
-                                  (.withSupervisors 4)
-                                  (.withDaemonConf {STORM-LOCAL-MODE-ZMQ zmq-on?})))]
-      (let [topology (Thrift/buildTopology
-                      {"1" (Thrift/prepareSpoutDetails
-                             (TestWordSpout. true) (Integer. 3))}
-                      {"2" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "1" nil)
-                              (Thrift/prepareFieldsGrouping ["word"])}
-                             (TestWordCounter.) (Integer. 4))
-                       "3" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "1" nil)
-                              (Thrift/prepareGlobalGrouping)}
-                             (TestGlobalCount.))
-                       "4" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "2" nil)
-                              (Thrift/prepareGlobalGrouping)}
-                             (TestAggregatesCounter.))})
-            results (Testing/completeTopology cluster
-                                       topology
-                                       (doto (CompleteTopologyParam.)
-                                         (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}))
-                                         (.setStormConf {TOPOLOGY-WORKERS 2
-                                                    TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE true})))]
-        (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                 (Testing/readTuples results "1")))
-        (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
-                 (Testing/readTuples results "2")))
-        (is (= [[1] [2] [3] [4]]
-               (Testing/readTuples results "3")))
-        (is (= [[1] [2] [3] [4]]
-               (Testing/readTuples results "4")))
-        ))))
-
-(defbolt emit-task-id ["tid"] {:prepare true}
-  [conf context collector]
-  (let [tid (.getThisTaskIndex context)]
-    (bolt
-      (execute [tuple]
-        (emit-bolt! collector [tid] :anchor tuple)
-        (ack! collector tuple)
-        ))))
-
-(deftest test-multi-tasks-per-executor
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                  (.withSimulatedTime)
-                                (.withSupervisors 4)))]
-    (let [topology (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))}
-                    {"2" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "1" nil)
-                            (Thrift/prepareAllGrouping)}
-                           emit-task-id
-                           (Integer. 3)
-                           {TOPOLOGY-TASKS 6})})
-          results (Testing/completeTopology cluster
-                                     topology
-                                     (doto (CompleteTopologyParam.)
-                                       (.setMockedSources (MockedSources. {"1" [["a"]]}))))]
-      (is (Testing/multiseteq [[(int 0)] [(int 1)] [(int 2)] [(int 3)] [(int 4)] [(int 5)]]
-               (Testing/readTuples results "2")))
-      )))
-
-(defbolt ack-every-other {} {:prepare true}
-  [conf context collector]
-  (let [state (atom -1)]
-    (bolt
-      (execute [tuple]
-        (let [val (swap! state -)]
-          (when (pos? val)
-            (ack! collector tuple)
-            ))))))
-
-(defn assert-loop 
-([afn ids] (assert-loop afn ids 10))
-([afn ids timeout-secs]
-  (loop [remaining-time (* timeout-secs 1000)]
-    (let [start-time (System/currentTimeMillis)
-          assertion-is-true (every? afn ids)]
-      (if (or assertion-is-true (neg? remaining-time))
-        (is assertion-is-true)
-        (do
-          (Thread/sleep 1)
-          (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
-        ))))))
-
-(defn assert-acked [tracker & ids]
-  (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
-  (assert-loop #(.isFailed tracker %) ids))
-
-(deftest test-timeout
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withSupervisors 4)
-                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
-    (let [feeder (FeederSpout. ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails feeder)}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareGlobalGrouping)} ack-every-other)})]
-      (.submitTopology cluster
-                             "timeout-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-                             topology)
-      (.advanceClusterTime cluster 11)
-      (.feed feeder ["a"] 1)
-      (.feed feeder ["b"] 2)
-      (.feed feeder ["c"] 3)
-      (.advanceClusterTime cluster 9)
-      (assert-acked tracker 1 3)
-      (is (not (.isFailed tracker 2)))
-      (.advanceClusterTime cluster 12)
-      (assert-failed tracker 2)
-      )))
-
-(defbolt reset-timeout-bolt {} {:prepare true}
-  [conf context collector]
-  (let [tuple-counter (atom 1)
-        first-tuple (atom nil)]
-    (bolt
-      (execute [tuple]
-        (do 
-          (condp = @tuple-counter
-            1 (reset! first-tuple tuple)
-            2 (reset-timeout! collector @first-tuple)
-            5 (do 
-                (ack! collector @first-tuple)
-                (ack! collector tuple)
-              )
-            (do 
-              (reset-timeout! collector @first-tuple)
-              (ack! collector tuple)))
-          (swap! tuple-counter inc)
-        )))))
-
-(deftest test-reset-timeout
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
-    (let [feeder (FeederSpout. ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails feeder)}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareGlobalGrouping)} reset-timeout-bolt)})]
-    (.submitTopology cluster
-                           "timeout-tester"
-                           {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-                           topology)
-    ;The first tuple will be used to check timeout reset
-    (.feed feeder ["a"] 1)
-    ;The second tuple is used to wait for the spout to rotate the spout's pending map
-    (.feed feeder ["b"] 2)
-    (.advanceClusterTime cluster 9)
-    ;The other tuples are used to reset the first tuple's timeout,
-    ;and to wait for the message to get through to the spout (acks use the same path as timeout resets)
-    (.feed feeder ["c"] 3)
-    (assert-acked tracker 3)
-    (.advanceClusterTime cluster 9)
-    (.feed feeder ["d"], 4)
-    (assert-acked tracker 4)
-    (.advanceClusterTime cluster 2)
-    ;The time is now twice the message timeout, the second tuple should expire since it was not acked
-    ;Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
-    (assert-failed tracker 2)
-    ;Put in a tuple to cause the first tuple to be acked
-    (.feed feeder ["e"], 5)
-    (assert-acked tracker 5)
-    ;The first tuple should be acked and should not have failed
-    (is (not (.isFailed tracker 1)))
-    (is (.isAcked tracker 1))
-    )))
-
-(defn mk-validate-topology-1 []
-  (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
-                    {"2" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "1" nil)
-                            (Thrift/prepareFieldsGrouping ["word"])}
-                           (TestWordCounter.) (Integer. 4))}))
-
-(defn mk-invalidate-topology-1 []
-  (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
-                    {"2" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "3" nil)
-                            (Thrift/prepareFieldsGrouping ["word"])}
-                           (TestWordCounter.) (Integer. 4))}))
-
-(defn mk-invalidate-topology-2 []
-  (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
-                    {"2" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "1" nil)
-                            (Thrift/prepareFieldsGrouping ["non-exists-field"])}
-                           (TestWordCounter.) (Integer. 4))}))
-
-(defn mk-invalidate-topology-3 []
-  (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
-                    {"2" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "1" "non-exists-stream")
-                            (Thrift/prepareFieldsGrouping ["word"])}
-                           (TestWordCounter.) (Integer. 4))}))
-
-(defn try-complete-wc-topology [cluster topology]
-  (try (do
-         (Testing/completeTopology cluster
-                            topology
-                            (doto (CompleteTopologyParam.)
-                              (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}))
-                              (.setStormConf {TOPOLOGY-WORKERS 2})))
-         false)
-       (catch InvalidTopologyException e true)))
-
-(deftest test-validate-topology-structure
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSupervisors 4)
-                                (.withSimulatedTime)))]
-    (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
-          any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
-          any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
-          any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
-      (is (= any-error1? false))
-      (is (= any-error2? true))
-      (is (= any-error3? true))
-      (is (= any-error4? true)))))
-
-(defbolt identity-bolt ["num"]
-  [tuple collector]
-  (emit-bolt! collector (.getValues tuple) :anchor tuple)
-  (ack! collector tuple))
-
-(deftest test-system-stream
-  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)))]
-      (let [topology (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails
-                              (TestWordSpout. true) (Integer. 3))}
-                       {"2" (Thrift/prepareBoltDetails
-                              {(Utils/getGlobalStreamId "1" nil)
-                               (Thrift/prepareFieldsGrouping ["word"])
-                               (Utils/getGlobalStreamId "1" "__system")
-                               (Thrift/prepareGlobalGrouping)}
-                               identity-bolt (Integer. 1))})
-            results (Testing/completeTopology cluster
-                                       topology
-                                       (doto (CompleteTopologyParam.)
-                                         (.setMockedSources (MockedSources. {"1" [["a"] ["b"] ["c"]]}))
-                                         (.setStormConf {TOPOLOGY-WORKERS 2})))]
-        (is (Testing/multiseteq [["a"] ["b"] ["c"] ]
-                 (Testing/readTuples results "2")))
-        )))
-
-(defn ack-tracking-feeder [fields]
-  (let [tracker (AckTracker.)]
-    [(doto (FeederSpout. fields)
-       (.setAckFailDelegate tracker))
-     (fn [val]
-       (is (= (.getNumAcks tracker) val))
-       (.resetNumAcks tracker)
-       )]
-    ))
-
-(defbolt branching-bolt ["num"]
-  {:params [amt]}
-  [tuple collector]
-  (doseq [i (range amt)]
-    (emit-bolt! collector [i] :anchor tuple))
-  (ack! collector tuple))
-
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
-  [conf context collector]
-  (let [seen (atom [])]
-    (bolt
-      (execute [tuple]
-        (swap! seen conj tuple)
-        (when (= (count @seen) amt)
-          (emit-bolt! collector [1] :anchor @seen)
-          (doseq [s @seen]
-            (ack! collector s))
-          (reset! seen [])
-          )))
-      ))
-
-(defbolt ack-bolt {}
-  [tuple collector]
-  (ack! collector tuple))
-
-(deftest test-acking
-  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
-    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
-          [feeder2 checker2] (ack-tracking-feeder ["num"])
-          [feeder3 checker3] (ack-tracking-feeder ["num"])
-          tracked (TrackedTopology.
-                   (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails feeder1)
-                      "2" (Thrift/prepareSpoutDetails feeder2)
-                      "3" (Thrift/prepareSpoutDetails feeder3)}
-                     {"4" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (branching-bolt 2))
-                      "5" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "2" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (branching-bolt 4))
-                      "6" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "3" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (branching-bolt 1))
-                      "7" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "4" nil)
-                             (Thrift/prepareShuffleGrouping)
-                             (Utils/getGlobalStreamId "5" nil)
-                             (Thrift/prepareShuffleGrouping)
-                             (Utils/getGlobalStreamId "6" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (agg-bolt 3))
-                      "8" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "7" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (branching-bolt 2))
-                      "9" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "8" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            ack-bolt)}
-                     )
-                     cluster)]
-      (.submitTopology cluster
-                       "acking-test1"
-                       {}
-                       tracked)
-      (.advanceClusterTime cluster 11)
-      (.feed feeder1 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 0)
-      (.feed feeder2 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 1)
-      (checker2 1)
-      (.feed feeder1 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 0)
-      (.feed feeder1 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 1)
-      (.feed feeder3 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 0)
-      (checker3 0)
-      (.feed feeder2 [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker1 1)
-      (checker2 1)
-      (checker3 1))))
-
-(deftest test-ack-branching
-  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (TrackedTopology.
-                   (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails feeder)}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            identity-bolt)
-                      "3" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            identity-bolt)
-                      "4" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "2" nil)
-                             (Thrift/prepareShuffleGrouping)
-                             (Utils/getGlobalStreamId "3" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                             (agg-bolt 4))})
-                    cluster)]
-      (.submitTopology cluster
-                       "test-acking2"
-                       {}
-                       tracked)
-      (.advanceClusterTime cluster 11)
-      (.feed feeder [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker 0)
-      (.feed feeder [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker 2))))
-
-(defbolt dup-anchor ["num"]
-  [tuple collector]
-  (emit-bolt! collector [1] :anchor [tuple tuple])
-  (ack! collector tuple))
-
-(def bolt-prepared? (atom false))
-(defbolt prepare-tracked-bolt [] {:prepare true}
-  [conf context collector]  
-  (bolt
-   (execute [tuple]
-            (reset! bolt-prepared? true)
-            (ack! collector tuple))))
-
-(def spout-opened? (atom false))
-(defspout open-tracked-spout ["val"]
-  [conf context collector]
-  (spout
-    (nextTuple []
-      (reset! spout-opened? true)
-      )))
-
-(deftest test-submit-inactive-topology
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
-    (let [feeder (FeederSpout. ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails feeder)
-                     "2" (Thrift/prepareSpoutDetails open-tracked-spout)}
-                    {"3" (Thrift/prepareBoltDetails
-                           {(Utils/getGlobalStreamId "1" nil)
-                            (Thrift/prepareGlobalGrouping)}
-                           prepare-tracked-bolt)})]
-      (reset! bolt-prepared? false)
-      (reset! spout-opened? false)
-
-      (.submitTopologyWithOpts cluster
-        "test"
-        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-        topology
-        (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (.advanceClusterTime cluster 11)
-      (.feed feeder ["a"] 1)
-      (.advanceClusterTime cluster 9)
-      (is (not @bolt-prepared?))
-      (is (not @spout-opened?))
-      (.activate (.getNimbus cluster) "test")
-
-      (.advanceClusterTime cluster 12)
-      (assert-acked tracker 1)
-      (is @bolt-prepared?)
-      (is @spout-opened?))))
-
-(deftest test-acking-self-anchor
-  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (TrackedTopology.
-                   (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails feeder)}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            dup-anchor)
-                      "3" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "2" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            ack-bolt)})
-                   cluster)]
-      (.submitTopology cluster
-                       "test"
-                       {}
-                       tracked)
-      (.advanceClusterTime cluster 11)
-      (.feed feeder [1])
-      (Testing/trackedWait tracked (int 1))
-      (checker 1)
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (Testing/trackedWait tracked (int 3))
-      (checker 3))))
-
-(defspout IncSpout ["word"]
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (emit-spout! collector [@state] :id 1)         
-       )
-     (ack [id]
-       (swap! state inc))
-     )))
-
-
-(defspout IncSpout2 ["word"] {:params [prefix]}
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (swap! state inc)
-       (emit-spout! collector [(str prefix "-" @state)])         
-       )
-     )))
-
-(deftest test-kryo-decorators-config
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
-                                                  TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]})))]
-    (let [builder (TopologyBuilder.)
-          _ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-          _ (-> builder (.setBolt "2" (TestConfBolt. {TOPOLOGY-KRYO-DECORATORS ["one" "two"]})) (.shuffleGrouping "1"))
-          results (Testing/completeTopology cluster
-                                     (.createTopology builder)
-                                     (doto (CompleteTopologyParam.)
-                                       (.setStormConf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]})
-                                       (.setMockedSources (MockedSources. {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))))]
-      (is (= {"topology.kryo.decorators" (list "one" "two" "three")}
-             (->> (Testing/readTuples results "2") (apply concat) (apply hash-map)))))))
-
-(deftest test-component-specific-config
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true})))]
-    (let [builder (TopologyBuilder.)
-          _ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-          _ (-> builder
-                (.setBolt "2"
-                          (TestConfBolt.
-                            {"fake.config" 123
-                             TOPOLOGY-MAX-TASK-PARALLELISM 20
-                             TOPOLOGY-MAX-SPOUT-PENDING 30
-                             TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
-                                                     {"fake.type2" "a.serializer"}]}))
-                (.shuffleGrouping "1")
-                (.setMaxTaskParallelism (int 2))
-                (.addConfiguration "fake.config2" 987))
-          results (Testing/completeTopology cluster
-                                     (.createTopology builder)
-                                     (doto (CompleteTopologyParam.)
-                                       (.setStormConf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer", "fake.type3" "a.serializer3"}]})
-                                        (.setMockedSources (MockedSources. {"1" [["fake.config"]
-                                                         [TOPOLOGY-MAX-TASK-PARALLELISM]
-                                                         [TOPOLOGY-MAX-SPOUT-PENDING]
-                                                         ["fake.config2"]
-                                                         [TOPOLOGY-KRYO-REGISTER]]}))))]
-      (is (= {"fake.config" 123
-              "fake.config2" 987
-              TOPOLOGY-MAX-TASK-PARALLELISM 2
-              TOPOLOGY-MAX-SPOUT-PENDING 30
-              TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
-                                      "fake.type2" "a.serializer"
-                                      "fake.type3" "a.serializer3"}}
-             (->> (Testing/readTuples results "2")
-                  (apply concat)
-                  (apply hash-map)))))))
-
-(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
-  [conf context collector]
-  (let [acked (atom 0)
-        failed (atom 0)
-        executed (atom 0)
-        emitted (atom 0)]
-    (.addTaskHook context
-                  (reify org.apache.storm.hooks.ITaskHook
-                    (prepare [this conf context]
-                      )
-                    (cleanup [this]
-                      )
-                    (emit [this info]
-                      (swap! emitted inc))
-                    (boltAck [this info]
-                      (swap! acked inc))
-                    (boltFail [this info]
-                      (swap! failed inc))
-                    (boltExecute [this info]
-                      (swap! executed inc))
-                      ))
-    (bolt
-     (execute [tuple]
-        (emit-bolt! collector [@emitted @acked @failed @executed])
-        (if (= 0 (- @acked @failed))
-          (ack! collector tuple)
-          (fail! collector tuple))
-        ))))
-
-(deftest test-hooks
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)))]
-    (let [topology (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails
-                            (TestPlannerSpout. (Fields. ["conf"])))}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            hooks-bolt)})
-          results (Testing/completeTopology cluster
-                                     topology
-                                     (doto (CompleteTopologyParam.)
-                                       (.setMockedSources (MockedSources. {"1" [[1]
-                                                         [1]
-                                                         [1]
-                                                         [1]
-                                                         ]}))))]
-      (is (= [[0 0 0 0]
-              [2 1 0 1]
-              [4 1 1 2]
-              [6 2 1 3]]
-             (Testing/readTuples results "2")
-             )))))
-
-(defbolt report-errors-bolt {}
-  [tuple collector]
-  (doseq [i (range (.getValue tuple 0))]
-    (report-error! collector (RuntimeException.)))
-  (ack! collector tuple))
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 2b42354..75a0432 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -17,17 +17,76 @@
   (:use [clojure.test])
   (:use [org.apache.storm daemon-config config util])
   (:use [org.apache.storm clojure])
-  (:require [integration.org.apache.storm.integration-test :as it])
   (:import [org.apache.storm Testing Config]
-           [org.apache.storm.generated GlobalStreamId])
+           [org.apache.storm.generated GlobalStreamId])           
   (:import [org.apache.storm.tuple Values])
   (:import [org.apache.storm.utils Time])
   (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout FeederSpout
             TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
-            AckFailMapTracker MkTupleParam])
+            AckFailMapTracker AckTracker MkTupleParam])
   (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm Thrift ILocalCluster]))
 
+; If you are working on porting this test to Java, be aware that there are ports of these methods/bolts in TopologyIntegrationTest.java.
+(defn assert-loop 
+([afn ids] (assert-loop afn ids 10))
+([afn ids timeout-secs]
+  (loop [remaining-time (* timeout-secs 1000)]
+    (let [start-time (System/currentTimeMillis)
+          assertion-is-true (every? afn ids)]
+      (if (or assertion-is-true (neg? remaining-time))
+        (is assertion-is-true)
+        (do
+          (Thread/sleep 1)
+          (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
+        ))))))
+
+(defn assert-acked [tracker & ids]
+  (assert-loop #(.isAcked tracker %) ids))
+
+(defn assert-failed [tracker & ids]
+  (assert-loop #(.isFailed tracker %) ids))
+  
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+  [conf context collector]
+  (let [seen (atom [])]
+    (bolt
+      (execute [tuple]
+        (swap! seen conj tuple)
+        (when (= (count @seen) amt)
+          (emit-bolt! collector [1] :anchor @seen)
+          (doseq [s @seen]
+            (ack! collector s))
+          (reset! seen [])
+          )))
+      ))
+      
+(defbolt identity-bolt ["num"]
+  [tuple collector]
+  (emit-bolt! collector (.getValues tuple) :anchor tuple)
+  (ack! collector tuple))
+      
+(defbolt ack-every-other {} {:prepare true}
+  [conf context collector]
+  (let [state (atom -1)]
+    (bolt
+      (execute [tuple]
+        (let [val (swap! state -)]
+          (when (pos? val)
+            (ack! collector tuple)
+            ))))))
+
+(defn ack-tracking-feeder [fields]
+  (let [tracker (AckTracker.)]
+    [(doto (FeederSpout. fields)
+       (.setAckFailDelegate tracker))
+     (fn [val]
+       (is (= (.getNumAcks tracker) val))
+       (.resetNumAcks tracker)
+       )]
+    ))
+; End section with methods/bolts that also exist in Java.
+  
 (deftest test-with-simulated-time
   (is (= false (Time/isSimulating)))
   (Testing/withSimulatedTime (fn []
@@ -64,7 +123,7 @@
   (Testing/withTrackedCluster
    (reify TestJob
      (^void run [this ^ILocalCluster cluster]
-       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
+       (let [[feeder checker] (ack-tracking-feeder ["num"])
              tracked (Testing/mkTrackedTopology
                       cluster
                       (Thrift/buildTopology
@@ -72,17 +131,17 @@
                        {"2" (Thrift/prepareBoltDetails
                               {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
                                (Thrift/prepareShuffleGrouping)}
-                              it/identity-bolt)
+                              identity-bolt)
                         "3" (Thrift/prepareBoltDetails
                               {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
                                (Thrift/prepareShuffleGrouping)}
-                              it/identity-bolt)
+                              identity-bolt)
                         "4" (Thrift/prepareBoltDetails
                              {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
                               (Thrift/prepareShuffleGrouping)
                               (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID)
                               (Thrift/prepareShuffleGrouping)}
-                             (it/agg-bolt 4))}))]
+                             (agg-bolt 4))}))]
          (.submitTopology cluster
                           "test-acking2"
                           (Config.)
@@ -113,7 +172,7 @@
                          {"2" (Thrift/prepareBoltDetails
                                 {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
                                  (Thrift/prepareGlobalGrouping)}
-                                it/ack-every-other)})
+                                ack-every-other)})
                storm-conf (doto (Config.)
                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
            (.submitTopology cluster
@@ -124,10 +183,10 @@
            (.feed feeder ["b"] 2)
            (.feed feeder ["c"] 3)
            (Testing/advanceClusterTime cluster (int 9))
-           (it/assert-acked tracker 1 3)
+           (assert-acked tracker 1 3)
            (is (not (.isFailed tracker 2)))
            (Testing/advanceClusterTime cluster (int 12))
-           (it/assert-failed tracker 2)
+           (assert-failed tracker 2)
            ))))))
 
 (deftest test-disable-tuple-timeout
@@ -147,7 +206,7 @@
                            {"2" (Thrift/prepareBoltDetails
                                   {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
                                    (Thrift/prepareGlobalGrouping)}
-                                  it/ack-every-other)})
+                                  ack-every-other)})
                 storm-conf (doto (Config.)
                              (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
                              (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
@@ -159,7 +218,7 @@
             (.feed feeder ["b"] 2)
             (.feed feeder ["c"] 3)
             (Testing/advanceClusterTime cluster (int 9))
-            (it/assert-acked tracker 1 3)
+            (assert-acked tracker 1 3)
             (is (not (.isFailed tracker 2)))
             (Testing/advanceClusterTime cluster (int 12))
             (is (not (.isFailed tracker 2)))


Mime
View raw message