storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrey Yegorov <>
Subject Re: FieldsGrouping at KafkaSpout
Date Tue, 06 Oct 2015 00:27:03 GMT
Hi John,

I guess you are using acks for messages between Bolt1 and Bolt2. If you do
not really need acks, disabling them could help.
in the same situation I did the following:

1. read
and spent some time tuning storm topology to improve throughput. Also make
sure you have one acker per worker.

2. Tuned GC for workers.

3. wrote custom stream grouping (GroupLocal(..)) which groups by field but
directs tuple to the bolt in the same worker process to use faster
inter-process data transfer. As result I have to shuffle data once between
machines (kafkaSpout to Bolt1) while the rest of flow works within the same
worker despite being grouped by field name. I still rely on default storm
scheduler to schedule bolts to workers, it schedules tasks evenly enough.

Andrey Yegorov

On Mon, Oct 5, 2015 at 4:45 PM, Enno Shioji <> wrote:

> If you are doing unnecessary repartitioning (sending tuples to remote
> processes) now and if you can remove/reduce that, that could help a lot.
> That said it's not obvious to me if that's the case here (e.g. is Kafka
> partitioned in a way you could exploit it to reduce repatriating?).
> If the above is not the case, it's not clear to me why it should help,
> because it's just the same work being done in a different place. I think it
> won't hurt to try though.
> Another thing you could try is to combine Bolt 1 and Bolt 2 to one bolt.
> That's what Trident tries to do by default for optimization.
> On 5 Oct 2015, at 16:38, Javier Gonzalez <> wrote:
> If you get one bolt2 per worker, it should work as you say. Though I'm not
> completely sure it's *guaranteed* that every mesage will go local.
> Regards,
> Javier
> On Oct 5, 2015 10:01 AM, "John Yost" <> wrote:
>> Hi Javier,
>> I apologize, I don't think I am making myself clear. I am attempting to
>> get all the tuples for a given key sent to the same Bolt 2 executor
>> instance. I previously followed the pattern of using fieldsGrouping on
>> Bolt1 as this is a well-established pattern.  However, there are roughly 4
>> times as many Bolt 1 executors to every Bolt 2 executor, and I was finding
>> the throughput was very low between Bolts 1 and 2.  Once I switched to
>> localOrShuffleGrouping between Bolt 1 and Bolt 2, the throughput tripled. I
>> did this based upon advice from this board to do localOrShuffleGrouping for
>> large fan-in patterns like this (great advice, definitely worked great!).
>> Unfortunately, this also means that there is no guarantee that all tuples
>> for a given key will be sent to the same Bolt 2. To hopefully get the best
>> of both worlds, I am thinking I can do the fieldsGrouping between
>> KafkaSpout and Bolt 1, and therefore I get the same effect of all tuples
>> for a given key going to the same Bolt 2. Of course, the key (pun intended)
>> is that there is one Bolt 2 per worker, which will ensure all tuples for
>> the same key will go to the same Bolt 1 which will then forward 'em to Bolt
>> 2.
>> Please confirm if this seems logical and that it should work. I think it
>> should, but I may be missing something.
>> Thanks! :)
>> --John
>> On Mon, Oct 5, 2015 at 9:20 AM, Javier Gonzalez <>
>> wrote:
>>> If I'm reading this correctly, I think you're not getting the result you
>>> want - having all tuples with a given key processed in the same bolt2
>>> instance.
>>> If you want to have all messages of a given key to be processed in the
>>> same Bolt2, you need to do fields grouping from bolt1 to bolt2. By doing
>>> fields grouping in the spout-bolt1 hop and shuffle/local in the bolt1-bolt2
>>> hop, you're ensuring that bolt1 instances always see the same key, but is
>>> there any guarantee that the bolt2 you want is the nearest/only local bolt
>>> available to any given instance of bolt1?
>>> Regards,
>>> Javier
>>> On Oct 5, 2015 7:33 AM, "John Yost" <> wrote:
>>>> Hi Everyone,
>>>> I am currently prototyping FieldsGrouping at the KafkaSpout vs Bolt
>>>> level. I am curious as to whether anyone else has tried this and, if so,
>>>> how well this worked.
>>>> The reason I am attempting to do FieldsGrouping in the KafkaSpout is
>>>> that I moved from fieldsGrouping to localOrShuffleGrouping between Bolt 1
>>>> and Bolt 2 in my topology due to a 4 to 1 fan in from Bolt 1 to Bolt 2 (for
>>>> example, 200 Bolt 1 executors and 50 Bolt 2 executors) which was
>>>> dramatically slowing throughput. It is still highly preferable to do
>>>> fieldsGrouping one way or another so that I am getting all values for a
>>>> given key to the same Bolt 2 executor, which is the impetus for attempting
>>>> to do fieldsGrouping in the KafkaSpout.
>>>> If anyone has any thoughts on this approach, I'd very much like to get
>>>> your thoughts.
>>>> Thanks
>>>> --John

View raw message