kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: How to process KStream windowed message parallelly
Date Fri, 14 Jun 2019 18:33:27 GMT
There is no built-in support for this atm.

Async processing support as suggested via KIP-408 might help in the
future. But there is not much activity on this KIP atm.
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams)

> If I add submit Threads in foreach to thread pool, then I am not understanding how can
> I block committing the message until it is successfully executed

Well. You can't block committing.

What you could try instead is to use `transform()` instead of
`foreach()` and attach a state store to the transformer. Each time you receive
an input message, you first put it into the store. If the message is
successfully processed, you remove it from the store. If you crash and
restart, you check the store for pending messages and retry processing them.
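
As a rough illustration of that store-first, remove-on-success idea, here is
a minimal sketch in plain Java, so it runs without a Kafka cluster. It is not
Kafka Streams code: the class `PendingStoreSketch`, its methods, and the
`ioAction` predicate are hypothetical names, and the `Map` only stands in for
the state store you would attach to `transform()`. As far as I know, a real
state store should only be touched from the stream thread, so in an actual
application the removal would probably need to be handed back to that thread
rather than done from the pool as shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

// Hypothetical sketch: `pending` plays the role of the attached state store.
class PendingStoreSketch {
    private final Map<String, String> pending;
    private final Predicate<String> ioAction; // assumed to return true on success
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    PendingStoreSketch(Map<String, String> pending, Predicate<String> ioAction) {
        this.pending = pending;
        this.ioAction = ioAction;
    }

    // Per input record: put it into the store first, then process it asynchronously.
    CompletableFuture<Void> onRecord(String key, String value) {
        pending.put(key, value);
        return submit(key, value);
    }

    // After a restart: retry everything that is still in the store.
    List<CompletableFuture<Void>> retryPending() {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        // Iterate over a copy so removals by worker threads do not interfere.
        new HashMap<>(pending).forEach((k, v) -> futures.add(submit(k, v)));
        return futures;
    }

    private CompletableFuture<Void> submit(String key, String value) {
        return CompletableFuture.runAsync(() -> {
            // Remove the entry only if the action succeeded; otherwise it stays pending.
            if (ioAction.test(value)) {
                pending.remove(key, value);
            }
        }, pool);
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();

        // First run: the action fails for value "b", so k2 stays in the store.
        PendingStoreSketch first = new PendingStoreSketch(store, v -> !v.equals("b"));
        CompletableFuture.allOf(first.onRecord("k1", "a"), first.onRecord("k2", "b")).join();
        first.shutdown();
        System.out.println("pending after first run: " + store); // {k2=b}

        // Simulated restart: same store contents, and the action now succeeds.
        PendingStoreSketch second = new PendingStoreSketch(store, v -> true);
        second.retryPending().forEach(CompletableFuture::join);
        second.shutdown();
        System.out.println("pending after retry: " + store); // {}
    }
}
```

Note that this gives at-least-once behaviour for the IO action: a message
that was processed but not yet removed before a crash is retried on restart,
so the action should tolerate being run twice.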


-Matthias


On 6/13/19 1:52 PM, Divya Goel wrote:
> Hi,
> 
> I have a requirement to dedup messages within a window and take a bunch of actions
> on the filtered messages. I understand that we get parallelism from the number of KStream
> threads, up to a maximum of the number of partitions. But the actions that I take
> on the filtered messages are various IO operations. As my application is IO bound, I want
> to be able to process multiple messages after the window in parallel, not sequentially.
> 
> I am using following code to start with. The forEach allows to execute messages sequentially.
> If I add submit Threads in foreach to thread pool, then I am not understanding how can I block
> committing the message until it is successfully executed. Some message executed in the window
> can fail as well. Please let me know, if you have any suggestions to process the windowed
> messages parallelly with Kstream.
> 
> builder.<String, String>stream(topic)
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofMillis(windowedTime)).advanceBy(Duration.ofMillis(windowedTime)))
>         .reduce((value1, value2) -> value2, Materialized.as(reducerTopic))
>         .toStream()
>         .foreach((key, value) -> System.out.println(key + " => " + value));
> 
> Thanks,
> Divya
> 

