storm-commits mailing list archives

From s...@apache.org
Subject [1/7] storm git commit: STORM-1289: Port integration-test.clj to Java
Date Sat, 05 Jan 2019 15:02:51 GMT
Repository: storm
Updated Branches:
  refs/heads/master 7575a0027 -> 7908fac03


http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
new file mode 100644
index 0000000..7cc7f65
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
@@ -0,0 +1,1011 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import static org.apache.storm.utils.PredicateMatcher.matchesPredicate;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.AckTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"true", "false"})
+    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                    new TestWordCounter(), 4));
+            boltMap.put("3",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new TestGlobalCount()));
+            boltMap.put("4",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("2", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new TestAggregatesCounter()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
+
+            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
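+            // Stream "1" is the raw spout output, "2" the per-word counts, "3" and "4" the running global and aggregate counts.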
+            assertThat(Testing.readTuples(results, "1"), containsInAnyOrder(
+                new Values("nathan"),
+                new Values("nathan"),
+                new Values("bob"),
+                new Values("joey")));
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values("nathan", 1),
+                new Values("nathan", 2),
+                new Values("bob", 1),
+                new Values("joey", 1)
+            ));
+            assertThat(Testing.readTuples(results, "3"), contains(
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4)
+            ));
+            assertThat(Testing.readTuples(results, "4"), contains(
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4)
+            ));
+        }
+    }
+
+    private static class EmitTaskIdBolt extends BaseRichBolt {
+
+        private int taskIndex;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("tid"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+            this.taskIndex = context.getThisTaskIndex();
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.emit(input, new Values(taskIndex));
+            collector.ack(input);
+        }
+
+    }
+
+    @Test
+    public void testMultiTasksPerCluster() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareAllGrouping()),
+                    new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
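+            // All grouping delivers the single tuple to every one of the 6 tasks, so each task index 0-5 appears exactly once.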
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values(0),
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4),
+                new Values(5)
+            ));
+        }
+    }
+
+    private static class AckEveryOtherBolt extends BaseRichBolt {
+
+        private boolean state = true;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            if (state) {
+                collector.ack(input);
+            }
+            state = !state;
+        }
+
+    }
+
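+    // Polls until every parameter satisfies the condition, failing if it does not hold within 10 seconds.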
+    private void assertLoop(Predicate<Object> condition, Object... conditionParams) {
+        try {
+            Awaitility.with()
+                .pollInterval(1, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertThat(Arrays.asList(conditionParams), everyItem(matchesPredicate(condition))));
+        } catch (ConditionTimeoutException e) {
+            throw new AssertionError(e.getMessage());
+        }
+    }
+
+    private void assertAcked(AckFailMapTracker tracker, Object... ids) {
+        assertLoop(tracker::isAcked, ids);
+    }
+
+    private void assertFailed(AckFailMapTracker tracker, Object... ids) {
+        assertLoop(tracker::isFailed, ids);
+    }
+
+    @Test
+    public void testTimeout() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new AckEveryOtherBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
+
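+            // The bolt acks every other tuple, so ids 1 and 3 are acked promptly while id 2 is left to time out.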
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values("a"), 1);
+            feeder.feed(new Values("b"), 2);
+            feeder.feed(new Values("c"), 3);
+            cluster.advanceClusterTime(9);
+            assertAcked(tracker, 1, 3);
+            assertThat(tracker.isFailed(2), is(false));
+            cluster.advanceClusterTime(12);
+            assertFailed(tracker, 2);
+        }
+    }
+
+    private static class ResetTimeoutBolt extends BaseRichBolt {
+
+        private int tupleCounter = 1;
+        private Tuple firstTuple = null;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            if (tupleCounter == 1) {
+                firstTuple = input;
+            } else if (tupleCounter == 2) {
+                collector.resetTimeout(firstTuple);
+            } else if (tupleCounter == 5) {
+                collector.ack(firstTuple);
+                collector.ack(input);
+            } else {
+                collector.resetTimeout(firstTuple);
+                collector.ack(input);
+            }
+            tupleCounter++;
+        }
+    }
+
+    @Test
+    public void testResetTimeout() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new ResetTimeoutBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
+
+            //The first tuple will be used to check timeout reset
+            feeder.feed(new Values("a"), 1);
+            //The second tuple is used to wait for the spout to rotate its pending map
+            feeder.feed(new Values("b"), 2);
+            cluster.advanceClusterTime(9);
+            //The other tuples are used to reset the first tuple's timeout,
+            //and to wait for the message to get through to the spout (acks use the same path as timeout resets)
+            feeder.feed(new Values("c"), 3);
+            assertAcked(tracker, 3);
+            cluster.advanceClusterTime(9);
+            feeder.feed(new Values("d"), 4);
+            assertAcked(tracker, 4);
+            cluster.advanceClusterTime(2);
+            //The time is now twice the message timeout, the second tuple should expire since it was not acked
+            //Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
+            assertFailed(tracker, 2);
+            //Put in a tuple to cause the first tuple to be acked
+            feeder.feed(new Values("e"), 5);
+            assertAcked(tracker, 5);
+            //The first tuple should be acked, and should not have failed
+            assertThat(tracker.isFailed(1), is(false));
+            assertAcked(tracker, 1);
+        }
+    }
+
+    private StormTopology mkValidateTopology() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidateTopology1() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("3", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidateTopology2() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidateTopology3() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", "non-exists-stream"),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
+        try {
+            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
+            completeTopologyParam.setMockedSources(mockedSources);
+            completeTopologyParam.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 2));
+            Testing.completeTopology(cluster, topology, completeTopologyParam);
+            return false;
+        } catch (InvalidTopologyException e) {
+            return true;
+        }
+    }
+
+    @Test
+    public void testValidateTopologyStructure() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            assertThat(tryCompleteWordCountTopology(cluster, mkValidateTopology()), is(false));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology1()), is(true));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology2()), is(true));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology3()), is(true));
+        }
+    }
+
+    private static class IdentityBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.emit(input, input.getValues());
+            collector.ack(input);
+        }
+
+    }
+
+    @Test
+    public void testSystemStream() throws Exception {
+        //this test works because mocking a spout splits up the tuples evenly among the tasks
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+
+            Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
+            boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
+            boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
+            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+                Thrift.prepareBoltDetails(
+                    boltInputs,
+                    new IdentityBolt(), 1));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+
+            List<FixedTuple> testTuples = Arrays.asList("a", "b", "c").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values("a"),
+                new Values("b"),
+                new Values("c")
+            ));
+        }
+    }
+
+    private static class SpoutAndChecker {
+
+        private final FeederSpout spout;
+        private final Consumer<Integer> checker;
+
+        public SpoutAndChecker(FeederSpout spout, Consumer<Integer> checker) {
+            this.spout = spout;
+            this.checker = checker;
+        }
+    }
+
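+    // Creates a feeder spout whose checker asserts how many tuples have been acked since the last check, then resets the tracker.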
+    private SpoutAndChecker ackTrackingFeeder(String... fields) {
+        AckTracker tracker = new AckTracker();
+        FeederSpout spout = new FeederSpout(new Fields(fields));
+        spout.setAckFailDelegate(tracker);
+        return new SpoutAndChecker(spout, expectedNumAcks -> {
+            assertThat(tracker.getNumAcks(), is(expectedNumAcks));
+            tracker.resetNumAcks();
+        });
+    }
+
+    private static class BranchingBolt extends BaseRichBolt {
+
+        private final int branches;
+        private OutputCollector collector;
+
+        public BranchingBolt(int branches) {
+            this.branches = branches;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            IntStream.range(0, branches)
+                .forEach(i -> collector.emit(input, new Values(i)));
+            collector.ack(input);
+        }
+    }
+
+    private static class AggBolt extends BaseRichBolt {
+
+        private final int branches;
+        private final List<Tuple> seen = new ArrayList<>();
+        private OutputCollector collector;
+
+        public AggBolt(int branches) {
+            this.branches = branches;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            seen.add(input);
+            if (seen.size() == branches) {
+                collector.emit(seen, new Values(1));
+                seen.forEach(t -> collector.ack(t));
+                seen.clear();
+            }
+        }
+    }
+
+    private static class AckBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.ack(input);
+        }
+    }
+
+    @Test
+    public void testAcking() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            SpoutAndChecker feeder1 = ackTrackingFeeder("num");
+            SpoutAndChecker feeder2 = ackTrackingFeeder("num");
+            SpoutAndChecker feeder3 = ackTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.spout));
+            spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.spout));
+            spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.spout));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
+            boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
+            boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
+
+            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+            aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
+            boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
+
+            boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
+            boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
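+            // Each checker call asserts the number of spout tuples acked since the previous check; a tuple tree only completes once the aggregator (bolt "7") has collected a full batch of 3 inputs.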
+            feeder1.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.checker.accept(0);
+            feeder2.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.checker.accept(1);
+            feeder2.checker.accept(1);
+            feeder1.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.checker.accept(0);
+            feeder1.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.checker.accept(1);
+            feeder3.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.checker.accept(0);
+            feeder3.checker.accept(0);
+            feeder2.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.spout.feed(new Values(1));
+            feeder2.spout.feed(new Values(1));
+            feeder3.spout.feed(new Values(1));
+        }
+    }
+
+    @Test
+    public void testAckBranching() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            SpoutAndChecker feeder = ackTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+
+            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+            aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
+            boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
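+            // Each spout tuple reaches the aggregator via two branches and AggBolt needs 4 inputs, so both tuples are acked together on the second feed.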
+            feeder.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.checker.accept(0);
+            feeder.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.checker.accept(2);
+        }
+    }
+
+    private static class DupAnchorBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            ArrayList<Tuple> anchors = new ArrayList<>();
+            anchors.add(input);
+            anchors.add(input);
+            collector.emit(anchors, new Values(1));
+            collector.ack(input);
+        }
+    }
+
+    private static boolean boltPrepared = false;
+
+    private static class PrepareTrackedBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            boltPrepared = true;
+            collector.ack(input);
+        }
+    }
+
+    private static boolean spoutOpened = false;
+
+    private static class OpenTrackedSpout extends BaseRichSpout {
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("val"));
+        }
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        }
+
+        @Override
+        public void nextTuple() {
+            spoutOpened = true;
+        }
+
+    }
+
+    @Test
+    public void testSubmitInactiveTopology() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+            spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
+
+            boltPrepared = false;
+            spoutOpened = false;
+
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
+
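+            // While the topology is INACTIVE, nextTuple and execute must not be called; after activation the pending tuple is processed and acked.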
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values("a"), 1);
+            cluster.advanceClusterTime(9);
+            assertThat(boltPrepared, is(false));
+            assertThat(spoutOpened, is(false));
+            cluster.getNimbus().activate("test");
+
+            cluster.advanceClusterTime(12);
+            assertAcked(tracker, 1);
+            assertThat(boltPrepared, is(true));
+            assertThat(spoutOpened, is(true));
+        }
+    }
+
+    @Test
+    public void testAckingSelfAnchor() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            SpoutAndChecker feeder = ackTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("test", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
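+            // Anchoring the same input tuple twice must not prevent each spout tuple from being acked exactly once.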
+            feeder.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.checker.accept(1);
+            feeder.spout.feed(new Values(1));
+            feeder.spout.feed(new Values(1));
+            feeder.spout.feed(new Values(1));
+            Testing.trackedWait(tracked, 3);
+            feeder.checker.accept(3);
+        }
+    }
+
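+    // Converts a flat list of alternating keys and values into a map for easier comparison.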
+    private Map<Object, Object> listToMap(List<Object> list) {
+        assertThat(list.size() % 2, is(0));
+        Map<Object, Object> res = new HashMap<>();
+        for (int i = 0; i < list.size(); i += 2) {
+            res.put(list.get(i), list.get(i + 1));
+        }
+        return res;
+    }
+
+    @Test
+    public void testKryoDecoratorsConfig() throws Exception {
+        Map<String, Object> daemonConf = new HashMap<>();
+        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+        daemonConf.put(Config.TOPOLOGY_KRYO_DECORATORS, "this-is-overridden");
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(daemonConf)
+            .build()) {
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+            topologyBuilder.setBolt("2", new TestConfBolt(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "two"))))
+                .shuffleGrouping("1");
+
+            List<FixedTuple> testTuples = Arrays.asList(new Values(Config.TOPOLOGY_KRYO_DECORATORS)).stream()
+                .map(value -> new FixedTuple(value))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "three")));
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
+
+            List<Object> concatValues = Testing.readTuples(results, "2").stream()
+                .flatMap(values -> values.stream())
+                .collect(Collectors.toList());
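+            // The component-level and topology-level decorator lists are merged in order, overriding the value from the daemon conf.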
+            assertThat(concatValues.get(0), is(Config.TOPOLOGY_KRYO_DECORATORS));
+            assertThat(concatValues.get(1), is(Arrays.asList("one", "two", "three")));
+        }
+    }
+
+    @Test
+    public void testComponentSpecificConfig() throws Exception {
+        Map<String, Object> daemonConf = new HashMap<>();
+        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(daemonConf)
+            .build()) {
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+            Map<String, Object> componentConf = new HashMap<>();
+            componentConf.put("fake.config", 123);
+            componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 20);
+            componentConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30);
+            componentConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(Collections.singletonMap("fake.type", "bad.serializer"), Collections.singletonMap("fake.type2", "a.serializer")));
+            topologyBuilder.setBolt("2", new TestConfBolt(componentConf))
+                .shuffleGrouping("1")
+                .setMaxTaskParallelism(2)
+                .addConfiguration("fake.config2", 987);
+
+            List<FixedTuple> testTuples = Arrays.asList("fake.config", Config.TOPOLOGY_MAX_TASK_PARALLELISM, Config.TOPOLOGY_MAX_SPOUT_PENDING, "fake.config2", Config.TOPOLOGY_KRYO_REGISTER).stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+            Map<String, String> kryoRegister = new HashMap<>();
+            kryoRegister.put("fake.type", "good.serializer");
+            kryoRegister.put("fake.type3", "a.serializer3");
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(kryoRegister));
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
+
+            Map<String, Object> expectedValues = new HashMap<>();
+            expectedValues.put("fake.config", 123L);
+            expectedValues.put("fake.config2", 987L);
+            expectedValues.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 2L);
+            expectedValues.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30L);
+            Map<String, String> expectedKryoRegister = new HashMap<>();
+            expectedKryoRegister.putAll(kryoRegister);
+            expectedKryoRegister.put("fake.type2", "a.serializer");
+            expectedValues.put(Config.TOPOLOGY_KRYO_REGISTER, expectedKryoRegister);
+            List<Object> concatValues = Testing.readTuples(results, "2").stream()
+                .flatMap(values -> values.stream())
+                .collect(Collectors.toList());
+            assertThat(listToMap(concatValues), is(expectedValues));
+        }
+    }
+    
+    private static class HooksBolt extends BaseRichBolt {
+
+        private int acked = 0;
+        private int failed = 0;
+        private int executed = 0;
+        private int emitted = 0;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("emit", "ack", "fail", "executed"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+            context.addTaskHook(new BaseTaskHook() {
+                @Override
+                public void boltExecute(BoltExecuteInfo info) {
+                    executed++;
+                }
+
+                @Override
+                public void boltFail(BoltFailInfo info) {
+                    failed++;
+                }
+
+                @Override
+                public void boltAck(BoltAckInfo info) {
+                    acked++;
+                }
+
+                @Override
+                public void emit(EmitInfo info) {
+                    emitted++;
+                }
+                
+            });
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.emit(new Values(emitted, acked, failed, executed));
+            if (acked - failed == 0) {
+                collector.ack(input);
+            } else {
+                collector.fail(input);
+            }
+        }
+    }
+    
+    @Test
+    public void testHooks() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
+
+            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
+                    new HooksBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+            
+            List<List<Object>> expectedTuples = Arrays.asList(
+                Arrays.asList(0, 0, 0, 0),
+                Arrays.asList(2, 1, 0, 1),
+                Arrays.asList(4, 1, 1, 2),
+                Arrays.asList(6, 2, 1, 3));
+
+            assertThat(Testing.readTuples(results, "2"), is(expectedTuples));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 3f511ff..0d69cf4 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -80,6 +80,13 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.mockito</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-server/src/test/java/org/apache/storm/TestingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TestingTest.java b/storm-server/src/test/java/org/apache/storm/TestingTest.java
index ee0fabc..f3ead2e 100644
--- a/storm-server/src/test/java/org/apache/storm/TestingTest.java
+++ b/storm-server/src/test/java/org/apache/storm/TestingTest.java
@@ -12,6 +12,8 @@
 
 package org.apache.storm;
 
+import static org.junit.Assert.assertTrue;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -28,11 +30,7 @@ import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertTrue;
-
+import org.junit.jupiter.api.Test;
 /**
  * Test that the testing class does what it should do.
  */
@@ -89,7 +87,7 @@ public class TestingTest {
     };
 
     @Test
-    @Category(IntegrationTest.class)
+    @IntegrationTest
     public void testCompleteTopologyNettySimulated() throws Exception {
         Config daemonConf = new Config();
         daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
@@ -101,7 +99,7 @@ public class TestingTest {
     }
 
     @Test
-    @Category(IntegrationTest.class)
+    @IntegrationTest
     public void testCompleteTopologyNetty() throws Exception {
         Config daemonConf = new Config();
         daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
@@ -113,7 +111,7 @@ public class TestingTest {
     }
 
     @Test
-    @Category(IntegrationTest.class)
+    @IntegrationTest
     public void testCompleteTopologyLocalSimulated() throws Exception {
         MkClusterParam param = new MkClusterParam();
         param.setSupervisors(4);
@@ -122,7 +120,7 @@ public class TestingTest {
     }
 
     @Test
-    @Category(IntegrationTest.class)
+    @IntegrationTest
     public void testCompleteTopologyLocal() throws Exception {
         MkClusterParam param = new MkClusterParam();
         param.setSupervisors(4);

http://git-wip-us.apache.org/repos/asf/storm/blob/d6a1e730/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 0b1cb45..798a4a3 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -50,30 +50,26 @@ import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
 import static org.junit.Assert.*;
 
+import java.time.Duration;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 public class TestResourceAwareScheduler {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
     private static final Config defaultTopologyConf = createClusterConfig(10, 128, 0, null);
     private static int currentTime = 1450418597;
-    @Rule
-    public final ExpectedException schedulerException = ExpectedException.none();
 
-    @BeforeClass
+    @BeforeAll
     public static void initConf() {
         defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
         defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
@@ -971,8 +967,7 @@ public class TestResourceAwareScheduler {
         String notAllowed = DefaultResourceAwareStrategy.class.getName();
         config.put(Config.NIMBUS_SCHEDULER_STRATEGY_CLASS_WHITELIST, Arrays.asList(allowed));
 
-        schedulerException.expect(DisallowedStrategyException.class);
-        ReflectionUtils.newSchedulerStrategyInstance(notAllowed, config);
+        Assertions.assertThrows(DisallowedStrategyException.class, () -> ReflectionUtils.newSchedulerStrategyInstance(notAllowed, config));        
     }
 
     @Test
@@ -984,16 +979,19 @@ public class TestResourceAwareScheduler {
         assertEquals(sched.getClass().getName(), allowed);
     }
 
-    @Category(PerformanceTest.class)
-    @Test(timeout=30_000)
+    @PerformanceTest
+    @Test
     public void testLargeTopologiesOnLargeClusters() {
-        testLargeTopologiesCommon(DefaultResourceAwareStrategy.class.getName(), false, 1);
+        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), 
+            () -> testLargeTopologiesCommon(DefaultResourceAwareStrategy.class.getName(), false, 1));
+        
     }
     
-    @Category(PerformanceTest.class)
-    @Test(timeout=75_000)
+    @PerformanceTest
+    @Test
     public void testLargeTopologiesOnLargeClustersGras() {
-        testLargeTopologiesCommon(GenericResourceAwareStrategy.class.getName(), true, 1);
+        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(75),
+            () -> testLargeTopologiesCommon(GenericResourceAwareStrategy.class.getName(), true, 1));
     }
 
     public void testLargeTopologiesCommon(final String strategy, final boolean includeGpu, final int multiplier) {

