storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: Spout sleep wait strategy
Date Wed, 16 Aug 2017 04:46:55 GMT
Yes, I believe so.
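
As a rough illustration of that ordering, here is a simplified, self-contained model of one pass through the loop. It is not Storm's code: ToySpout, ToyWaitStrategy and ToyExecutor are made-up names for this sketch, and the real SpoutExecutor does more (for example the max spout pending check), which is left out here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SpoutLoopSketch {

    /** Hypothetical stand-in for a spout; not Storm's ISpout interface. */
    interface ToySpout {
        boolean nextTuple();      // returns true if a tuple was emitted
        void ack(String msgId);
        void fail(String msgId);
    }

    /** Hypothetical stand-in for a pluggable wait strategy. */
    interface ToyWaitStrategy {
        void emptyEmit(long streak);
    }

    /** Toy executor whose call() method models one pass of the loop. */
    static final class ToyExecutor {
        // Queued notifications, encoded as "ack:<id>" or "fail:<id>".
        final Queue<String> pending = new ArrayDeque<>();
        final ToySpout spout;
        final ToyWaitStrategy wait;
        long emptyStreak = 0;

        ToyExecutor(ToySpout spout, ToyWaitStrategy wait) {
            this.spout = spout;
            this.wait = wait;
        }

        void call() {
            // 1. Drain all queued acks and fails first.
            String event;
            while ((event = pending.poll()) != null) {
                String[] parts = event.split(":", 2);
                if (parts[0].equals("ack")) {
                    spout.ack(parts[1]);
                } else {
                    spout.fail(parts[1]);
                }
            }
            // 2. Only then ask the spout for a new tuple.
            if (spout.nextTuple()) {
                emptyStreak = 0;
            } else {
                // 3. Nothing was emitted, so hand over to the wait strategy.
                wait.emptyEmit(emptyStreak++);
            }
        }
    }

    /** Runs two passes and returns the order in which things happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToySpout spout = new ToySpout() {
            @Override public boolean nextTuple() { log.add("nextTuple"); return false; }
            @Override public void ack(String id) { log.add("ack " + id); }
            @Override public void fail(String id) { log.add("fail " + id); }
        };
        // A real sleep strategy would sleep here; this one only records the call.
        ToyWaitStrategy wait = streak -> log.add("wait streak=" + streak);

        ToyExecutor executor = new ToyExecutor(spout, wait);
        executor.pending.add("ack:a");
        executor.pending.add("fail:b");
        executor.call();
        executor.call();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the first pass logs the ack and the fail before nextTuple, and the wait strategy only runs because nextTuple emitted nothing.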

2017-08-16 0:34 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:

> Awesome, thanks, this makes sense. So here
> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L141
> it looks like all acks and fails are processed
> before nextTuple is called. Is that correct?
>
> On Tue, Aug 15, 2017 at 4:33 PM, Stig Rohde Døssing <srdo@apache.org>
> wrote:
>
>> Sure. When the supervisor starts up a worker JVM, the worker process
>> boots up its set of executors here
>> https://github.com/apache/storm/blob/7f33447477dfbf581e9b46feb27c362cc170dc56/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java#L202
>> (note the main method at the bottom of the file, this is the entry point
>> for worker processes). The loop calling SpoutExecutor.call is here
>> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L239,
>> which basically just sets up a Java Thread to keep calling the call method
>> until a crash or interrupt happens.
>>
>> The acks or fails are handled as part of call here
>> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L141.
>> That statement pulls messages off the executor's message queue, and calls
>> back to
>> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L191,
>> which is where acks and fails are handled. Once the messages are handled,
>> control returns to the call method.
>>
>> This might be helpful as a reference too
>> http://storm.apache.org/releases/2.0.0-SNAPSHOT/Understanding-the-parallelism-of-a-Storm-topology.html
>>
>> 2017-08-15 22:02 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>
>>> Thanks Stig, this is very helpful. So that call function gets called in
>>> a loop from somewhere? And when there is an ack or fail, do those get
>>> handled instead of call? Would you be able to point me toward the source
>>> for that as well? Just trying to understand how things work.
>>>
>>> Thanks again!
>>>
>>>
>>> On Aug 15, 2017, at 14:48, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>
>>> Sure, take a look at
>>> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L140.
>>> This function is called repeatedly on spouts to emit new tuples. The wait
>>> strategy is used in L175 when a call to nextTuple doesn't emit anything.
>>> The wait strategy is instantiated here
>>> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L73.
>>> Note that this is linking to the current master code, the 1.x code is
>>> Clojure code instead. The equivalent on 1.x is here
>>> https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L659.
>>>
>>> I believe you have to do it in code for Java-based topology
>>> configurations, but you should take a look at
>>> http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, which allows
>>> you to specify topology configuration as yaml.
>>>
>>> 2017-08-15 20:36 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>>
>>>> Also there's no config file that can do something similar right? It has
>>>> to be done in the code?
>>>>
>>>>
>>>> On Aug 15, 2017, at 14:31, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>
>>>> Thanks Stig, that worked for me!
>>>>
>>>> Another question, how does storm internally handle this time out? Is
>>>> there some source code you can point me to?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Aug 15, 2017, at 12:15, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>
>>>> I think you need to give the FQCN for SleepSpoutWaitStrategy instead of
>>>> an instance, since the config must be serializable to JSON. I'm a little
>>>> surprised you don't get an error when you submit that topology. If you're
>>>> using the default wait strategy, you can just leave out the
>>>> TOPOLOGY_SPOUT_WAIT_STRATEGY part.
>>>>
>>>> Here's what works for me (based on the word count topology in
>>>> storm-starter):
>>>>
>>>> builder.setSpout("spout", new RandomSentenceSpout(), 5)
>>>>         .addConfiguration(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, new TestWait().getClass().getName())
>>>>         .addConfiguration(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 60_000);
>>>>
>>>> where TestWait is just an inner class like this (purely so I can print
>>>> the configuration, normally I'd just use the built in wait strategy)
>>>>
>>>> public static final class TestWait extends SleepSpoutWaitStrategy {
>>>>
>>>>     @Override
>>>>     public void prepare(Map<String, Object> conf) {
>>>>         super.prepare(conf);
>>>>         LogManager.getLogger(getClass()).error("The sleep backoff is {}",
>>>>             conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS));
>>>>     }
>>>>
>>>> }
>>>>
>>>> When I run the topology I get the following in the log:
>>>> 2017-08-15 18:11:56.596 o.a.s.s.WordCountTopology$TestWait main [ERROR] The sleep backoff is 60000
>>>>
>>>> 2017-08-15 18:00 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>>>
>>>>> In the last line I use addConfigurations
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> On Aug 15, 2017, at 11:59, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>
>>>>> Hmm okay, that's what I'm trying to do but maybe I'm doing it wrong.
>>>>>
>>>>>
>>>>> Config config = new Config();
>>>>> SleepSpoutWaitStrategy strategy = new SleepSpoutWaitStrategy();
>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, strategy);
>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
>>>>> builder.setSpout(...).addConfiguration(config);
>>>>>
>>>>>
>>>>>
>>>>> Sent from my iPhone
>>>>> On Aug 15, 2017, at 11:51, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>
>>>>> I think I might have misread the code. It looks like the method I
>>>>> linked does the opposite of what I thought, and removes only the
>>>>> configuration that is not listed in the link. I would expect using
>>>>> SpoutDeclarer.addConfiguration to work then.
>>>>>
>>>>> 2017-08-15 17:36 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>>>>
>>>>>> Text from post.
>>>>>>
>>>>>> 2. Spout wait strategies: There's two situations in which a spout
>>>>>> needs to wait. The first is when the max spout pending limit is reached.
>>>>>> The second is when nothing is emitted from nextTuple. Previously, Storm
>>>>>> would just have that spout sit in a busy loop in those cases. What Storm
>>>>>> does in those situations is now pluggable, and the default is now for the
>>>>>> spout to sleep for 1 ms. This will cause the spout to use dramatically
>>>>>> less CPU when it hits those cases, and it also obviates the need for
>>>>>> spouts to do any sleeping in their implementation to be "polite". The
>>>>>> wait strategy can be configured with TOPOLOGY_SPOUT_WAIT_STRATEGY and can
>>>>>> be configured on a spout by spout basis. The interface to implement for a
>>>>>> wait strategy is backtype.storm.spout.ISpoutWaitStrategy
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Aug 15, 2017, at 11:34, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>>
>>>>>> I tried adding TOPOLOGY_SPOUT_WAIT_STRATEGY  and
>>>>>> TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS in the spouts config but
>>>>>> that didn't seem to have an effect.
>>>>>>
>>>>>>
>>>>>> On Aug 15, 2017, at 11:28, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stig,
>>>>>>
>>>>>> Thank you. However it looks like from this post there is a way to do
>>>>>> it on a per spout basis.
>>>>>> https://groups.google.com/forum/m/#!search/Storm$200.8.1$20released/storm-user/hVbXtBdCkQo
>>>>>>
>>>>>> Do you or does anyone else know if this is still a possibility? If
>>>>>> so, how do I do it?
>>>>>>
>>>>>>
>>>>>> On Aug 15, 2017, at 11:14, Stig Rohde Døssing <srdo@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Mahak,
>>>>>>
>>>>>> I haven't checked in any detail, but I suspect there isn't. I'd have
>>>>>> said you could set the configuration for the spout via the SpoutDeclarer
>>>>>> addConfiguration methods when declaring the spout, but it looks like the
>>>>>> wait strategy and backoff are both removed from the component
>>>>>> configuration, and only read from the topology level configuration
>>>>>> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L431.
>>>>>>
>>>>>> 2017-08-15 16:45 GMT+02:00 Brian Taylor <
>>>>>> brian@resolvingarchitecture.com>:
>>>>>>
>>>>>>> Unsubscribe
>>>>>>>
>>>>>>> Sent from BlueMail <http://www.bluemail.me/r?b=9660>
>>>>>>> On Aug 15, 2017, at 10:34 AM, Mahak Goel <mahakgoel94@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I know I can configure a sleep wait strategy in the defaults.yaml
>>>>>>>> and that will apply to all spouts in the topology. Is there a way to do
>>>>>>>> this on a spout by spout basis? That is, is there a way to configure
>>>>>>>> different times for different spouts?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
