storm-user mailing list archives

From John Yost <>
Subject Re: FieldsGrouping at KafkaSpout
Date Tue, 06 Oct 2015 00:36:55 GMT
Hey Andrey,

Excellent thoughts, thanks!

I do need to ack between Bolts 1 and 2, so I cannot disable acking, but
that's a good suggestion.

Michael Noll's blog post is great, and I did try tuning the send/receive
buffers. While this helped a little, the fieldsGrouping still ended up much
too slow, unfortunately.

Your custom stream grouping sounds really cool.  I don't think I can use it
for my current topology, but it sure seems like a good idea.  Do you have a
GitHub repo I could check out the code from?

Thanks again for your thoughts and responding to this thread--much


On Mon, Oct 5, 2015 at 8:27 PM, Andrey Yegorov <> wrote:

> Hi John,
> I guess you are using acks for messages between Bolt1 and Bolt2. If you do
> not really need acks, disabling them could help.
> In the same situation I did the following:
> 1. read
> and spent some time tuning storm topology to improve throughput. Also make
> sure you have one acker per worker.
> 2. Tuned GC for workers.
> 3. Wrote a custom stream grouping (GroupLocal(..)) which groups by field but
> directs each tuple to a bolt in the same worker process, to use the faster
> intra-worker data transfer. As a result, I have to shuffle data between
> machines only once (kafkaSpout to Bolt1), while the rest of the flow stays
> within the same worker despite being grouped by field name. I still rely on
> the default Storm scheduler to assign bolts to workers; it schedules tasks
> evenly enough.
> ----------
> Andrey Yegorov
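
The GroupLocal(..) code itself is not shown in this thread, so the following is only a minimal sketch of the selection logic such a grouping might use. The class name LocalFieldsChooser and its methods are hypothetical, and the sketch deliberately avoids Storm imports; in a real topology this logic would presumably sit inside an implementation of Storm's CustomStreamGrouping interface, with the local task ids taken from the worker context. Note the assumption it depends on: a key keeps a single downstream task only if all tuples for that key already arrive in the same worker, which is what the fields-grouped spout-to-Bolt1 hop is meant to provide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper, not part of Storm: chooses a target task for a key,
// preferring target tasks that run in the current worker.
final class LocalFieldsChooser {
    private final List<Integer> candidates;

    LocalFieldsChooser(List<Integer> targetTasks, Set<Integer> thisWorkerTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("no target tasks");
        }
        List<Integer> local = new ArrayList<>();
        for (Integer task : targetTasks) {
            if (thisWorkerTasks.contains(task)) {
                local.add(task);
            }
        }
        // Fall back to every target task when none of them is local.
        List<Integer> chosen = local.isEmpty() ? new ArrayList<>(targetTasks) : local;
        // Sort so that the same key always maps to the same task in this worker.
        Collections.sort(chosen);
        this.candidates = chosen;
    }

    // Same key -> same task, as long as the candidate list does not change.
    int chooseTask(Object key) {
        int index = Math.floorMod(Objects.hashCode(key), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        // Target bolt has tasks 10..13; tasks 3, 11 and 13 run in this worker.
        LocalFieldsChooser chooser = new LocalFieldsChooser(
                Arrays.asList(10, 11, 12, 13),
                new HashSet<>(Arrays.asList(3, 11, 13)));
        System.out.println("user-42 -> task " + chooser.chooseTask("user-42"));
    }
}
```

With the layout in main, every key is routed to task 11 or task 13, never to the remote tasks 10 and 12.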
> On Mon, Oct 5, 2015 at 4:45 PM, Enno Shioji <> wrote:
>> If you are doing unnecessary repartitioning (sending tuples to remote
>> processes) now and if you can remove/reduce that, that could help a lot.
>> That said, it's not obvious to me if that's the case here (e.g. is Kafka
>> partitioned in a way you could exploit to reduce repartitioning?).
>> If the above is not the case, it's not clear to me why it should help,
>> because it's just the same work being done in a different place. I think it
>> won't hurt to try though.
>> Another thing you could try is to combine Bolt 1 and Bolt 2 to one bolt.
>> That's what Trident tries to do by default for optimization.
>> On 5 Oct 2015, at 16:38, Javier Gonzalez <> wrote:
>> If you get one bolt2 per worker, it should work as you say. Though I'm
>> not completely sure it's *guaranteed* that every message will go local.
>> Regards,
>> Javier
>> On Oct 5, 2015 10:01 AM, "John Yost" <> wrote:
>>> Hi Javier,
>>> I apologize, I don't think I am making myself clear. I am attempting to
>>> get all the tuples for a given key sent to the same Bolt 2 executor
>>> instance. I previously followed the pattern of using fieldsGrouping on
>>> Bolt1 as this is a well-established pattern.  However, there are roughly 4
>>> times as many Bolt 1 executors as Bolt 2 executors, and I was finding
>>> the throughput was very low between Bolts 1 and 2.  Once I switched to
>>> localOrShuffleGrouping between Bolt 1 and Bolt 2, the throughput tripled. I
>>> did this based upon advice from this board to do localOrShuffleGrouping for
>>> large fan-in patterns like this (great advice, definitely worked great!).
>>> Unfortunately, this also means that there is no guarantee that all
>>> tuples for a given key will be sent to the same Bolt 2. To hopefully get
>>> the best of both worlds, I am thinking I can do the fieldsGrouping between
>>> KafkaSpout and Bolt 1, and therefore I get the same effect of all tuples
>>> for a given key going to the same Bolt 2. Of course, the key (pun intended)
>>> is that there is one Bolt 2 per worker, which will ensure all tuples for
>>> the same key will go to the same Bolt 1 which will then forward 'em to Bolt
>>> 2.
>>> Please confirm if this seems logical and that it should work. I think it
>>> should, but I may be missing something.
>>> Thanks! :)
>>> --John
>>> On Mon, Oct 5, 2015 at 9:20 AM, Javier Gonzalez <>
>>> wrote:
>>>> If I'm reading this correctly, I think you're not getting the result
>>>> you want - having all tuples with a given key processed in the same bolt2
>>>> instance.
>>>> If you want all messages for a given key to be processed in the
>>>> same Bolt2, you need to do fields grouping from bolt1 to bolt2. By doing
>>>> fields grouping in the spout-bolt1 hop and shuffle/local in the bolt1-bolt2
>>>> hop, you're ensuring that bolt1 instances always see the same key, but is
>>>> there any guarantee that the bolt2 you want is the nearest/only local bolt
>>>> available to any given instance of bolt1?
>>>> Regards,
>>>> Javier
>>>> On Oct 5, 2015 7:33 AM, "John Yost" <> wrote:
>>>>> Hi Everyone,
>>>>> I am currently prototyping FieldsGrouping at the KafkaSpout vs Bolt
>>>>> level. I am curious as to whether anyone else has tried this and, if so,
>>>>> how well this worked.
>>>>> The reason I am attempting to do FieldsGrouping in the KafkaSpout is
>>>>> that I moved from fieldsGrouping to localOrShuffleGrouping between Bolt 1
>>>>> and Bolt 2 in my topology due to a 4 to 1 fan in from Bolt 1 to Bolt 2 (for
>>>>> example, 200 Bolt 1 executors and 50 Bolt 2 executors) which was
>>>>> dramatically slowing throughput. It is still highly preferable to do
>>>>> fieldsGrouping one way or another so that I am getting all values for a
>>>>> given key to the same Bolt 2 executor, which is the impetus for attempting
>>>>> to do fieldsGrouping in the KafkaSpout.
>>>>> If anyone has any thoughts on this approach, I'd very much like to get
>>>>> your thoughts.
>>>>> Thanks
>>>>> --John
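
The question running through this thread (does fieldsGrouping on the spout-to-Bolt-1 hop plus localOrShuffleGrouping on the Bolt-1-to-Bolt-2 hop keep each key on one Bolt 2 executor?) can be explored with a small simulation. This is a sketch under stated assumptions, not Storm code: the class KeyAffinityCheck, the task ids, and the task-to-worker layouts are all made up for illustration, and the hash-modulo routing only approximates how a fields grouping picks a task.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation, not Storm code.
final class KeyAffinityCheck {

    // Returns every Bolt 2 task that tuples for `key` could reach when the key
    // is hashed to one Bolt 1 task and then sent to any Bolt 2 task in the
    // same worker (or to any Bolt 2 task at all if none is local).
    static Set<Integer> reachableBolt2Tasks(Object key,
                                            List<Integer> bolt1Tasks,
                                            List<Integer> bolt2Tasks,
                                            Map<Integer, Integer> taskToWorker) {
        int bolt1 = bolt1Tasks.get(Math.floorMod(Objects.hashCode(key), bolt1Tasks.size()));
        int worker = taskToWorker.get(bolt1);
        Set<Integer> reachable = new TreeSet<>();
        for (int task : bolt2Tasks) {
            if (taskToWorker.get(task) == worker) {
                reachable.add(task);
            }
        }
        if (reachable.isEmpty()) {
            reachable.addAll(bolt2Tasks); // no local Bolt 2: shuffle across all
        }
        return reachable;
    }

    public static void main(String[] args) {
        List<Integer> bolt1 = Arrays.asList(1, 2, 3, 4);

        // Layout A: exactly one Bolt 2 task per worker.
        Map<Integer, Integer> layoutA = new HashMap<>();
        layoutA.put(1, 0); layoutA.put(2, 0); layoutA.put(5, 0);
        layoutA.put(3, 1); layoutA.put(4, 1); layoutA.put(6, 1);
        System.out.println(reachableBolt2Tasks(0, bolt1, Arrays.asList(5, 6), layoutA));

        // Layout B: worker 0 hosts two Bolt 2 tasks (5 and 7).
        Map<Integer, Integer> layoutB = new HashMap<>(layoutA);
        layoutB.put(7, 0);
        System.out.println(reachableBolt2Tasks(0, bolt1, Arrays.asList(5, 6, 7), layoutB));
    }
}
```

In layout A each key can reach exactly one Bolt 2 task, which matches the "one Bolt 2 per worker" condition discussed above. In layout B a key that lands in worker 0 can reach two different Bolt 2 tasks, so per-key affinity is lost as soon as the scheduler places more than one Bolt 2 executor in a worker.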
