kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Using processor API via DSL
Date Tue, 16 Apr 2019 06:01:51 GMT
Thank you Bruno,

I'll look into those. The average is just a simple calculation I'm using
right now to get an initial windowing flow working.
In the future I'll probably still need the actual values for other
calculations, and we won't have more than 60 elements per window for sure.

So far, to avoid manually serializing/deserializing the ArrayList, I've
created an Avro model with an array field containing the values.
I had issues with suppress as explained here

https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198

but I got that working.
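
For reference, the aggregation part of my topology currently looks roughly
like this (a sketch rather than my exact code: Measurement is the value
model, MeasurementList is the Avro model with the array field, and
measurementListSerde is its serde):

import java.time.Duration;
import java.util.ArrayList;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Measurement> input = builder.stream("sensor-data");

input
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
    .aggregate(
        // initializer: start each window with an empty list of values
        () -> new MeasurementList(new ArrayList<>()),
        // aggregator: append the new value to the window's list
        (sensorId, measurement, list) -> {
            list.getValues().add(measurement.getSpeed());
            return list;
        },
        // passing the serdes explicitly via Materialized.with() is what
        // fixed the NullPointerException from the SO question above
        Materialized.with(Serdes.String(), measurementListSerde))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();
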
So far everything seems to be working, except for a couple of things:
 - if I generate data with one key, I correctly get a value every 10
seconds; if I later start generating data with a second key (while key 1 is
still generating), the windowing emits a value for key 2 only after key 2's
timestamps reach the last generated window
 - while generating data, if I restart the processor right after it starts,
it sometimes emits 2 aggregates for the same window, even though I'm using
suppress

Anyway, I'll look into your link and try to find the cause of these
issues, probably starting from scratch with a simpler example.

Thank you for your help!

--
Alessandro Tagliapietra

On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <bruno@confluent.io> wrote:

> Hi Alessandro,
>
> Have a look at this Kafka Usage Pattern for computing averages without
> using an ArrayList.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?
>
> The advantage of this pattern over the ArrayList approach is the reduced
> space needed to compute the aggregate. Note that you will still need to
> implement a SerDe, but it should be a bit easier to implement than a SerDe
> for an ArrayList.
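>
> For illustration, a rough sketch of that pattern (CountAndSum and
> countAndSumSerde are placeholder names, and I'm assuming a value type
> with a getSpeed() accessor like in your messages):
>
> // mutable (count, sum) pair used as the window aggregate
> public class CountAndSum {
>     public long count;
>     public double sum;
> }
>
> // "input" is your KStream<String, Measurement>
> KTable<Windowed<String>, Double> averages = input
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>     .aggregate(
>         CountAndSum::new,
>         (key, value, agg) -> {
>             agg.count += 1;
>             agg.sum += value.getSpeed();
>             return agg;
>         },
>         Materialized.with(Serdes.String(), countAndSumSerde))
>     // compute the average only when reading out the final aggregate
>     .mapValues(agg -> agg.sum / agg.count);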
>
> Hope that helps.
>
> Best,
> Bruno
>
> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>
> > Sorry, but this turned out to be harder than I thought.
> >
> > To get the custom aggregation working I need an ArrayList of all the
> > values in the window; so far my aggregate DSL method creates an ArrayList
> > in the initializer and adds each value to the list in the aggregator.
> > Then I think I'll have to provide a serde for the output type of that
> > method.
> > I was looking at
> >
> > https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >
> > but that seems more towards a list of longs and already uses longSerde.
> > I'm currently trying to implement another Avro model that has a field of
> > type array, so I can use the regular Avro serializer for this.
> > Should I create my own serde instead, or is this the right way?
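> >
> > In case it helps for comparison, this is roughly the serde I'd otherwise
> > have to write by hand: a generic list serde that delegates to an element
> > serde (an untested sketch, assuming Kafka 2.1+ where Serializer and
> > Deserializer are single-method interfaces):
> >
> > import java.io.*;
> > import java.util.ArrayList;
> > import org.apache.kafka.common.serialization.*;
> >
> > public class ArrayListSerde<T> implements Serde<ArrayList<T>> {
> >     private final Serde<T> inner;   // element serde, e.g. Serdes.Double()
> >
> >     public ArrayListSerde(Serde<T> inner) { this.inner = inner; }
> >
> >     @Override
> >     public Serializer<ArrayList<T>> serializer() {
> >         return (topic, list) -> {
> >             if (list == null) return null;
> >             try {
> >                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
> >                 DataOutputStream out = new DataOutputStream(baos);
> >                 out.writeInt(list.size());
> >                 for (T element : list) {
> >                     byte[] bytes = inner.serializer().serialize(topic, element);
> >                     out.writeInt(bytes.length);
> >                     out.write(bytes);
> >                 }
> >                 return baos.toByteArray();
> >             } catch (IOException e) {
> >                 throw new RuntimeException(e);
> >             }
> >         };
> >     }
> >
> >     @Override
> >     public Deserializer<ArrayList<T>> deserializer() {
> >         return (topic, bytes) -> {
> >             if (bytes == null) return null;
> >             try {
> >                 DataInputStream in =
> >                     new DataInputStream(new ByteArrayInputStream(bytes));
> >                 int size = in.readInt();
> >                 ArrayList<T> list = new ArrayList<>(size);
> >                 for (int i = 0; i < size; i++) {
> >                     byte[] element = new byte[in.readInt()];
> >                     in.readFully(element);
> >                     list.add(inner.deserializer().deserialize(topic, element));
> >                 }
> >                 return list;
> >             } catch (IOException e) {
> >                 throw new RuntimeException(e);
> >             }
> >         };
> >     }
> > }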
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
> >
> > > Thank you Bruno and Matthias,
> > >
> > > I've modified the transformer to implement the ValueTransformerWithKey
> > > interface and everything is working fine.
> > > I now have to window the data and manually aggregate each window's data,
> > > since I have to compute some averages and sums of differences.
> > > So far I'm just having some issues with message types, since I'm
> > > changing the data type when aggregating the window, but I think it's an
> > > easy problem.
> > >
> > > Thank you again
> > > Best
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <bruno@confluent.io> wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> the `TransformerSupplier` is internally wrapped with a
> > >> `ProcessorSupplier`, so the statement
> > >>
> > >> `transform` is essentially equivalent to adding the Transformer via
> > >> Topology#addProcessor() to your processor topology
> > >>
> > >> is correct.
> > >>
> > >> If you do not change the key, you should definitely use one of the
> > >> overloads of `transformValues` to avoid internal data redistribution.
> > >> In your case, the overload with `ValueTransformerWithKeySupplier`, as
> > >> suggested by Matthias, would fit.
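> > >>
> > >> For concreteness, a rough sketch of what that could look like for your
> > >> timestamp filter (the store name "last-timestamps" and the model types
> > >> are assumptions; adapt them to your code):
> > >>
> > >> import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
> > >> import org.apache.kafka.streams.processor.ProcessorContext;
> > >> import org.apache.kafka.streams.state.KeyValueStore;
> > >>
> > >> public class TimestampFilter
> > >>         implements ValueTransformerWithKey<String, Measurement, Measurement> {
> > >>
> > >>     private KeyValueStore<String, Long> store;
> > >>
> > >>     @Override
> > >>     @SuppressWarnings("unchecked")
> > >>     public void init(ProcessorContext context) {
> > >>         // the store must have been registered on the topology beforehand
> > >>         store = (KeyValueStore<String, Long>)
> > >>                 context.getStateStore("last-timestamps");
> > >>     }
> > >>
> > >>     @Override
> > >>     public Measurement transform(String sensorId, Measurement value) {
> > >>         Long last = store.get(sensorId);
> > >>         if (last != null && value.getTimestamp() <= last) {
> > >>             return null;   // stale value; filter out the nulls downstream
> > >>         }
> > >>         store.put(sensorId, value.getTimestamp());
> > >>         return value;
> > >>     }
> > >>
> > >>     @Override
> > >>     public void close() {}
> > >> }
> > >>
> > >> // usage: the key is untouched, so no repartitioning is triggered
> > >> stream.transformValues(TimestampFilter::new, "last-timestamps")
> > >>       .filter((key, value) -> value != null);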
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matthias@confluent.io> wrote:
> > >>
> > >> > There is also `ValueTransformerWithKey` that gives you read-only
> > >> > access to the key.
> > >> >
> > >> > -Matthias
> > >> >
> > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > >> > > Hi Bruno,
> > >> > >
> > >> > > Thank you for the quick answer.
> > >> > >
> > >> > > I'm actually trying to do that, since it seems there is really no
> > >> > > way to have it use `Processor<K, V>`.
> > >> > > I just wanted (if that would've made any sense) to use the Processor
> > >> > > in both DSL and non-DSL pipelines.
> > >> > >
> > >> > > Anyway, regarding `transformValues()`: I don't think I can use it,
> > >> > > as I need the message key, since that is the discriminating value
> > >> > > for the filter (I want to exclude old values per sensor ID, so per
> > >> > > message key).
> > >> > >
> > >> > > Right now I have this
> > >> > >
> > >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > >> > >
> > >> > > and I'm using it with `transform()`.
> > >> > >
> > >> > > One thing I've found confusing is this
> > >> > >
> > >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > >> > >
> > >> > >> transform is essentially equivalent to adding the Transformer via
> > >> > >> Topology#addProcessor() to your processor topology
> > >> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> > >> > >
> > >> > > Is it? Doesn't `transform` need a TransformerSupplier while
> > >> > > `addProcessor` uses a ProcessorSupplier?
> > >> > >
> > >> > > Thank you again for your help
> > >> > >
> > >> > > --
> > >> > > Alessandro Tagliapietra
> > >> > >
> > >> > >
> > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <bruno@confluent.io> wrote:
> > >> > >
> > >> > >> Hi Alessandro,
> > >> > >>
> > >> > >> Have you considered using `transform()` (actually, in your case
> > >> > >> you should use `transformValues()`) instead of `process()`?
> > >> > >> `transform()` and `transformValues()` are stateful operations
> > >> > >> similar to `process()`, but they return a `KStream`. On a
> > >> > >> `KStream` you can then apply a windowed aggregation.
> > >> > >>
> > >> > >> Hope that helps.
> > >> > >>
> > >> > >> Best,
> > >> > >> Bruno
> > >> > >>
> > >> > >>
> > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi there,
> > >> > >>>
> > >> > >>> I'm just starting with Kafka and I'm trying to create a stream
> > >> > >>> processor that, in multiple stages:
> > >> > >>>  - filters messages using a kv store so that only messages with a
> > >> > >>> higher timestamp get processed
> > >> > >>>  - aggregates the message metrics by minute, giving e.g. the avg
> > >> > >>> of those metrics in that minute
> > >> > >>>
> > >> > >>> The message is simple: the key is the sensor ID and the value is
> > >> > >>> e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> > >> > >>>
> > >> > >>> I've started by creating a processor to use the kv store and
> > >> > >>> filter old messages:
> > >> > >>>
> > >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > >> > >>>
> > >> > >>> Then I was trying to implement windowing. I saw very nice
> > >> > >>> windowing examples for the DSL but none for the Processor API
> > >> > >>> (only a small reference to the windowed store); can someone point
> > >> > >>> me in the right direction?
> > >> > >>>
> > >> > >>> Now, since I wasn't able to find any example, I tried to use the
> > >> > >>> DSL but haven't found a way to use my processor with it. I saw
> > >> > >>> this
> > >> > >>>
> > >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > >> > >>>
> > >> > >>> but it explains mostly transformers, not processors. I also saw
> > >> > >>> the example usage of the processor after that, but `process(...)`
> > >> > >>> returns void, so I cannot get a KStream from a processor?
> > >> > >>>
> > >> > >>> Thank you all in advance
> > >> > >>>
> > >> > >>> --
> > >> > >>> Alessandro Tagliapietra
> > >> > >>>
> > >> > >>
> > >> > >
> > >> >
> > >> >
> > >>
> > >
> >
>
