storm-user mailing list archives

From John Yost <>
Subject Re: FieldsGrouping at KafkaSpout
Date Tue, 06 Oct 2015 00:16:56 GMT
Hi Enno,

I am thinking that the 4 to 1 fan-in is the problem, and if I can do the
fields grouping on the fan-out from the KafkaSpout, hopefully that will be

One issue a co-worker raised is that the number of Kafka partitions
(and therefore KafkaSpout executors) limits the number of Bolt 1 executors
if I use fieldsGrouping on the KafkaSpout-to-Bolt 1 emit. That would be
a showstopper. Are you aware of this limitation?
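
On the partition question, here is a minimal sketch of how a fields grouping
could pick its target, assuming the target task is chosen by hashing the key
modulo the number of downstream tasks. That routing rule is an assumption for
illustration, and the class and method names are hypothetical, not Storm APIs.
Under that assumption the spout executor count never enters the calculation,
so it would not cap the number of Bolt 1 executors that receive tuples.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of fields grouping. Hash-modulo routing is an assumption
// made for illustration; check the grouping code in your Storm version.
class FieldsGroupingSketch {

    // Returns the index of the downstream task that receives the given key.
    // Note that the number of upstream (spout) executors is not an input.
    static int chooseTask(String key, int numDownstreamTasks) {
        return Math.floorMod(key.hashCode(), numDownstreamTasks);
    }

    // Counts how many distinct downstream tasks receive at least one of the
    // hypothetical keys "key-0" .. "key-(numKeys - 1)".
    static int distinctTargets(int numKeys, int numDownstreamTasks) {
        Set<Integer> targets = new HashSet<>();
        for (int i = 0; i < numKeys; i++) {
            targets.add(chooseTask("key-" + i, numDownstreamTasks));
        }
        return targets.size();
    }

    public static void main(String[] args) {
        int bolt1Executors = 200; // figure taken from the thread
        System.out.println("Bolt 1 tasks receiving tuples: "
                + distinctTargets(10_000, bolt1Executors));
    }
}
```

If the real grouping behaves like this model, a topology with only a handful
of Kafka partitions could still spread keys across all 200 Bolt 1 executors,
and any given key would always land on the same one.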


On Mon, Oct 5, 2015 at 7:45 PM, Enno Shioji <> wrote:

> If you are doing unnecessary repartitioning (sending tuples to remote
> processes) now and if you can remove/reduce that, that could help a lot.
> That said it's not obvious to me if that's the case here (e.g. is Kafka
> partitioned in a way you could exploit it to reduce repartitioning?).
> If the above is not the case, it's not clear to me why it should help,
> because it's just the same work being done in a different place. I think it
> won't hurt to try though.
> Another thing you could try is to combine Bolt 1 and Bolt 2 to one bolt.
> That's what Trident tries to do by default for optimization.
> On 5 Oct 2015, at 16:38, Javier Gonzalez <> wrote:
> If you get one bolt2 per worker, it should work as you say. Though I'm not
> completely sure it's *guaranteed* that every message will go local.
> Regards,
> Javier
> On Oct 5, 2015 10:01 AM, "John Yost" <> wrote:
>> Hi Javier,
>> I apologize, I don't think I am making myself clear. I am attempting to
>> get all the tuples for a given key sent to the same Bolt 2 executor
>> instance. I previously followed the pattern of using fieldsGrouping on
>> Bolt1 as this is a well-established pattern.  However, there are roughly 4
>> times as many Bolt 1 executors to every Bolt 2 executor, and I was finding
>> the throughput was very low between Bolts 1 and 2.  Once I switched to
>> localOrShuffleGrouping between Bolt 1 and Bolt 2, the throughput tripled. I
>> did this based upon advice from this board to do localOrShuffleGrouping for
>> large fan-in patterns like this (great advice, definitely worked great!).
>> Unfortunately, this also means that there is no guarantee that all tuples
>> for a given key will be sent to the same Bolt 2. To hopefully get the best
>> of both worlds, I am thinking I can do the fieldsGrouping between
>> KafkaSpout and Bolt 1, and therefore I get the same effect of all tuples
>> for a given key going to the same Bolt 2. Of course, the key (pun intended)
>> is that there is one Bolt 2 per worker, which will ensure all tuples for
>> the same key will go to the same Bolt 1 which will then forward 'em to Bolt
>> 2.
>> Please confirm if this seems logical and that it should work. I think it
>> should, but I may be missing something.
>> Thanks! :)
>> --John
>> On Mon, Oct 5, 2015 at 9:20 AM, Javier Gonzalez <>
>> wrote:
>>> If I'm reading this correctly, I think you're not getting the result you
>>> want - having all tuples with a given key processed in the same bolt2
>>> instance.
>>> If you want all messages for a given key to be processed in the
>>> same Bolt2, you need to do fields grouping from bolt1 to bolt2. By doing
>>> fields grouping in the spout-bolt1 hop and shuffle/local in the bolt1-bolt2
>>> hop, you're ensuring that bolt1 instances always see the same key, but is
>>> there any guarantee that the bolt2 you want is the nearest/only local bolt
>>> available to any given instance of bolt1?
>>> Regards,
>>> Javier
>>> On Oct 5, 2015 7:33 AM, "John Yost" <> wrote:
>>>> Hi Everyone,
>>>> I am currently prototyping FieldsGrouping at the KafkaSpout vs Bolt
>>>> level. I am curious as to whether anyone else has tried this and, if so,
>>>> how well this worked.
>>>> The reason I am attempting to do FieldsGrouping in the KafkaSpout is
>>>> that I moved from fieldsGrouping to localOrShuffleGrouping between Bolt 1
>>>> and Bolt 2 in my topology due to a 4 to 1 fan-in from Bolt 1 to Bolt 2 (for
>>>> example, 200 Bolt 1 executors and 50 Bolt 2 executors) which was
>>>> dramatically slowing throughput. It is still highly preferable to do
>>>> fieldsGrouping one way or another so that I am getting all values for a
>>>> given key to the same Bolt 2 executor, which is the impetus for attempting
>>>> to do fieldsGrouping in the KafkaSpout.
>>>> If anyone has any thoughts on this approach, I'd very much like to
>>>> hear them.
>>>> Thanks
>>>> --John
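
The plan discussed in this thread (fieldsGrouping from KafkaSpout to Bolt 1,
then localOrShuffleGrouping from Bolt 1 to Bolt 2) can be sketched as a small
simulation. Everything here is an assumption for illustration: fields grouping
is modeled as hash modulo task count, Bolt 1 tasks are assumed to be spread
evenly and in order across workers, and the local choice is modeled as
round-robin over the Bolt 2 tasks in the same worker. The names are
hypothetical and not Storm APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of: spout --fieldsGrouping--> Bolt 1 --localOrShuffle--> Bolt 2.
class LocalOrShuffleSketch {

    // Returns how many distinct Bolt 2 tasks see tuples for a single key.
    static int bolt2TasksSeenByKey(String key, int workers, int bolt1PerWorker,
                                   int bolt2PerWorker, int tuples) {
        int bolt1Tasks = workers * bolt1PerWorker;
        // Fields grouping (assumed hash modulo): the key pins one Bolt 1 task.
        int bolt1 = Math.floorMod(key.hashCode(), bolt1Tasks);
        // Assumed placement: Bolt 1 tasks fill the workers evenly, in order.
        int worker = bolt1 / bolt1PerWorker;
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < tuples; i++) {
            // Local-or-shuffle, modeled as round-robin over local Bolt 2 tasks.
            int local = i % bolt2PerWorker;
            seen.add(worker * bolt2PerWorker + local);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 50 workers with 4 Bolt 1 tasks each gives the 200:50 ratio from the thread.
        System.out.println("one Bolt 2 per worker:  "
                + bolt2TasksSeenByKey("user-42", 50, 4, 1, 100));
        System.out.println("two Bolt 2 per worker:  "
                + bolt2TasksSeenByKey("user-42", 50, 4, 2, 100));
    }
}
```

In this model a key stays on a single Bolt 2 task only when each worker holds
exactly one Bolt 2 task. Whether the scheduler actually produces that
placement is a separate question the sketch does not address.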
