storm-commits mailing list archives

From s...@apache.org
Subject [3/7] storm git commit: STORM-1307: Port testing4j_test.clj to Java
Date Sat, 05 Jan 2019 15:02:53 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
new file mode 100644
index 0000000..efff057
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
@@ -0,0 +1,904 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
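+    //Builds a word-count topology: spout "1" feeds a fields-grouped TestWordCounter ("2"),
+    //a globally grouped TestGlobalCount ("3"), and a TestAggregatesCounter ("4"), then
+    //verifies the tuples emitted on each stream, with and without local messaging.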
+    @ParameterizedTest
+    @ValueSource(strings = {"true", "false"})
+    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                    new TestWordCounter(), 4));
+            boltMap.put("3",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new TestGlobalCount()));
+            boltMap.put("4",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("2", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new TestAggregatesCounter()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
+
+            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+            assertThat(Testing.readTuples(results, "1"), containsInAnyOrder(
+                new Values("nathan"),
+                new Values("nathan"),
+                new Values("bob"),
+                new Values("joey")));
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values("nathan", 1),
+                new Values("nathan", 2),
+                new Values("bob", 1),
+                new Values("joey", 1)
+            ));
+            assertThat(Testing.readTuples(results, "3"), contains(
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4)
+            ));
+            assertThat(Testing.readTuples(results, "4"), contains(
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4)
+            ));
+        }
+    }
+
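+    //Emits the executing task's index for each input tuple, so a test can verify how
+    //many distinct tasks received it.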
+    private static class EmitTaskIdBolt extends BaseRichBolt {
+
+        private int taskIndex;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("tid"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+            this.taskIndex = context.getThisTaskIndex();
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.emit(input, new Values(taskIndex));
+            collector.ack(input);
+        }
+
+    }
+
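+    //An all-grouped bolt with 3 executors and TOPOLOGY_TASKS set to 6 should deliver a
+    //copy of the tuple to each of the six task indices 0 through 5.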
+    @Test
+    public void testMultiTasksPerCluster() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareAllGrouping()),
+                    new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values(0),
+                new Values(1),
+                new Values(2),
+                new Values(3),
+                new Values(4),
+                new Values(5)
+            ));
+        }
+    }
+
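+    //Tuples 1 and 3 are acked downstream while tuple 2 stays pending; advancing the
+    //simulated clock past the 10 second message timeout should fail only tuple 2.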
+    @Test
+    public void testTimeout() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withSupervisors(4)
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new AckEveryOtherBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
+
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values("a"), 1);
+            feeder.feed(new Values("b"), 2);
+            feeder.feed(new Values("c"), 3);
+            cluster.advanceClusterTime(9);
+            assertAcked(tracker, 1, 3);
+            assertThat(tracker.isFailed(2), is(false));
+            cluster.advanceClusterTime(12);
+            assertFailed(tracker, 2);
+        }
+    }
+
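+    //Keeps the first tuple pending and calls resetTimeout on it as later tuples arrive,
+    //acking it only once the fifth tuple is seen.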
+    private static class ResetTimeoutBolt extends BaseRichBolt {
+
+        private int tupleCounter = 1;
+        private Tuple firstTuple = null;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            if (tupleCounter == 1) {
+                firstTuple = input;
+            } else if (tupleCounter == 2) {
+                collector.resetTimeout(firstTuple);
+            } else if (tupleCounter == 5) {
+                collector.ack(firstTuple);
+                collector.ack(input);
+            } else {
+                collector.resetTimeout(firstTuple);
+                collector.ack(input);
+            }
+            tupleCounter++;
+        }
+    }
+
+    @Test
+    public void testResetTimeout() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(
+                        Utils.getGlobalStreamId("1", null),
+                        Thrift.prepareGlobalGrouping()),
+                    new ResetTimeoutBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
+
+            //The first tuple will be used to check timeout reset
+            feeder.feed(new Values("a"), 1);
+            //The second tuple is used to wait for the spout to rotate its pending map
+            feeder.feed(new Values("b"), 2);
+            cluster.advanceClusterTime(9);
+            //The other tuples are used to reset the first tuple's timeout,
+            //and to wait for the message to get through to the spout (acks use the same path as timeout resets)
+            feeder.feed(new Values("c"), 3);
+            assertAcked(tracker, 3);
+            cluster.advanceClusterTime(9);
+            feeder.feed(new Values("d"), 4);
+            assertAcked(tracker, 4);
+            cluster.advanceClusterTime(2);
+            //The time is now twice the message timeout; the second tuple should expire since it was not acked
+            //Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
+            assertFailed(tracker, 2);
+            //Put in a tuple to cause the first tuple to be acked
+            feeder.feed(new Values("e"), 5);
+            assertAcked(tracker, 5);
+            //The first tuple should be acked, and should not have failed
+            assertThat(tracker.isFailed(1), is(false));
+            assertAcked(tracker, 1);
+        }
+    }
+
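+    //Topology builders for the structure validation test: one valid layout and three
+    //invalid ones (unknown source component, unknown grouping field, unknown stream).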
+    private StormTopology mkValidateTopology() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidTopology1() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("3", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidTopology2() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", null),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private StormTopology mkInvalidTopology3() {
+        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+            Thrift.prepareBoltDetails(
+                Collections.singletonMap(
+                    Utils.getGlobalStreamId("1", "non-exists-stream"),
+                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+                new TestWordCounter(), 4));
+        return Thrift.buildTopology(spoutMap, boltMap);
+    }
+
+    private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
+        try {
+            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
+            completeTopologyParam.setMockedSources(mockedSources);
+            completeTopologyParam.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 2));
+            Testing.completeTopology(cluster, topology, completeTopologyParam);
+            return false;
+        } catch (InvalidTopologyException e) {
+            return true;
+        }
+    }
+
+    @Test
+    public void testValidateTopologyStructure() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            assertThat(tryCompleteWordCountTopology(cluster, mkValidateTopology()), is(false));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidTopology1()), is(true));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidTopology2()), is(true));
+            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidTopology3()), is(true));
+        }
+    }
+
+    @Test
+    public void testSystemStream() throws Exception {
+        //this test works because mocking a spout splits up the tuples evenly among the tasks
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+
+            Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
+            boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
+            boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
+            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+                Thrift.prepareBoltDetails(
+                    boltInputs,
+                    new IdentityBolt(), 1));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+
+            List<FixedTuple> testTuples = Arrays.asList("a", "b", "c").stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
+                new Values("a"),
+                new Values("b"),
+                new Values("c")
+            ));
+        }
+    }
+
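+    //Pairs a feeder spout with a callback used to assert on its results.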
+    private static class SpoutAndChecker {
+
+        private final FeederSpout spout;
+        private final Consumer<Integer> checker;
+
+        public SpoutAndChecker(FeederSpout spout, Consumer<Integer> checker) {
+            this.spout = spout;
+            this.checker = checker;
+        }
+    }
+
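+    //Re-emits each input tuple the configured number of times, anchored to the input,
+    //fanning out the tuple tree before acking.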
+    private static class BranchingBolt extends BaseRichBolt {
+
+        private final int branches;
+        private OutputCollector collector;
+
+        public BranchingBolt(int branches) {
+            this.branches = branches;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            IntStream.range(0, branches)
+                .forEach(i -> collector.emit(input, new Values(i)));
+            collector.ack(input);
+        }
+    }
+
+    private static class AckBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.ack(input);
+        }
+    }
+
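+    //Three feeders branch through BranchingBolts into a shared AggBolt ("7"), which
+    //completes a batch only after three tuples arrive, so acks reach the feeders only
+    //once enough downstream tuples have accumulated.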
+    @Test
+    public void testAcking() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            AckTrackingFeeder feeder1 = new AckTrackingFeeder("num");
+            AckTrackingFeeder feeder2 = new AckTrackingFeeder("num");
+            AckTrackingFeeder feeder3 = new AckTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.getSpout()));
+            spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.getSpout()));
+            spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.getSpout()));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1",
null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
+            boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2",
null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
+            boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3",
null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
+
+            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+            aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
+            boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
+
+            boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7",
null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
+            boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8",
null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
+            feeder1.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.assertNumAcks(0);
+            feeder2.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.assertNumAcks(1);
+            feeder2.assertNumAcks(1);
+            feeder1.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.assertNumAcks(0);
+            feeder1.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.assertNumAcks(1);
+            feeder3.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.assertNumAcks(0);
+            feeder3.assertNumAcks(0);
+            feeder2.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder1.feed(new Values(1));
+            feeder2.feed(new Values(1));
+            feeder3.feed(new Values(1));
+        }
+    }
+
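+    //The spout stream is consumed by two identity bolts feeding one aggregator, so each
+    //fed tuple is anchored along two branches; the ack arrives only once the whole
+    //tuple tree completes (here, after the second feed).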
+    @Test
+    public void testAckBranching() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            AckTrackingFeeder feeder = new AckTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1",
null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1",
null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+
+            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+            aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
+            boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.assertNumAcks(0);
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.assertNumAcks(2);
+        }
+    }
+
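+    //Anchors its output to the same input tuple twice; acking should still complete
+    //as if the tuple were anchored once.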
+    private static class DupAnchorBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("num"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            ArrayList<Tuple> anchors = new ArrayList<>();
+            anchors.add(input);
+            anchors.add(input);
+            collector.emit(anchors, new Values(1));
+            collector.ack(input);
+        }
+    }
+
+    private static boolean boltPrepared = false;
+
+    private static class PrepareTrackedBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            boltPrepared = true;
+            collector.ack(input);
+        }
+    }
+
+    private static boolean spoutOpened = false;
+
+    private static class OpenTrackedSpout extends BaseRichSpout {
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("val"));
+        }
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        }
+
+        @Override
+        public void nextTuple() {
+            spoutOpened = true;
+        }
+
+    }
+
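+    //A topology submitted as INACTIVE must not open spouts or prepare bolts until
+    //Nimbus activates it; afterwards the pending tuple should be processed and acked.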
+    @Test
+    public void testSubmitInactiveTopology() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+            spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1",
null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
+
+            boltPrepared = false;
+            spoutOpened = false;
+
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
+
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values("a"), 1);
+            cluster.advanceClusterTime(9);
+            assertThat(boltPrepared, is(false));
+            assertThat(spoutOpened, is(false));
+            cluster.getNimbus().activate("test");
+
+            cluster.advanceClusterTime(12);
+            assertAcked(tracker, 1);
+            assertThat(boltPrepared, is(true));
+            assertThat(spoutOpened, is(true));
+        }
+    }
+
+    @Test
+    public void testAckingSelfAnchor() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            AckTrackingFeeder feeder = new AckTrackingFeeder("num");
+
+            Map<String, SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+
+            Map<String, BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1",
null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2",
null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("test", Collections.emptyMap(), tracked);
+
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.assertNumAcks(1);
+            feeder.feed(new Values(1));
+            feeder.feed(new Values(1));
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 3);
+            feeder.assertNumAcks(3);
+        }
+    }
+
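+    //Interprets a flat list of alternating keys and values as a map.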
+    private Map<Object, Object> listToMap(List<Object> list) {
+        assertThat(list.size() % 2, is(0));
+        Map<Object, Object> res = new HashMap<>();
+        for (int i = 0; i < list.size(); i += 2) {
+            res.put(list.get(i), list.get(i + 1));
+        }
+        return res;
+    }
+
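+    //TOPOLOGY_KRYO_DECORATORS set on the component and in the topology config should be
+    //merged into a single deduplicated list by the time the bolt reads its config.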
+    @Test
+    public void testKryoDecoratorsConfig() throws Exception {
+        Map<String, Object> daemonConf = new HashMap<>();
+        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+        daemonConf.put(Config.TOPOLOGY_KRYO_DECORATORS, "this-is-overridden");
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(daemonConf)
+            .build()) {
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+            topologyBuilder.setBolt("2", new TestConfBolt(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "two"))))
+                .shuffleGrouping("1");
+
+            List<FixedTuple> testTuples = Arrays.asList(new Values(Config.TOPOLOGY_KRYO_DECORATORS)).stream()
+                .map(value -> new FixedTuple(value))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "three")));
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
+
+            List<Object> concatValues = Testing.readTuples(results, "2").stream()
+                .flatMap(values -> values.stream())
+                .collect(Collectors.toList());
+            assertThat(concatValues.get(0), is(Config.TOPOLOGY_KRYO_DECORATORS));
+            assertThat(concatValues.get(1), is(Arrays.asList("one", "two", "three")));
+        }
+    }
+
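+    //Checks how component-specific settings combine with topology config: the builder's
+    //setMaxTaskParallelism(2) overrides the component conf's 20, and TOPOLOGY_KRYO_REGISTER
+    //entries from both levels are merged, with the topology config winning on duplicate keys.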
+    @Test
+    public void testComponentSpecificConfig() throws Exception {
+        Map<String, Object> daemonConf = new HashMap<>();
+        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withDaemonConf(daemonConf)
+            .build()) {
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+            Map<String, Object> componentConf = new HashMap<>();
+            componentConf.put("fake.config", 123);
+            componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 20);
+            componentConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30);
+            componentConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(Collections.singletonMap("fake.type", "bad.serializer"), Collections.singletonMap("fake.type2", "a.serializer")));
+            topologyBuilder.setBolt("2", new TestConfBolt(componentConf))
+                .shuffleGrouping("1")
+                .setMaxTaskParallelism(2)
+                .addConfiguration("fake.config2", 987);
+
+            List<FixedTuple> testTuples = Arrays.asList("fake.config", Config.TOPOLOGY_MAX_TASK_PARALLELISM, Config.TOPOLOGY_MAX_SPOUT_PENDING, "fake.config2", Config.TOPOLOGY_KRYO_REGISTER).stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+            Map<String, String> kryoRegister = new HashMap<>();
+            kryoRegister.put("fake.type", "good.serializer");
+            kryoRegister.put("fake.type3", "a.serializer3");
+            Map<String, Object> stormConf = new HashMap<>();
+            stormConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(kryoRegister));
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+            completeTopologyParams.setStormConf(stormConf);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
+
+            Map<String, Object> expectedValues = new HashMap<>();
+            expectedValues.put("fake.config", 123L);
+            expectedValues.put("fake.config2", 987L);
+            expectedValues.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 2L);
+            expectedValues.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30L);
+            Map<String, String> expectedKryoRegister = new HashMap<>();
+            expectedKryoRegister.putAll(kryoRegister);
+            expectedKryoRegister.put("fake.type2", "a.serializer");
+            expectedValues.put(Config.TOPOLOGY_KRYO_REGISTER, expectedKryoRegister);
+            List<Object> concatValues = Testing.readTuples(results, "2").stream()
+                .flatMap(values -> values.stream())
+                .collect(Collectors.toList());
+            assertThat(listToMap(concatValues), is(expectedValues));
+        }
+    }
+    
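+    //Counts task-hook callbacks (emit, ack, fail, execute) and emits a snapshot of the
+    //counters for every input tuple, alternately acking and failing them.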
+    private static class HooksBolt extends BaseRichBolt {
+
+        private int acked = 0;
+        private int failed = 0;
+        private int executed = 0;
+        private int emitted = 0;
+        private OutputCollector collector;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("emit", "ack", "fail", "executed"));
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+            context.addTaskHook(new BaseTaskHook() {
+                @Override
+                public void boltExecute(BoltExecuteInfo info) {
+                    executed++;
+                }
+
+                @Override
+                public void boltFail(BoltFailInfo info) {
+                    failed++;
+                }
+
+                @Override
+                public void boltAck(BoltAckInfo info) {
+                    acked++;
+                }
+
+                @Override
+                public void emit(EmitInfo info) {
+                    emitted++;
+                }
+                
+            });
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            collector.emit(new Values(emitted, acked, failed, executed));
+            if (acked - failed == 0) {
+                collector.ack(input);
+            } else {
+                collector.fail(input);
+            }
+        }
+    }
+    
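+    //Feeds four tuples through HooksBolt and checks the hook-counter snapshots emitted
+    //as each tuple is processed.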
+    @Test
+    public void testHooks() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .build()) {
+            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
+
+            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
+                Thrift.prepareBoltDetails(
+                    Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
+                    new HooksBolt()));
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
+                .map(value -> new FixedTuple(new Values(value)))
+                .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setMockedSources(mockedSources);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+            
+            List<List<Object>> expectedTuples = Arrays.asList(
+                Arrays.asList(0, 0, 0, 0),
+                Arrays.asList(2, 1, 0, 1),
+                Arrays.asList(4, 1, 1, 2),
+                Arrays.asList(6, 2, 1, 3));
+
+            assertThat(Testing.readTuples(results, "2"), is(expectedTuples));
+        }
+    }
+
+}

