storm-user mailing list archives

From Sandeep Samudrala <>
Subject Re: Introduce lag into a topology spout to process the events with a delay.
Date Thu, 23 Feb 2017 08:34:15 GMT
Running both streams in the same topology doesn't work for me, as Stream B
events can arrive very late, up to 24 hours.

Sleep doesn't work either, as it slows down the whole topology. I want the
topology to run as fast as possible, with only enough delay to ensure that
enrichment succeeds on the first attempt.

To give more context: the events coming from Stream B can arrive as late as
24 hours, which is why I keep the events in a key-value store. I am using a
normal spout rather than Trident, since I handle events one at a time
(record by record). I don't think blocking at the tuple level is a good
idea, as it would slow down the processing of events.

For now, I am working with a hack: I check the current backlog, using the
message header in the Kafka event, to find events that should be processed
with a delay. Although it works to some extent, I am still not able to get
it working fully.

Please let me know if I can add more context.
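To illustrate the kind of non-blocking delay I have in mind (this is only a sketch in plain Java with no Storm dependencies; the class and method names are made up for illustration, not taken from my actual code): events sit in a java.util.concurrent.DelayQueue until they reach a minimum age, and nextTuple() could poll the buffer without ever blocking:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch only: buffer events until they reach a minimum age, so a
// spout's nextTuple() can poll for ready events without blocking.
public class DelayBuffer {

    static class DelayedEvent implements Delayed {
        final String payload;
        final long releaseAtMillis;

        DelayedEvent(String payload, long eventTimeMillis, long minAgeMillis) {
            this.payload = payload;
            this.releaseAtMillis = eventTimeMillis + minAgeMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(releaseAtMillis - System.currentTimeMillis(),
                                TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<DelayedEvent> queue = new DelayQueue<>();
    private final long minAgeMillis;

    public DelayBuffer(long minAgeMillis) {
        this.minAgeMillis = minAgeMillis;
    }

    // Called when a Kafka record arrives; eventTimeMillis comes from
    // the record's timestamp or a message header.
    public void offer(String payload, long eventTimeMillis) {
        queue.offer(new DelayedEvent(payload, eventTimeMillis, minAgeMillis));
    }

    // Called from nextTuple(): returns null if no event is old enough yet,
    // so the spout never blocks.
    public String pollReady() {
        DelayedEvent e = queue.poll(); // non-blocking; null until head expires
        return e == null ? null : e.payload;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayBuffer buffer = new DelayBuffer(100); // 100 ms minimum age
        buffer.offer("event-1", System.currentTimeMillis());
        System.out.println(buffer.pollReady()); // too young: prints null
        Thread.sleep(150);
        System.out.println(buffer.pollReady()); // prints event-1
    }
}
```

The point is that the delay lives in the buffer, not in nextTuple(), so the topology keeps running at full speed while each event is simply held back until it is old enough to enrich.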

On Wed, Feb 22, 2017 at 3:23 PM, Ankur Garg <> wrote:

> Are you processing the events in your Storm topology in batches (Trident
> spout) or with a normal spout?
> The way I see it (this is very trivial, and I am sure you have thought
> about it) is that you could introduce a sleep in the nextTuple method for
> Stream B (in the case of a normal spout), or increase the value of
> topology.max.spout.pending in the case of Trident, to help you achieve a
> better percentage. You could also consider making nextTuple blocking
> (although this is not recommended in general, since everything runs in a
> single thread and your ack/fail/emit calls can get delayed, I believe it
> could be fine in your case).
> Alternatively, since both streams are almost real time, you could read
> from both streams in the same spout and do the enrichment there, instead
> of writing Stream A into a key-value store and enriching afterwards.
> Obviously, I am making a lot of assumptions here, since they are not
> mentioned in the question and I am not aware of the full context of the
> problem.
> Hope this helps,
> Ankur
> On Wed, 22 Feb 2017 at 11:22 Sandeep Samudrala <>
> wrote:
>> Yes. I am reading both streams from Kafka as part of a topology.
>> On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg <>
>> wrote:
>> Hi Sandeep,
>> One question: how are you reading Streams A and B? Are you reading from
>> a messaging queue (Kafka, RabbitMQ, etc.) with a spout (as part of a
>> topology)? Please confirm.
>> Thanks,
>> Ankur
>> On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <>
>> wrote:
>> Hello,
>> I have two streams, A and B. I need to enrich events coming from Stream B
>> with events coming from Stream A, so I store the events from A in a
>> key-value store. Events that don't get enriched are sent to a deferred
>> queue (a Kafka topic) and are read back later.
>> Most of the time the events from Stream B end up in the defer queue,
>> because there is a slight delay in writing Stream A's events to the
>> key-value store, while events arrive on A and B almost in real time.
>> I want to introduce a delay into my spout reading from Stream B, to make
>> sure a higher percentage of events gets enriched on the first pass rather
>> than after being read back from the defer queue. I tried checking the lag
>> and throttling based on the backlog queue, but it didn't seem right and
>> led to draining and other issues.
>> Is there a way, in the Kafka consumer or the Storm spout, to control the
>> incoming data so it arrives with a delay for processing?
>> Thanks,
>> -sandeep.
