kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Using processor API via DSL
Date Fri, 12 Apr 2019 23:31:26 GMT
Hi there,

I'm just starting with Kafka and I'm trying to create a stream processor
that in multiple stages:
 - filters messages using a kv store so that only messages with higher
timestamp gets processed
 - aggregates the message metrics by minute giving e.g. the avg of those
metrics in that minute

The message is simple, the key is the sensor ID and the value is e.g. {
timestamp: UNIX_TIMESTAMP, speed: INT }.

I've started by creating a processor to use the kv store and filter old
messages:

https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java

Then I was trying to implement windowing, I saw very nice windowing
examples for the DSL but none for the Processor API (only a small reference
to the windowed store), can someone point me in the right direction?

Now, since I wasn't able to find any example I tried to use the DSL but
haven't found a way to use my processor with it, I saw this
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
but
it explains mostly transformers not processors. I also saw after that the
example usage of the processor but `.process(...)` returns void, so I
cannot have a KStream from a processor?

Thank you all in advance

--
Alessandro Tagliapietra

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message