kafka-users mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: Using processor API via DSL
Date Wed, 08 May 2019 09:11:29 GMT
Hi Alessandro,

Apologies for the late reply.

I tried the code from your repository under
https://github.com/alex88/kafka-test/tree/master and I ran into a
`ClassCastException`. I think this is the bug described in
https://issues.apache.org/jira/browse/KAFKA-8317 .

Should I have tried one of the other branches?

Best regards,
Bruno

On Fri, May 3, 2019 at 9:33 AM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Ok so I'm not sure if I did this correctly,
>
> I've upgraded both the server (by replacing the JARs in the confluent
> docker image with those built from kafka source) and the client (by using
> the built JARs as local file dependencies).
> I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
> When the server runs it prints:
>
> INFO Kafka version: 2.2.1-SNAPSHOT
> (org.apache.kafka.common.utils.AppInfoParser).
>
> and regarding the client I don't see any kafka jars in the "External
> libraries" of the IntelliJ project tab so I think it's using the local JARs
> (2.2.1-SNAPSHOT).
>
> The problem is that the window isn't keeping the old values and still emits
> values with partially processed intervals.
>
> Just to summarize:
> https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db
>
>  - consumer emits one message per second with production = 1
>  - windowing stream should emit one message every 10 seconds with the sum of
> productions (so production = 10)
>
> If I restart the stream processor, it emits window results with partial
> data (production < 10) as you can see from the logs.
> I've checked the JAR file and it seems to include changes from
> https://github.com/apache/kafka/pull/6623 (it has the newly
> added FixedOrderMap class)
>
> Even after removing the suppress() the error seems to persist (look at
> consumer_nosuppress), here it seems it loses track of the contents of the
> window:
>
> S1 with computed metric {"timestamp": 50000, "production": 10}
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> -- RESTART --
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> S1 with computed metric {"timestamp": 60000, "production": 5}
> S1 with computed metric {"timestamp": 60000, "production": 6}
> S1 with computed metric {"timestamp": 70000, "production": 1}
>
> After the restart, partway through the window starting at 60000, the sum
> starts over from 1.
>
> Is it something wrong with my implementation?
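
To make the expected behaviour concrete, here is a minimal plain-Java sketch of a 10-second tumbling-window sum. It is only a model and does not use Kafka Streams; the class name `TumblingWindowSum` and the `snapshot()` helper are made up for illustration, and the `TreeMap` merely stands in for the window store. It shows that when the per-window state is restored after a restart, the window starting at 60000 continues from 4 instead of starting over at 1 as in the log above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a tumbling-window sum keyed by window start (hypothetical, not Kafka Streams). */
final class TumblingWindowSum {
    private final long windowSizeMs;
    // Stand-in for the window store: window start -> running sum.
    private final Map<Long, Integer> sums;

    TumblingWindowSum(long windowSizeMs, Map<Long, Integer> restoredState) {
        this.windowSizeMs = windowSizeMs;
        this.sums = new TreeMap<>(restoredState);
    }

    /** Start of the window containing the given (non-negative) timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds one value to its window and returns the updated sum for that window. */
    int add(long timestampMs, int production) {
        return sums.merge(windowStart(timestampMs), production, Integer::sum);
    }

    /** Copy of the current state, as it would be restored after a restart. */
    Map<Long, Integer> snapshot() {
        return new TreeMap<>(sums);
    }

    public static void main(String[] args) {
        TumblingWindowSum before = new TumblingWindowSum(10_000L, new TreeMap<>());
        for (long ts = 60_000L; ts < 64_000L; ts += 1_000L) {
            before.add(ts, 1); // four records arrive before the restart
        }
        // Restart with restored state: the sum continues instead of starting over.
        TumblingWindowSum after = new TumblingWindowSum(10_000L, before.snapshot());
        int sum = 0;
        for (long ts = 64_000L; ts < 70_000L; ts += 1_000L) {
            sum = after.add(ts, 1); // six more records after the restart
        }
        System.out.println("window 60000 sum = " + sum);
    }
}
```

Constructing the second instance with an empty map instead of `before.snapshot()` reproduces the symptom in the log: the sum for the same window starts again from 1.
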
>
> --
> Alessandro Tagliapietra
>
> On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > thank you for your help, glad to hear that those are only bugs and not a
> > problem on my implementation,
> > I'm currently using confluent docker images, I've checked their master
> > branch which seems to use the SNAPSHOT version however those
> > images/packages aren't publicly available. Are there any snapshot builds
> > available?
> > In the meantime I'm trying to create a custom docker image from kafka
> > source.
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <bruno@confluent.io>
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> It seems that the behaviour you described regarding the window
> >> aggregation is due to bugs. The good news is that the bugs have already
> >> been fixed.
> >>
> >> The relevant bug reports are
> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> https://issues.apache.org/jira/browse/KAFKA-8204
> >>
> >> The fixes for both bugs have already been merged to the 2.2 branch.
> >>
> >> Could you please build from the 2.2 branch and confirm that the fixes
> >> solve your problem?
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> >> tagliapietra.alessandro@gmail.com> wrote:
> >>
> >> > Thanks Matthias, one less thing to worry about in the future :)
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> >
> >> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <
> matthias@confluent.io
> >> >
> >> > wrote:
> >> >
> >> > > Just a side note. There is currently work in progress on
> >> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> >> > > configuration problem for Serdes.
> >> > >
> >> > > -Matthias
> >> > >
> >> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> >> > > > Hi Bruno,
> >> > > > thanks a lot for checking the code, regarding the SpecificAvroSerde
> >> > > > I've found that using
> >> > > >
> >> > > > final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
> >> > > > final Map<String, String> serdeConfig =
> >> > > >     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
> >> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> >> > > >
> >> > > > and then in aggregate()
> >> > > >
> >> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> >> > > >
> >> > > > fixed the issue.
> >> > > >
> >> > > > Thanks in advance for the windowing help, very appreciated.
> >> > > > In the meantime I'll try to make some progress on the rest.
> >> > > >
> >> > > > Have a great weekend
> >> > > >
> >> > > > --
> >> > > > Alessandro Tagliapietra
> >> > > >
> >> > > >
> >> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <bruno@confluent.io
> >
> >> > > wrote:
> >> > > >
> >> > > >> Hi Alessandro,
> >> > > >>
> >> > > >> I had a look at your code. Regarding your question whether you use
> >> > > >> the SpecificAvroSerde correctly, take a look at the following
> >> > > >> documentation:
> >> > > >>
> >> > > >> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> >> > > >>
> >> > > >> I haven't had the time yet to take a closer look at your problems
> >> > > >> with the aggregation. I will have a look next week.
> >> > > >>
> >> > > >> Have a nice weekend,
> >> > > >> Bruno
> >> > > >>
> >> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> >> > > >> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>
> >> > > >>> So I've started with a new app with the archetype:generate as in
> >> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> >> > > >>>
> >> > > >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> >> > > >>> The avro schemas are a Metric with 2 fields: timestamp and production
> >> > > >>> and a MetricList with a list of records (Metric) to be able to manually
> >> > > >>> do the aggregation.
> >> > > >>> Right now the aggregation is simple just for the purpose of the sample
> >> > > >>> repo and to easily see if we're getting wrong values.
> >> > > >>>
> >> > > >>> What I wanted to achieve is:
> >> > > >>>  - have a custom generator that generates 1 message per second with
> >> > > >>> production = 1 and 1 or more separate message keys, which in my case
> >> > > >>> are the sensor IDs generating the data
> >> > > >>>  - a filter that removes out-of-order messages by keeping a state
> >> > > >>> store that maps key (sensorID) -> last timestamp
> >> > > >>>  - a window operation that, for this example, just sums the values
> >> > > >>> in each 10-second window
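
The filter step in the list above can be sketched in plain Java as follows. This is an illustration under assumptions, not the code from the repository: the class name `LastTimestampFilter` is hypothetical, and a `HashMap` stands in for the key-value state store that maps sensor ID to the last accepted timestamp.

```java
import java.util.HashMap;
import java.util.Map;

/** Drops records whose timestamp is not newer than the last one accepted for the same key. */
final class LastTimestampFilter {
    // Stand-in for the key-value state store: sensor ID -> last accepted timestamp.
    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns true and records the timestamp if it is strictly newer for this key. */
    boolean accept(String sensorId, long timestamp) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && timestamp <= previous) {
            return false; // out of order or duplicate: filter it out
        }
        lastSeen.put(sensorId, timestamp);
        return true;
    }

    public static void main(String[] args) {
        LastTimestampFilter filter = new LastTimestampFilter();
        System.out.println(filter.accept("S1", 160000L)); // first record for S1
        System.out.println(filter.accept("S1", 161000L)); // newer than the last one
        System.out.println(filter.accept("S1", 160000L)); // older than the last accepted
        System.out.println(filter.accept("S2", 150000L)); // other key, tracked separately
    }
}
```

Treating an equal timestamp as out of order is a choice made for this sketch; a filter that should let duplicates through would compare with `<` instead of `<=`.
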
> >> > > >>>
> >> > > >>> To show where I'm having issues I've setup multiple branches for the
> >> > > >>> repo:
> >> > > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
> >> > > >>> the one I had initially "Failed to flush state store
> >> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
> >> > > >>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> >> > > >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
> >> > > >>> the one after I've tried to solve above problem with the materializer
> >> > > >>> (maybe the SpecificAvroSerde is wrong?)
> >> > > >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
> >> > > >>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
> >> > > >>> SpecificAvroSerde<>()))) everything seems to be working, if you let
> >> > > >>> both the producer and stream running, you'll see that the stream
> >> > > >>> receives 10 messages (with the timestamp incrementing 1 second for each
> >> > > >>> message) like this:
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> >> > > >>>
> >> > > >>> and at the 10 seconds interval something like:
> >> > > >>>
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> >> > > >>>
> >> > > >>> and so on...
> >> > > >>> Now there are two problems, after stopping and restarting the stream
> >> > > >>> processor (by sending SIGINT via IntelliJ since I start the class main
> >> > > >>> with it) it happens:
> >> > > >>>  - sometimes the aggregated count is wrong, if I have it start windowing
> >> > > >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after restart it
> >> > > >>> might just emit a value for the new 3 missing seconds (seconds 18-20)
> >> > > >>> and the aggregated value is 3 not 10
> >> > > >>>  - sometimes the window outputs twice, in the example where I restart
> >> > > >>> the stream processor I might get as output
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> >> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>>
> >> > > >>> as you can see, window for timestamp 160000 is duplicated
> >> > > >>>
> >> > > >>> Is this because the window state isn't persisted across restarts?
> >> > > >>> My ultimate goal is to have the window part emit only once and resume
> >> > > >>> processing across restarts, while avoiding processing out of order data
> >> > > >>> (that's the purpose of the TimestampIncrementalFilter)
> >> > > >>>
> >> > > >>> Thank you in advance
> >> > > >>>
> >> > > >>> --
> >> > > >>> Alessandro Tagliapietra
> >> > > >>>
> >> > > >>>
> >> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> >> > > >>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>
> >> > > >>>> Hi Bruno,
> >> > > >>>>
> >> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> >> > > >>>> Anyway I'll try to make a small reproduction repo with all the
> >> > > >> different
> >> > > >>>> cases soon.
> >> > > >>>>
> >> > > >>>> Thank you
> >> > > >>>>
> >> > > >>>> --
> >> > > >>>> Alessandro Tagliapietra
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <
> >> bruno@confluent.io>
> >> > > >>> wrote:
> >> > > >>>>
> >> > > >>>>> Hi Alessandro,
> >> > > >>>>>
> >> > > >>>>> What version of Kafka do you use?
> >> > > >>>>>
> >> > > >>>>> Could you please give a more detailed example for the issues with
> >> > > >>>>> the two keys you see?
> >> > > >>>>>
> >> > > >>>>> Could the following bug be related to the duplicates you see?
> >> > > >>>>>
> >> > > >>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >> > > >>>>>
> >> > > >>>>> How do you restart the processor?
> >> > > >>>>>
> >> > > >>>>> Best,
> >> > > >>>>> Bruno
> >> > > >>>>>
> >> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>
> >> > > >>>>>> Thank you Bruno,
> >> > > >>>>>>
> >> > > >>>>>> I'll look into those, however average is just a simple thing
> >> I'm
> >> > > >>> trying
> >> > > >>>>>> right now just to get an initial windowing flow working.
> >> > > >>>>>> In the future I'll probably still need the actual values for
> >> other
> >> > > >>>>>> calculations. We won't have more than 60 elements per window
> >> for
> >> > > >> sure.
> >> > > >>>>>>
> >> > > >>>>>> So far to not manually serialize/deserialize the array list
> >> I've
> >> > > >>>>> created an
> >> > > >>>>>> Avro model with an array field containing the values.
> >> > > >>>>>> I had issues with suppress as explained here
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> > > >>>>>>
> >> > > >>>>>> but I got that working.
> >> > > >>>>>> So far everything seems to be working, except a couple of things:
> >> > > >>>>>>  - if I generate data with 1 key, I correctly get a value every 10
> >> > > >>>>>> seconds, if I later start generating data with another key (while key 1
> >> > > >>>>>> is still generating) the windowing emits a value only after the
> >> > > >>>>>> timestamp of key 2 reaches the last generated window
> >> > > >>>>>>  - while generating data, if I restart the processor as soon as it
> >> > > >>>>>> starts it sometimes generates 2 aggregates for the same window even if
> >> > > >>>>>> I'm using the suppress
> >> > > >>>>>>
> >> > > >>>>>> Anyway, I'll look into your link and try to find out the cause of these
> >> > > >>>>>> issues, probably starting from scratch with a simpler example
> >> > > >>>>>>
> >> > > >>>>>> Thank you for your help!
> >> > > >>>>>>
> >> > > >>>>>> --
> >> > > >>>>>> Alessandro Tagliapietra
> >> > > >>>>>>
> >> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> >> > bruno@confluent.io>
> >> > > >>>>> wrote:
> >> > > >>>>>>
> >> > > >>>>>>> Hi Alessandro,
> >> > > >>>>>>>
> >> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing
> averages
> >> > > >>> without
> >> > > >>>>>>> using an ArrayList.
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > > >>>>>>> ?
> >> > > >>>>>>>
> >> > > >>>>>>> The advantage of this pattern over the ArrayList approach is the
> >> > > >>>>>>> reduced space needed to compute the aggregate. Note that you will
> >> > > >>>>>>> still need to implement a SerDe. However, the SerDe should be a bit
> >> > > >>>>>>> easier to implement than a SerDe for an ArrayList.
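
As a rough illustration of that idea, the sketch below keeps only a running count and sum instead of a list of values. It is plain Java with no Kafka dependency; the class name `CountAndSum` and its methods are assumptions made for this example and are not taken from the wiki page. In an `aggregate()` call, `empty()` would play the role of the initializer and `add()` the role of the aggregator.

```java
/** Accumulator that keeps only a running count and sum, so the average needs constant space. */
final class CountAndSum {
    private final long count;
    private final double sum;

    CountAndSum(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    /** Empty accumulator, the starting point for a new window. */
    static CountAndSum empty() {
        return new CountAndSum(0L, 0.0);
    }

    /** Folds one new value into the accumulator and returns the updated copy. */
    CountAndSum add(double value) {
        return new CountAndSum(count + 1, sum + value);
    }

    long count() {
        return count;
    }

    double sum() {
        return sum;
    }

    /** Average of all values added so far; NaN when nothing has been added. */
    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        CountAndSum acc = CountAndSum.empty();
        double[] values = {2.0, 4.0, 9.0};
        for (double v : values) {
            acc = acc.add(v);
        }
        System.out.println(acc.count() + " values, average " + acc.average());
    }
}
```

A SerDe for this type only has to write two fixed-size numbers, which is why it tends to be simpler than one for a variable-length list. The trade-off is that the individual values are no longer available for other calculations.
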
> >> > > >>>>>>>
> >> > > >>>>>>> Hope that helps.
> >> > > >>>>>>>
> >> > > >>>>>>> Best,
> >> > > >>>>>>> Bruno
> >> > > >>>>>>>
> >> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> >> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>
> >> > > >>>>>>>> Sorry but it seemed harder than I thought,
> >> > > >>>>>>>>
> >> > > >>>>>>>> to have the custom aggregation working I need to get an
> >> > > >> ArrayList
> >> > > >>> of
> >> > > >>>>>> all
> >> > > >>>>>>>> the values in the window, so far my aggregate DSL method
> >> creates
> >> > > >>> an
> >> > > >>>>>>>> ArrayList on the initializer and adds each value to the
> list
> >> in
> >> > > >>> the
> >> > > >>>>>>>> aggregator.
> >> > > >>>>>>>> Then I think I'll have to provide a serde to change the output
> >> > > >>>>>>>> type of that method.
> >> > > >>>>>>>> I was looking at
> >> > > >>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >> > > >>>>>>>> but
> >> > > >>>>>>>> that seems more towards a list of longs and already uses
> >> > > >>> longSerde.
> >> > > >>>>>>>> I'm currently trying to implement another avro model that
> >> has a
> >> > > >>>>> field
> >> > > >>>>>> of
> >> > > >>>>>>>> type array so I can use the regular avro serializer to
> >> implement
> >> > > >>>>> this.
> >> > > >>>>>>>> Should I create my own serdes instead or is this the right
> >> way?
> >> > > >>>>>>>>
> >> > > >>>>>>>> Thank you in advance
> >> > > >>>>>>>>
> >> > > >>>>>>>> --
> >> > > >>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>
> >> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> >> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>>
> >> > > >>>>>>>>> Thank you Bruno and Matthias,
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> I've modified the transformer to implement the
> >> > > >>>>>>>>> ValueTransformerWithKey interface and everything is working fine.
> >> > > >>>>>>>>> I now have to window the data and manually aggregate each window's
> >> > > >>>>>>>>> data since I have to do some averages and sum of differences.
> >> > > >>>>>>>>> So far I'm just having some issues with message types since I'm
> >> > > >>>>>>>>> changing the data type when aggregating the window but I think it's
> >> > > >>>>>>>>> an easy problem.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thank you again
> >> > > >>>>>>>>> Best
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> --
> >> > > >>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> >> > > >>>>> bruno@confluent.io>
> >> > > >>>>>>>> wrote:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> the `TransformerSupplier` is internally wrapped with a
> >> > > >>>>>>>>>> `ProcessorSupplier`, so the statement
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> `transform` is essentially equivalent to adding the Transformer via
> >> > > >>>>>>>>>> Topology#addProcessor() to your processor topology
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> is correct.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> If you do not change the key, you should definitely use one of the
> >> > > >>>>>>>>>> overloads of `transformValues` to avoid internal data redistribution.
> >> > > >>>>>>>>>> In your case the overload with `ValueTransformerWithKeySupplier` as
> >> > > >>>>>>>>>> suggested by Matthias would fit.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> Best,
> >> > > >>>>>>>>>> Bruno
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> >> > > >>>>>>> matthias@confluent.io
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> wrote:
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you read-only
> >> > > >>>>>>>>>>> access to the key.
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> -Matthias
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > >>>>>>>>>>>> Hi Bruno,
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you for the quick answer.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> I'm actually trying to do that since it seems there is really no
> >> > > >>>>>>>>>>>> way to have it use `Processor<K, V>`.
> >> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use the
> >> > > >>>>>>>>>>>> Processor in both DSL and non-DSL pipelines.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can use it
> >> > > >>>>>>>>>>>> as I need the message key since that is the discriminating value
> >> > > >>>>>>>>>>>> for the filter (I want to exclude old values per sensor ID so per
> >> > > >>>>>>>>>>>> message key)
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Right now I've this
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > >>>>>>>>>>>> and I'm using it with `transform()`.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> One thing I've found confusing is this
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> transform is essentially equivalent to adding the Transformer via
> >> > > >>>>>>>>>>>>> Topology#addProcessor() to your processor topology
> >> > > >>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformerSupplier while
> >> > > >>>>>>>>>>>> `addProcessor` uses a ProcessorSupplier?
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you again for your help
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> --
> >> > > >>>>>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> >> > > >>>>>> bruno@confluent.io
> >> > > >>>>>>>>
> >> > > >>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in your case
> >> > > >>>>>>>>>>>>> you should use `transformValues()`) instead of `.process()`?
> >> > > >>>>>>>>>>>>> `transform()` and `transformValues()` are stateful operations
> >> > > >>>>>>>>>>>>> similar to `.process` but they return a `KStream`. On a `KStream`
> >> > > >>>>>>>>>>>>> you can then apply a windowed aggregation.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hope that helps.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Best,
> >> > > >>>>>>>>>>>>> Bruno
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro
> Tagliapietra
> >> > > >> <
> >> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Hi there,
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a stream
> >> > > >>>>>>>>>>>>>> processor that in multiple stages:
> >> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only messages with
> >> > > >>>>>>>>>>>>>> a higher timestamp get processed
> >> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g. the avg
> >> > > >>>>>>>>>>>>>> of those metrics in that minute
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the value is
> >> > > >>>>>>>>>>>>>> e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv
> >> > > >> store
> >> > > >>>>> and
> >> > > >>>>>>>> filter
> >> > > >>>>>>>>>> old
> >> > > >>>>>>>>>>>>>> messages:
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very nice
> >> > > >>>>>>>>>>>>>> windowing examples for the DSL but none for the Processor API
> >> > > >>>>>>>>>>>>>> (only a small reference to the windowed store), can someone point
> >> > > >>>>>>>>>>>>>> me in the right direction?
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to use the
> >> > > >>>>>>>>>>>>>> DSL but haven't found a way to use my processor with it, I saw this
> >> > > >>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > > >>>>>>>>>>>>>> but it explains mostly transformers not processors. I also saw
> >> > > >>>>>>>>>>>>>> after that the example usage of the processor but `.process(...)`
> >> > > >>>>>>>>>>>>>> returns void, so I cannot have a KStream from a processor?
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Thank you all in advance
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> --
> >> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> >
>
