kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bruno Cadonna <br...@confluent.io>
Subject Re: Using processor API via DSL
Date Tue, 16 Apr 2019 20:01:27 GMT
Hi Alessandro,

What version of Kafka do you use?

Could you please give a more detailed example for the issues with the two
keys you see?

Could the following bug be related to the duplicates you see?

https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22

How do you restart the processor?

Best,
Bruno

On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Thank you Bruno,
>
> I'll look into those, however average is just a simple thing I'm trying
> right now just to get an initial windowing flow working.
> In the future I'll probably still need the actual values for other
> calculations. We won't have more than 60 elements per window for sure.
>
> So far to not manually serialize/deserialize the array list I've created an
> Avro model with an array field containing the values.
> I had issues with suppress as explained here
>
>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>
> but I got that working.
> So far everything seems to be working, except a couple things:
>  - if I generate data with 1 key, I correctly get a value each 10 seconds,
> if I later start generating data with another key (while key 1 is still
> generating) the windowing emits a value only after the timestamp of key 2
> reaches the last generated window
>  - while generating data, if I restart the processor as soon as it starts
> it sometimes generates 2 aggregates for the same window even if I'm using
> the suppress
>
> Anyway, I'll look into your link and try to find out the cause of these
> issues, probably starting from scratch with a simpler example
>
> Thank you for your help!
>
> --
> Alessandro Tagliapietra
>
> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <bruno@confluent.io> wrote:
>
> > Hi Alessandro,
> >
> > Have a look at this Kafka Usage Pattern for computing averages without
> > using an ArrayList.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > ?
> >
> > The advantages of this pattern over the ArrayList approach is the reduced
> > space needed to compute the aggregate. Note that you will still need to
> > implement a SerDe. However, the SerDe should be a bit easier to implement
> > than a SerDe for an ArrayList.
> >
> > Hope that helps.
> >
> > Best,
> > Bruno
> >
> > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > tagliapietra.alessandro@gmail.com> wrote:
> >
> > > Sorry but it seemed harder than I thought,
> > >
> > > to have the custom aggregation working I need to get an ArrayList of
> all
> > > the values in the window, so far my aggregate DSL method creates an
> > > ArrayList on the initializer and adds each value to the list in the
> > > aggregator.
> > > Then I think I'll have to provide a serder to change the output type of
> > > that method.
> > > I was looking at
> > >
> > >
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > but
> > > that seems more towards a list of longs and already uses longSerde.
> > > I'm currently trying to implement another avro model that has a field
> of
> > > type array so I can use the regular avro serializer to implement this.
> > > Should I create my own serdes instead or is this the right way?
> > >
> > > Thank you in advance
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > tagliapietra.alessandro@gmail.com> wrote:
> > >
> > > > Thank you Bruno and Matthias,
> > > >
> > > > I've modified the transformer to implement the
> ValueTransformerWithKey
> > > > interface and everything is working fine.
> > > > I've now to window the data and manually aggregate each window data
> > since
> > > > I've to do some averages and sum of differences.
> > > > So far I've just having some issues with message types since I'm
> > changing
> > > > the data type when aggregating the window but I think it's an easy
> > > problem.
> > > >
> > > > Thank you again
> > > > Best
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <bruno@confluent.io>
> > > wrote:
> > > >
> > > >> Hi Alessandro,
> > > >>
> > > >> the `TransformSupplier` is internally wrapped with a
> > > `ProcessorSupplier`,
> > > >> so the statement
> > > >>
> > > >> `transform` is essentially equivalent to adding the Transformer via
> > > >> Topology#addProcessor() to your processor topology
> > > >>
> > > >> is correct.
> > > >>
> > > >> If you do not change the key, you should definitely use one of the
> > > >> overloads of `transformValues` to avoid internal data
> redistribution.
> > In
> > > >> your case the overload with `ValueTransformerWithKeySupplier` as
> > > suggested
> > > >> by Matthias would fit.
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > >> wrote:
> > > >>
> > > >> > There is also `ValueTransformerWithKey` that gives you read-only
> > acess
> > > >> > to the key.
> > > >> >
> > > >> > -Matthias
> > > >> >
> > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > >> > > Hi Bruno,
> > > >> > >
> > > >> > > Thank you for the quick answer.
> > > >> > >
> > > >> > > I'm actually trying to do that since it seems there is really
no
> > way
> > > >> to
> > > >> > > have it use `Processor<K, V>`.
> > > >> > > I just wanted (if that would've made any sense) to use the
> > Processor
> > > >> in
> > > >> > > both DSL and non-DSL pipelines.
> > > >> > >
> > > >> > > Anyway, regarding `transformValues()` I don't think I can
use it
> > as
> > > I
> > > >> > need
> > > >> > > the message key since that is the discriminating value for
the
> > > filter
> > > >> (I
> > > >> > > want to exclude old values per sensor ID so per message
key)
> > > >> > >
> > > >> > > Right now I've this
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > > >> > > and
> > > >> > > i'm using it with `transform()` .
> > > >> > >
> > > >> > > One thing I've found confusing is this
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > > >> > >
> > > >> > > transform is essentially equivalent to adding the Transformer
> via
> > > >> > >> Topology#addProcessor() to yourprocessor topology
> > > >> > >> <
> > > >> >
> > > >>
> > >
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> > > >> > >
> > > >> > >> .
> > > >> > >
> > > >> > >
> > > >> > > is it? Doesn't `transform` need a TransformSupplier while
> > > >> `addProcessor`
> > > >> > > uses a ProcessorSupplier?
> > > >> > >
> > > >> > > Thank you again for your help
> > > >> > >
> > > >> > > --
> > > >> > > Alessandro Tagliapietra
> > > >> > >
> > > >> > >
> > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> bruno@confluent.io
> > >
> > > >> > wrote:
> > > >> > >
> > > >> > >> Hi Alessandro,
> > > >> > >>
> > > >> > >> Have you considered using `transform()` (actually in
your case
> > you
> > > >> > should
> > > >> > >> use `transformValues()`) instead of `.process()`? `transform()`
> > and
> > > >> > >> `transformValues()` are stateful operations similar
to
> `.process`
> > > but
> > > >> > they
> > > >> > >> return a `KStream`. On a `KStream` you can then apply
a
> windowed
> > > >> > >> aggregation.
> > > >> > >>
> > > >> > >> Hope that helps.
> > > >> > >>
> > > >> > >> Best,
> > > >> > >> Bruno
> > > >> > >>
> > > >> > >>
> > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
<
> > > >> > >> tagliapietra.alessandro@gmail.com> wrote:
> > > >> > >>
> > > >> > >>> Hi there,
> > > >> > >>>
> > > >> > >>> I'm just starting with Kafka and I'm trying to create
a stream
> > > >> > processor
> > > >> > >>> that in multiple stages:
> > > >> > >>>  - filters messages using a kv store so that only
messages
> with
> > > >> higher
> > > >> > >>> timestamp gets processed
> > > >> > >>>  - aggregates the message metrics by minute giving
e.g. the
> avg
> > of
> > > >> > those
> > > >> > >>> metrics in that minute
> > > >> > >>>
> > > >> > >>> The message is simple, the key is the sensor ID
and the value
> is
> > > >> e.g. {
> > > >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > > >> > >>>
> > > >> > >>> I've started by creating a processor to use the
kv store and
> > > filter
> > > >> old
> > > >> > >>> messages:
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>
> > > >> >
> > > >>
> > >
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > > >> > >>>
> > > >> > >>> Then I was trying to implement windowing, I saw
very nice
> > > windowing
> > > >> > >>> examples for the DSL but none for the Processor
API (only a
> > small
> > > >> > >> reference
> > > >> > >>> to the windowed store), can someone point me in
the right
> > > direction?
> > > >> > >>>
> > > >> > >>> Now, since I wasn't able to find any example I tried
to use
> the
> > > DSL
> > > >> but
> > > >> > >>> haven't found a way to use my processor with it,
I saw this
> > > >> > >>>
> > > >> > >>>
> > > >> > >>
> > > >> >
> > > >>
> > >
> >
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > > >> > >>> but
> > > >> > >>> it explains mostly transformers not processors.
I also saw
> after
> > > >> that
> > > >> > the
> > > >> > >>> example usage of the processor but `.process(...)`
returns
> void,
> > > so
> > > >> I
> > > >> > >>> cannot have a KStream from a processor?
> > > >> > >>>
> > > >> > >>> Thank you all in advance
> > > >> > >>>
> > > >> > >>> --
> > > >> > >>> Alessandro Tagliapietra
> > > >> > >>>
> > > >> > >>
> > > >> > >
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message