kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: New CoGroup, how to do a left join
Date Thu, 16 Apr 2020 23:27:32 GMT
Hello Murilo,

Thanks for your interest in KIP-150.

As we discussed in the KIP, the scope of this co-group is stream
co-aggregation. In your case, the first table in the join does not come from
an aggregation but is a source table itself, so it cannot be included in the
co-group of KIP-150.

Although we discussed extending it to KTable multi-joins as well, that is
not included in the current release. One (awkward) workaround I can think of
for now is to turn your `customer` table into
"builder.stream("customer").groupByKey().aggregate(/* a dummy aggregator that
just materializes the stream records */)", and then it can be included in the
co-group.
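
A minimal sketch of how this workaround might look, assuming Kafka Streams
2.5+ (where KIP-150 shipped), String keys and values on all three topics, and
a toy "customer|cart|purchase" String as the aggregate. The class name, the
withPart helper, and the aggregate layout are hypothetical and chosen only to
keep the example self-contained; the topic names come from the question
below. In this sketch the grouped customer stream is passed to cogroup()
directly, and its aggregator plays the role of the "dummy" step by keeping
only the latest customer record.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CogroupWorkaroundSketch {

    // Toy aggregate layout (an assumption of this sketch): "customer|cart|purchase".
    // Values containing '|' would break it; a real application would use its own
    // aggregate type and serde instead.
    static String withPart(String aggregate, int index, String value) {
        String[] parts = aggregate.split("\\|", -1); // limit -1 keeps trailing empty parts
        parts[index] = value;
        return String.join("|", parts);
    }

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        // Read "customer" as a stream rather than a table so that it can be grouped.
        KGroupedStream<String, String> customers =
            builder.stream("customer", consumed).groupByKey();
        KGroupedStream<String, String> carts =
            builder.stream("carts", consumed).groupByKey();
        KGroupedStream<String, String> purchases =
            builder.stream("purchases", consumed).groupByKey();

        // The "dummy" aggregator: it only stores the latest customer record.
        Aggregator<String, String, String> customerAgg =
            (key, customer, agg) -> withPart(agg, 0, customer);
        // Placeholder aggregators that keep the latest cart and purchase values.
        Aggregator<String, String, String> cartAgg =
            (key, cart, agg) -> withPart(agg, 1, cart);
        Aggregator<String, String, String> purchaseAgg =
            (key, purchase, agg) -> withPart(agg, 2, purchase);

        // Co-aggregate all three grouped streams into one table.
        KTable<String, String> customerView = customers
            .cogroup(customerAgg)
            .cogroup(carts, cartAgg)
            .cogroup(purchases, purchaseAgg)
            .aggregate(() -> "||", Materialized.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology to inspect the sources and the co-group processors.
        System.out.println(buildTopology().describe());
    }
}
```

Note that this is not a strict replacement for customer.leftJoin(...): the
co-aggregation also produces a row for keys that appear only in carts or
purchases, so a left-join result would need an extra filter on the customer
part of the aggregate.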


On Thu, Apr 16, 2020 at 1:23 PM Murilo Tavares <murilofla@gmail.com> wrote:

> Hi
> I'm really excited about the new release of Kafka Streams.
> I've been watching the new CoGroup feature, and now that this is out, I'm
> trying to play around with it.
> I wonder what would be the best way to do a
> KTable.leftJoin(otherTable).leftJoin(yetAnotherTable)...
> Taking the Customer example in the KIP (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> ),
> how would one do the cogroup if we had a Customer topic as well?
> In the old API, we would have
> KTable<Long, Customer> customer = builder.table("customer");
> KTable<K, V1> carts =
>     builder.stream("carts").groupByKey().aggregate(initializer1,
>         aggregator1, materialized1);
> KTable<K, V2> purchases =
>     builder.stream("purchases").groupByKey().aggregate(initializer2,
>         aggregator2, materialized2);
> KTable<K, CG> joined = customer.leftJoin(carts,
>     joinerOneAndTwo).leftJoin(purchases, joinerOneTwoAndThree);
> Thanks
> Murilo

-- Guozhang
