kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Using processor API via DSL
Date Wed, 17 Apr 2019 04:48:12 GMT
Hi Bruno,

I'm using the Confluent Docker images 5.2.1, so Kafka 2.2.
Anyway, I'll try to put together a small reproduction repo with all the
different cases soon.

Thank you

--
Alessandro Tagliapietra


On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <bruno@confluent.io> wrote:

> Hi Alessandro,
>
> What version of Kafka do you use?
>
> Could you please give a more detailed example for the issues with the two
> keys you see?
>
> Could the following bug be related to the duplicates you see?
>
>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>
> How do you restart the processor?
>
> Best,
> Bruno
>
> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Thank you Bruno,
> >
> > I'll look into those; however, the average is just a simple thing I'm
> > trying right now to get an initial windowing flow working.
> > In the future I'll probably still need the actual values for other
> > calculations. We won't have more than 60 elements per window for sure.
> >
> > So far, to avoid manually serializing/deserializing the ArrayList, I've
> > created an Avro model with an array field containing the values.
> > I had issues with suppress as explained here
> >
> >
> >
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >
> > but I got that working.
> > So far everything seems to be working, except for a couple of things:
> >  - if I generate data with one key, I correctly get a value every 10
> > seconds; if I later start generating data with another key (while key 1
> > is still generating), the windowing emits a value only after the
> > timestamp of key 2 reaches the last generated window
> >  - while generating data, if I restart the processor just after it
> > starts, it sometimes generates 2 aggregates for the same window even
> > though I'm using the suppress
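The first issue above matches how suppression is defined: a suppressed window result is emitted only once stream time (the largest record timestamp observed so far) passes the window end, so a window stays buffered until later records, from any key, push stream time forward. A plain-Java sketch of that emit rule, with a map standing in for the suppression buffer and a simple sum as the stand-in aggregate (the names are illustrative, not the actual Kafka Streams internals):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of suppress-until-window-closes semantics: a window's result is
// emitted only after observed stream time passes the window end. This is
// why key 1's last window only comes out once key 2's records advance
// stream time past it.
class SuppressBufferSketch {
    static final long WINDOW_MS = 10_000;
    long streamTime = Long.MIN_VALUE;
    // buffered results keyed by "key@windowStart"
    final Map<String, Double> buffer = new LinkedHashMap<>();

    // Returns the results whose windows closed after observing this record.
    List<String> observe(String key, long timestamp, double value) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % WINDOW_MS);
        buffer.merge(key + "@" + windowStart, value, Double::sum);
        List<String> emitted = new ArrayList<>();
        for (Iterator<Map.Entry<String, Double>> it = buffer.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Double> e = it.next();
            long start = Long.parseLong(e.getKey().substring(e.getKey().indexOf('@') + 1));
            if (start + WINDOW_MS <= streamTime) { // window closed
                emitted.add(e.getKey() + "=" + e.getValue());
                it.remove();
            }
        }
        return emitted;
    }
}
```

With a 10-second window, two records for `sensor1` at t=0s and t=5s emit nothing; a record for `sensor2` at t=12s advances stream time and flushes `sensor1`'s first window.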
> >
> > Anyway, I'll look into your link and try to find out the cause of these
> > issues, probably starting from scratch with a simpler example
> >
> > Thank you for your help!
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <bruno@confluent.io> wrote:
> >
> > > Hi Alessandro,
> > >
> > > Have a look at this Kafka Usage Pattern for computing averages without
> > > using an ArrayList:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > >
> > > The advantage of this pattern over the ArrayList approach is the
> > > reduced space needed to compute the aggregate. Note that you will
> > > still need to implement a SerDe. However, that SerDe should be a bit
> > > easier to implement than a SerDe for an ArrayList.
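The linked pattern replaces the list of values with a small accumulator holding only a count and a sum; the average is derived at the end. A plain-Java sketch of that accumulator (the class and method names are mine, not the wiki's):

```java
// Sketch of the count-and-sum accumulator behind the linked usage pattern:
// instead of keeping every value in the window, the aggregate keeps only a
// running count and sum, which is also far cheaper to serialize than an
// ArrayList of all values.
class CountAndSum {
    long count;
    double sum;

    // the aggregator step: fold one record's value into the aggregate
    CountAndSum add(double value) {
        count++;
        sum += value;
        return this;
    }

    // the final mapping step: derive the windowed average
    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}
```

In the DSL this would be the initializer (`CountAndSum::new`), the aggregator (`add`), and a final `mapValues` computing `average()`.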
> > >
> > > Hope that helps.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > > tagliapietra.alessandro@gmail.com> wrote:
> > >
> > > > Sorry, but it turned out to be harder than I thought.
> > > >
> > > > To get the custom aggregation working I need an ArrayList of all the
> > > > values in the window. So far my aggregate DSL method creates an
> > > > ArrayList in the initializer and adds each value to the list in the
> > > > aggregator.
> > > > Then I think I'll have to provide a SerDe to change the output type
> > > > of that method.
> > > > I was looking at
> > > >
> > > > https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > >
> > > > but that seems more geared towards a list of longs and already uses
> > > > longSerde.
> > > > I'm currently trying to implement another Avro model that has a field
> > > > of type array so I can use the regular Avro serializer for this.
> > > > Should I create my own SerDes instead, or is this the right way?
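If the Avro-array route turns out to be inconvenient, a hand-written SerDe for a list of doubles only needs a length-prefixed byte encoding. A sketch of the round-trip logic that a custom Kafka `Serializer`/`Deserializer` pair would wrap (the class name is mine):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the byte-level round trip a custom ArrayList<Double> SerDe
// would perform: a 4-byte element count followed by 8 bytes per value.
// A real SerDe would wrap these two methods in Kafka's Serializer and
// Deserializer interfaces.
class DoubleListCodec {
    static byte[] serialize(List<Double> values) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 * values.size());
        buf.putInt(values.size());
        for (double v : values) buf.putDouble(v);
        return buf.array();
    }

    static List<Double> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int n = buf.getInt();
        List<Double> values = new ArrayList<>(n);
        for (int i = 0; i < n; i++) values.add(buf.getDouble());
        return values;
    }
}
```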
> > > >
> > > > Thank you in advance
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > > tagliapietra.alessandro@gmail.com> wrote:
> > > >
> > > > > Thank you Bruno and Matthias,
> > > > >
> > > > > I've modified the transformer to implement the
> > > > > ValueTransformerWithKey interface and everything is working fine.
> > > > > I now have to window the data and manually aggregate each window's
> > > > > data, since I have to compute some averages and sums of differences.
> > > > > So far I'm just having some issues with message types, since I'm
> > > > > changing the data type when aggregating the window, but I think
> > > > > it's an easy problem.
> > > > >
> > > > > Thank you again
> > > > > Best
> > > > >
> > > > > --
> > > > > Alessandro Tagliapietra
> > > > >
> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <bruno@confluent.io> wrote:
> > > > >
> > > > >> Hi Alessandro,
> > > > >>
> > > > >> the `TransformerSupplier` is internally wrapped with a
> > > > >> `ProcessorSupplier`, so the statement
> > > > >>
> > > > >> `transform` is essentially equivalent to adding the Transformer via
> > > > >> Topology#addProcessor() to your processor topology
> > > > >>
> > > > >> is correct.
> > > > >>
> > > > >> If you do not change the key, you should definitely use one of the
> > > > >> overloads of `transformValues` to avoid internal data
> > > > >> redistribution. In your case, the overload with
> > > > >> `ValueTransformerWithKeySupplier` as suggested by Matthias would fit.
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matthias@confluent.io> wrote:
> > > > >>
> > > > >> > There is also `ValueTransformerWithKey` that gives you read-only
> > > > >> > access to the key.
> > > > >> >
> > > > >> > -Matthias
> > > > >> >
> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > > >> > > Hi Bruno,
> > > > >> > >
> > > > >> > > Thank you for the quick answer.
> > > > >> > >
> > > > >> > > I'm actually trying to do that, since it seems there is really
> > > > >> > > no way to have it use `Processor<K, V>`.
> > > > >> > > I just wanted (if that would've made any sense) to use the
> > > > >> > > Processor in both DSL and non-DSL pipelines.
> > > > >> > >
> > > > >> > > Anyway, regarding `transformValues()`, I don't think I can use
> > > > >> > > it, as I need the message key since that is the discriminating
> > > > >> > > value for the filter (I want to exclude old values per sensor
> > > > >> > > ID, so per message key).
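The per-key filter described here boils down to: drop a record unless its timestamp is newer than the last one seen for that sensor ID. A plain-Java sketch of that rule, with a HashMap standing in for the transformer's KeyValueStore (in the real transformer this state lives in the store so it survives restarts):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the discriminating-key filter: keep a record only if its
// timestamp is strictly newer than the last timestamp seen for that key.
// A HashMap stands in for the Kafka Streams KeyValueStore.
class TimestampFilterSketch {
    private final Map<String, Long> lastSeen = new HashMap<>();

    // returns true if the record should pass through (i.e. is not old)
    boolean accept(String sensorId, long timestamp) {
        Long last = lastSeen.get(sensorId);
        if (last != null && timestamp <= last) {
            return false; // stale or duplicate record for this sensor
        }
        lastSeen.put(sensorId, timestamp);
        return true;
    }
}
```

Because the decision depends on the key but never changes it, this is exactly the shape that fits `transformValues` with a `ValueTransformerWithKey`.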
> > > > >> > >
> > > > >> > > Right now I have this
> > > > >> > >
> > > > >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > > > >> > >
> > > > >> > > and I'm using it with `transform()`.
> > > > >> > >
> > > > >> > > One thing I've found confusing is this:
> > > > >> > >
> > > > >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > > > >> > >
> > > > >> > >> transform is essentially equivalent to adding the Transformer
> > > > >> > >> via Topology#addProcessor() to your processor topology
> > > > >> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> > > > >> > >
> > > > >> > >
> > > > >> > > is it? Doesn't `transform` need a TransformerSupplier while
> > > > >> > > `addProcessor` uses a ProcessorSupplier?
> > > > >> > >
> > > > >> > > Thank you again for your help
> > > > >> > >
> > > > >> > > --
> > > > >> > > Alessandro Tagliapietra
> > > > >> > >
> > > > >> > >
> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <bruno@confluent.io> wrote:
> > > > >> > >
> > > > >> > >> Hi Alessandro,
> > > > >> > >>
> > > > >> > >> Have you considered using `transform()` (actually, in your
> > > > >> > >> case you should use `transformValues()`) instead of
> > > > >> > >> `.process()`? `transform()` and `transformValues()` are
> > > > >> > >> stateful operations similar to `.process()`, but they return
> > > > >> > >> a `KStream`. On a `KStream` you can then apply a windowed
> > > > >> > >> aggregation.
> > > > >> > >>
> > > > >> > >> Hope that helps.
> > > > >> > >>
> > > > >> > >> Best,
> > > > >> > >> Bruno
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
> > > > >> > >>
> > > > >> > >>> Hi there,
> > > > >> > >>>
> > > > >> > >>> I'm just starting with Kafka, and I'm trying to create a
> > > > >> > >>> stream processor that, in multiple stages:
> > > > >> > >>>  - filters messages using a kv store so that only messages
> > > > >> > >>> with a higher timestamp get processed
> > > > >> > >>>  - aggregates the message metrics by minute, giving e.g. the
> > > > >> > >>> avg of those metrics in that minute
> > > > >> > >>>
> > > > >> > >>> The message is simple: the key is the sensor ID and the
> > > > >> > >>> value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
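The aggregation stage described above amounts to grouping records by (sensor ID, minute bucket) and averaging within each bucket. A plain-Java sketch of that per-minute average, independent of the Streams DSL (class and method names are mine; this is the computation a one-minute windowed aggregate would perform):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the per-minute aggregation: records are bucketed by
// (sensorId, minuteStart) and a running count/sum yields the average
// speed for each sensor in each minute window.
class MinuteAverageSketch {
    private static final long MINUTE_MS = 60_000;
    // "sensorId@minuteStart" -> {count, sum}
    private final Map<String, double[]> buckets = new LinkedHashMap<>();

    void record(String sensorId, long timestamp, int speed) {
        long minuteStart = timestamp - (timestamp % MINUTE_MS);
        double[] agg = buckets.computeIfAbsent(sensorId + "@" + minuteStart,
                k -> new double[2]);
        agg[0] += 1;      // count
        agg[1] += speed;  // sum
    }

    double average(String sensorId, long minuteStart) {
        double[] agg = buckets.get(sensorId + "@" + minuteStart);
        return agg == null ? Double.NaN : agg[1] / agg[0];
    }
}
```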
> > > > >> > >>>
> > > > >> > >>> I've started by creating a processor that uses the kv store
> > > > >> > >>> to filter old messages:
> > > > >> > >>>
> > > > >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > > > >> > >>>
> > > > >> > >>> Then I was trying to implement windowing. I saw very nice
> > > > >> > >>> windowing examples for the DSL but none for the Processor
> > > > >> > >>> API (only a small reference to the windowed store); can
> > > > >> > >>> someone point me in the right direction?
> > > > >> > >>>
> > > > >> > >>> Now, since I wasn't able to find any example, I tried to
> > > > >> > >>> use the DSL but haven't found a way to use my processor
> > > > >> > >>> with it. I saw this
> > > > >> > >>>
> > > > >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > > > >> > >>> but it explains mostly transformers, not processors. I also
> > > > >> > >>> saw, after that, the example usage of the processor, but
> > > > >> > >>> `.process(...)` returns void, so I cannot get a KStream
> > > > >> > >>> from a processor?
> > > > >> > >>>
> > > > >> > >>> Thank you all in advance
> > > > >> > >>>
> > > > >> > >>> --
> > > > >> > >>> Alessandro Tagliapietra
> > > > >> > >>>
> > > > >> > >>
> > > > >> > >
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
