storm-user mailing list archives

From Mahak Goel <mahakgoe...@gmail.com>
Subject Re: Spout sleep wait strategy
Date Tue, 15 Aug 2017 20:02:44 GMT
Thanks Stig, this is very helpful. So that call function gets called in a loop from somewhere?
And when there is an ack or fail, do those get handled instead of call? Would you be able
to point me toward the source for that as well? Just trying to understand how things work.


Thanks again!  


> On Aug 15, 2017, at 14:48, Stig Rohde Døssing <srdo@apache.org> wrote:
> 
> Sure, take a look at https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L140.
> This function is called repeatedly on spouts to emit new tuples. The wait strategy is used
> on line 175 of that file, when a call to nextTuple doesn't emit anything. The wait strategy
> is instantiated here: https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L73.
> Note that this links to the current master code; on 1.x the equivalent code is written in Clojure.
> The equivalent on 1.x is here: https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L659.
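For readers following the thread, here is a heavily simplified, self-contained sketch of the pattern described above: a loop that calls nextTuple repeatedly and hands control to a wait strategy whenever a call emits nothing, with the strategy created by reflection from a class name (the form a config value carries). All names (SpoutLoopSketch, SimpleSpout, WaitStrategy, CountingWait, runLoop, fromClassName) are hypothetical and are not Storm's API; it only models the empty-emit case, not the max-spout-pending case, and the real logic is in the linked SpoutExecutor.java.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SpoutLoopSketch {

    // Hypothetical stand-in for a spout: each nextTuple() call may or may not emit a value.
    interface SimpleSpout {
        void nextTuple(List<String> collector);
    }

    // Hypothetical stand-in for a spout wait strategy.
    public interface WaitStrategy {
        void emptyEmit(long streak);
    }

    // Counts back-off requests instead of sleeping, so the loop's behavior is easy to inspect.
    public static class CountingWait implements WaitStrategy {
        public long waits = 0;
        public long longestStreak = 0;

        @Override
        public void emptyEmit(long streak) {
            waits++;
            longestStreak = Math.max(longestStreak, streak);
        }
    }

    // Builds the strategy from a fully qualified class name, the form a config value would carry.
    static WaitStrategy fromClassName(String fqcn) throws ReflectiveOperationException {
        return (WaitStrategy) Class.forName(fqcn).getDeclaredConstructor().newInstance();
    }

    // Calls nextTuple repeatedly; after a call that emits nothing, lets the wait strategy back off.
    static List<String> runLoop(SimpleSpout spout, WaitStrategy wait, int iterations) {
        List<String> emitted = new ArrayList<>();
        long emptyStreak = 0;
        for (int i = 0; i < iterations; i++) {
            int before = emitted.size();
            spout.nextTuple(emitted);
            if (emitted.size() == before) {
                emptyStreak++;
                wait.emptyEmit(emptyStreak);
            } else {
                emptyStreak = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Deque<String> source = new ArrayDeque<>(List.of("a", "b"));
        SimpleSpout spout = collector -> {
            String next = source.poll();
            if (next != null) {
                collector.add(next);
            }
        };
        CountingWait wait = (CountingWait) fromClassName(CountingWait.class.getName());
        List<String> out = runLoop(spout, wait, 5);
        System.out.println(out + " waits=" + wait.waits); // prints "[a, b] waits=3"
    }
}
```

The streak counter resets whenever something is emitted, so a strategy could, in principle, back off harder the longer a spout stays idle.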
> 
> I believe you have to do it in code for Java-based topology configurations, but you should
> take a look at http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, which allows you
> to specify topology configuration as YAML.
> 
> 2017-08-15 20:36 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>> Also, there's no config file that can do something similar, right? It has to be done
>> in the code?
>> 
>> 
>>> On Aug 15, 2017, at 14:31, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>> 
>>> Thanks Stig, that worked for me!
>>> 
>>> Another question: how does Storm internally handle this timeout? Is there some
>>> source code you can point me to?
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Aug 15, 2017, at 12:15, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>> 
>>>> I think you need to give the FQCN (fully qualified class name) for SleepSpoutWaitStrategy
>>>> instead of an instance, since the config must be serializable to JSON. I'm a little surprised
>>>> you don't get an error when you submit that topology. If you're using the default wait
>>>> strategy, you can just leave out the TOPOLOGY_SPOUT_WAIT_STRATEGY part.
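To illustrate why a class name works where an instance does not: a config that has to become JSON can only hold JSON-native values (strings, numbers, booleans, lists, maps). The sketch below uses a hypothetical isJsonFriendly helper and a hypothetical MyWaitStrategy class; neither is part of Storm, which does its own validation. The two key strings are assumed to be the values of Config.TOPOLOGY_SPOUT_WAIT_STRATEGY and Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfValueCheck {

    // Hypothetical stand-in for a wait strategy class; Storm's own classes are not used here.
    public static class MyWaitStrategy {
    }

    // Rough approximation of "can this value be written as JSON?": only JSON-native types pass.
    static boolean isJsonFriendly(Object value) {
        if (value == null || value instanceof String || value instanceof Number
                || value instanceof Boolean) {
            return true;
        }
        if (value instanceof List) {
            for (Object item : (List<?>) value) {
                if (!isJsonFriendly(item)) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!(e.getKey() instanceof String) || !isJsonFriendly(e.getValue())) {
                    return false;
                }
            }
            return true;
        }
        return false; // arbitrary objects, such as a strategy instance, are rejected
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // Assumed key strings for the two Storm config constants discussed in the thread.
        conf.put("topology.spout.wait.strategy", MyWaitStrategy.class.getName());
        conf.put("topology.sleep.spout.wait.strategy.time.ms", 60_000);
        System.out.println("class name conf ok: " + isJsonFriendly(conf)); // true

        conf.put("topology.spout.wait.strategy", new MyWaitStrategy());
        System.out.println("instance conf ok: " + isJsonFriendly(conf)); // false
    }
}
```

A class name is just a string, so it survives the round trip, and the worker can later turn it back into an object by reflection.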
>>>> 
>>>> Here's what works for me (based on the word count topology in storm-starter):
>>>> 
>>>> builder.setSpout("spout", new RandomSentenceSpout(), 5)
>>>>         .addConfiguration(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, TestWait.class.getName())
>>>>         .addConfiguration(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 60_000);
>>>> 
>>>> where TestWait is just an inner class like this (purely so I can print the
>>>> configuration; normally I'd just use the built-in wait strategy):
>>>> 
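>>>> // Imports needed at the top of the enclosing file:
>>>> import java.util.Map;
>>>> import org.apache.logging.log4j.LogManager;
>>>> import org.apache.storm.Config;
>>>> import org.apache.storm.spout.SleepSpoutWaitStrategy;
>>>> 
>>>> public static final class TestWait extends SleepSpoutWaitStrategy {
>>>> 
>>>>     @Override
>>>>     public void prepare(Map<String, Object> conf) {
>>>>         super.prepare(conf);
>>>>         // Print the configured sleep time that the parent strategy will use
>>>>         LogManager.getLogger(getClass()).error("The sleep backoff is {}",
>>>>                 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS));
>>>>     }
>>>> }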
>>>> 
>>>> When I run the topology I get the following in the log:
>>>> 2017-08-15 18:11:56.596 o.a.s.s.WordCountTopology$TestWait main [ERROR] The sleep backoff is 60000
>>>> 
>>>> 2017-08-15 18:00 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>>>> In the last line I use addConfigurations
>>>>> 
>>>>>> On Aug 15, 2017, at 11:59, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>> 
>>>>>> Hmm, okay, that's what I'm trying to do, but maybe I'm doing it wrong.
>>>>>> 
>>>>>> 
>>>>>> Config config = new Config();
>>>>>> SleepSpoutWaitStrategy strategy = new SleepSpoutWaitStrategy();
>>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, strategy);
>>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
>>>>>> builder.setSpout(...).addConfiguration(config);
>>>>>> 
>>>>>> On Aug 15, 2017, at 11:51, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>> 
>>>>>>> I think I might have misread the code. It looks like the method I linked does the
>>>>>>> opposite of what I thought: it removes only the configuration that is not listed at
>>>>>>> that link. I would expect using SpoutDeclarer.addConfiguration to work, then.
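A simplified model of the merge described here, assuming the executor starts from the topology config and lets only allow-listed component-level keys (including the two wait-strategy keys) override it. The names ComponentConfSketch, effectiveConf and COMPONENT_OVERRIDABLE, the allow-list contents, and the key strings are hypothetical; the authoritative logic and key list are in the linked Executor.java. This sketch ignores every component key that is not on the list, which may be stricter than Storm's actual behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ComponentConfSketch {

    // Hypothetical allow-list: component-level keys that are kept rather than stripped.
    static final Set<String> COMPONENT_OVERRIDABLE = Set.of(
            "topology.debug",
            "topology.max.spout.pending",
            "topology.spout.wait.strategy",
            "topology.sleep.spout.wait.strategy.time.ms");

    // Starts from the topology config, then lets allow-listed component keys override it.
    static Map<String, Object> effectiveConf(Map<String, Object> topoConf,
                                             Map<String, Object> componentConf) {
        Map<String, Object> result = new HashMap<>(topoConf);
        for (Map.Entry<String, Object> e : componentConf.entrySet()) {
            if (COMPONENT_OVERRIDABLE.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> topo = new HashMap<>();
        topo.put("topology.sleep.spout.wait.strategy.time.ms", 1);
        topo.put("topology.workers", 2);

        Map<String, Object> fastSpout = Map.of("topology.sleep.spout.wait.strategy.time.ms", 10);
        Map<String, Object> slowSpout = Map.of(
                "topology.sleep.spout.wait.strategy.time.ms", 60_000,
                "topology.workers", 8); // not on the allow-list, so ignored here

        System.out.println(effectiveConf(topo, fastSpout));
        System.out.println(effectiveConf(topo, slowSpout));
    }
}
```

Under these assumptions, two spouts declared with different sleep times end up with different effective values, while the topology-wide setting still applies to any spout that does not override it.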
>>>>>>> 
>>>>>>> 2017-08-15 17:36 GMT+02:00 Mahak Goel <mahakgoel94@gmail.com>:
>>>>>>>> Text from post. 
>>>>>>>> 
>>>>>>>> 2. Spout wait strategies: There are two situations in which a spout needs to wait. The
>>>>>>>> first is when the max spout pending limit is reached. The second is when nothing is
>>>>>>>> emitted from nextTuple. Previously, Storm would just have that spout sit in a busy loop
>>>>>>>> in those cases. What Storm does in those situations is now pluggable, and the default is
>>>>>>>> now for the spout to sleep for 1 ms. This will cause the spout to use dramatically less
>>>>>>>> CPU when it hits those cases, and it also obviates the need for spouts to do any sleeping
>>>>>>>> in their implementation to be "polite". The wait strategy can be configured with
>>>>>>>> TOPOLOGY_SPOUT_WAIT_STRATEGY and can be configured on a spout-by-spout basis. The
>>>>>>>> interface to implement for a wait strategy is backtype.storm.spout.ISpoutWaitStrategy.
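A minimal, self-contained sketch of the sleep-based behavior these notes describe: prepare reads the sleep time from the config (falling back to 1 ms, the default mentioned above), and emptyEmit sleeps instead of busy-looping. The WaitStrategy interface here only mirrors the assumed shape of ISpoutWaitStrategy (a prepare method plus an emptyEmit method taking a streak count); it is hypothetical, not the Storm interface, and the key string is assumed to match TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS. Two instances prepared with different configs stand in for two spouts with different sleep times.

```java
import java.util.Map;

public class SleepWaitSketch {

    // Shape assumed to resemble ISpoutWaitStrategy; hypothetical, not Storm's interface.
    interface WaitStrategy {
        void prepare(Map<String, Object> conf);

        void emptyEmit(long streak) throws InterruptedException;
    }

    // Sleeps for a configurable number of milliseconds whenever the spout has nothing to do.
    static class SleepWait implements WaitStrategy {
        // Assumed key string for TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS.
        static final String SLEEP_MS_KEY = "topology.sleep.spout.wait.strategy.time.ms";
        long sleepMillis;

        @Override
        public void prepare(Map<String, Object> conf) {
            Object value = conf.getOrDefault(SLEEP_MS_KEY, 1L); // 1 ms default, per the notes
            sleepMillis = ((Number) value).longValue();
        }

        @Override
        public void emptyEmit(long streak) throws InterruptedException {
            Thread.sleep(sleepMillis); // give up the CPU instead of spinning
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SleepWait defaultWait = new SleepWait();
        defaultWait.prepare(Map.of());

        SleepWait slowWait = new SleepWait();
        slowWait.prepare(Map.of(SleepWait.SLEEP_MS_KEY, 50));

        long start = System.nanoTime();
        slowWait.emptyEmit(1);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("default sleep: " + defaultWait.sleepMillis + " ms");
        System.out.println("slow spout sleep: " + slowWait.sleepMillis + " ms, waited ~" + elapsedMs + " ms");
    }
}
```

Because the sleep time is read once in prepare, each spout's strategy instance keeps whatever value its own config supplied.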
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Aug 15, 2017, at 11:34, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> I tried adding TOPOLOGY_SPOUT_WAIT_STRATEGY and TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS
>>>>>>>>> to the spout's config, but that didn't seem to have an effect.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Aug 15, 2017, at 11:28, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Stig, 
>>>>>>>>>> 
>>>>>>>>>> Thank you. However, from this post it looks like there is a way to do it on a
>>>>>>>>>> per-spout basis:
>>>>>>>>>> https://groups.google.com/forum/m/#!search/Storm$200.8.1$20released/storm-user/hVbXtBdCkQo
>>>>>>>>>> 
>>>>>>>>>> Do you, or does anyone else, know whether this is still possible? If so, how do I do it?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On Aug 15, 2017, at 11:14, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Mahak,
>>>>>>>>>>> 
>>>>>>>>>>> I haven't checked in any detail, but I suspect there isn't. I'd have said you could set
>>>>>>>>>>> the configuration for the spout via the SpoutDeclarer addConfiguration methods when
>>>>>>>>>>> declaring the spout, but it looks like the wait strategy and backoff are both removed
>>>>>>>>>>> from the component configuration and only read from the topology-level configuration:
>>>>>>>>>>> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L431.
>>>>>>>>>>> 
>>>>>>>>>>> 2017-08-15 16:45 GMT+02:00 Brian Taylor <brian@resolvingarchitecture.com>:
>>>>>>>>>>>> Unsubscribe
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from BlueMail
>>>>>>>>>>>>> On Aug 15, 2017, at 10:34 AM, Mahak Goel <mahakgoel94@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I know I can configure a sleep wait strategy in defaults.yaml, and that will apply to all
>>>>>>>>>>>>> spouts in the topology. Is there a way to do this on a spout-by-spout basis? That is, is
>>>>>>>>>>>>> there a way to configure different sleep times for different spouts?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>> 
>>>>>>> 
>>>> 
> 
