kafka-users mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: Using processor API via DSL
Date Sat, 13 Apr 2019 00:03:39 GMT
Hi Alessandro,

Have you considered using `transform()` (or rather, in your case,
`transformValues()`) instead of `process()`? Like `process()`, both
`transform()` and `transformValues()` are stateful operations, but they
return a `KStream`. On that `KStream` you can then apply a windowed
aggregation.
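
As a rough illustration of the two stages from the question (drop stale
readings per sensor, then average per minute), here is a plain-Java sketch.
It deliberately uses no Kafka Streams classes so that it runs standalone: the
`HashMap` in `TimestampFilter` only stands in for the kv store a transformer
would use, and `MinuteAverager` only stands in for a one-minute windowed
aggregation. All class and method names are hypothetical, and the timestamp
is assumed to be a UNIX timestamp in seconds.

```java
import java.util.HashMap;
import java.util.Map;

class SensorPipelineSketch {

    /** Hypothetical message value: { timestamp (assumed seconds), speed }. */
    static final class Reading {
        final long timestamp;
        final int speed;

        Reading(long timestamp, int speed) {
            this.timestamp = timestamp;
            this.speed = speed;
        }
    }

    /** Stage 1: stateful per-key step; the map stands in for a kv store. */
    static final class TimestampFilter {
        private final Map<String, Long> lastSeen = new HashMap<>();

        /** Returns the reading if it is newer than the last one seen, else null. */
        Reading transform(String sensorId, Reading reading) {
            Long last = lastSeen.get(sensorId);
            if (last != null && reading.timestamp <= last) {
                return null; // stale or duplicate timestamp
            }
            lastSeen.put(sensorId, reading.timestamp);
            return reading;
        }
    }

    /** Stage 2: per-sensor sum and count for each one-minute bucket. */
    static final class MinuteAverager {
        static final long WINDOW_SECONDS = 60L;
        // sensorId -> (window start -> {sum of speeds, number of readings})
        private final Map<String, Map<Long, long[]>> windows = new HashMap<>();

        void add(String sensorId, Reading reading) {
            // Align the timestamp to the start of its minute.
            long windowStart = reading.timestamp - (reading.timestamp % WINDOW_SECONDS);
            long[] agg = windows
                    .computeIfAbsent(sensorId, k -> new HashMap<>())
                    .computeIfAbsent(windowStart, k -> new long[2]);
            agg[0] += reading.speed;
            agg[1] += 1;
        }

        /** Average speed in the given window, or NaN if the window is empty. */
        double average(String sensorId, long windowStart) {
            Map<Long, long[]> perSensor = windows.get(sensorId);
            long[] agg = (perSensor == null) ? null : perSensor.get(windowStart);
            return (agg == null) ? Double.NaN : (double) agg[0] / agg[1];
        }
    }

    public static void main(String[] args) {
        TimestampFilter filter = new TimestampFilter();
        MinuteAverager averager = new MinuteAverager();

        Reading[] input = {
            new Reading(0, 10),
            new Reading(30, 20),
            new Reading(20, 100), // older than the previous reading: dropped
            new Reading(65, 40),
        };
        for (Reading reading : input) {
            Reading kept = filter.transform("sensor-1", reading);
            if (kept != null) { // null results have to be filtered out explicitly
                averager.add("sensor-1", kept);
            }
        }
        System.out.println(averager.average("sensor-1", 0));  // 15.0
        System.out.println(averager.average("sensor-1", 60)); // 40.0
    }
}
```

In an actual topology, the first stage would roughly correspond to
`transformValues()` with a state store attached, followed by a filter on
non-null values, and the second to a grouped, windowed aggregation on the
resulting `KStream`. Note that a windowed aggregation there buckets by the
record timestamp, which is not necessarily the `timestamp` field inside the
value.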

Hope that helps.

Best,
Bruno


On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Hi there,
>
> I'm just starting with Kafka and I'm trying to create a stream processor
> that works in multiple stages:
>  - filters messages using a kv store so that only messages with a higher
> timestamp get processed
>  - aggregates the message metrics by minute, giving e.g. the avg of those
> metrics in that minute
>
> The message is simple, the key is the sensor ID and the value is e.g. {
> timestamp: UNIX_TIMESTAMP, speed: INT }.
>
> I've started by creating a processor to use the kv store and filter old
> messages:
>
>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>
> Then I tried to implement windowing. I saw very nice windowing
> examples for the DSL but none for the Processor API (only a small reference
> to the windowed store). Can someone point me in the right direction?
>
> Now, since I wasn't able to find any example, I tried to use the DSL but
> haven't found a way to use my processor with it. I saw this
>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> but it explains mostly transformers, not processors. After that I also saw
> the example usage of the processor, but `.process(...)` returns void, so I
> cannot get a KStream from a processor?
>
> Thank you all in advance
>
> --
> Alessandro Tagliapietra
>
