kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Using processor API via DSL
Date Fri, 03 May 2019 07:33:18 GMT
OK, so I'm not sure if I did this correctly.

I've upgraded both the server (by replacing the JARs in the Confluent
Docker image with those built from the Kafka source) and the client (by
using the built JARs as local file dependencies).
I've used this as the source: https://github.com/apache/kafka/archive/2.2.zip
When the server runs it prints:

INFO Kafka version: 2.2.1-SNAPSHOT
(org.apache.kafka.common.utils.AppInfoParser).

and regarding the client, I don't see any Kafka JARs in the "External
Libraries" section of the IntelliJ project tab, so I think it's using the
local JARs (2.2.1-SNAPSHOT).

The problem is that the window isn't keeping the old values and still emits
values computed from partially processed intervals.

Just to summarize:
https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db

 - the consumer emits one message per second with production = 1
 - the windowing stream should emit one message every 10 seconds with the
sum of the productions (so production = 10), as in the sketch below
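
The windowing part is essentially the standard DSL pattern; roughly this
(a sketch, not the exact code from the gist: the topic names, metricSerde
and the Metric accessors are placeholders):

StreamsBuilder builder = new StreamsBuilder();
builder.stream("sensor-data", Consumed.with(Serdes.String(), metricSerde))
    // sum the production values per sensor over 10-second tumbling windows
    .groupByKey(Grouped.with(Serdes.String(), metricSerde))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .aggregate(
        () -> 0,
        (key, metric, sum) -> sum + metric.getProduction(),
        Materialized.with(Serdes.String(), Serdes.Integer()))
    // emit one final result per window instead of intermediate updates
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream((windowedKey, value) -> windowedKey.key())
    .to("sensor-data-windowed", Produced.with(Serdes.String(), Serdes.Integer()));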

If I restart the stream processor, it emits window results with partial
data (production < 10), as you can see from the logs.
I've checked the JAR file and it seems to include the changes from
https://github.com/apache/kafka/pull/6623 (it has the newly added
FixedOrderMap class).

Even after removing the suppress() the error persists (look at
consumer_nosuppress); here it seems to lose track of the contents of the
window:

S1 with computed metric {"timestamp": 50000, "production": 10}
S1 with computed metric {"timestamp": 60000, "production": 1}
S1 with computed metric {"timestamp": 60000, "production": 2}
S1 with computed metric {"timestamp": 60000, "production": 3}
S1 with computed metric {"timestamp": 60000, "production": 4}
-- RESTART --
S1 with computed metric {"timestamp": 60000, "production": 1}
S1 with computed metric {"timestamp": 60000, "production": 2}
S1 with computed metric {"timestamp": 60000, "production": 3}
S1 with computed metric {"timestamp": 60000, "production": 4}
S1 with computed metric {"timestamp": 60000, "production": 5}
S1 with computed metric {"timestamp": 60000, "production": 6}
S1 with computed metric {"timestamp": 70000, "production": 1}

After the restart, the sum starts over within the 60000 window.

Is there something wrong with my implementation?

--
Alessandro Tagliapietra

On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Hi Bruno,
>
> thank you for your help, glad to hear that those are only bugs and not a
> problem in my implementation.
> I'm currently using the Confluent Docker images; I've checked their master
> branch, which seems to use the SNAPSHOT version, but those images/packages
> aren't publicly available. Are there any snapshot builds available?
> In the meantime I'm trying to create a custom docker image from kafka
> source.
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
> On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <bruno@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> It seems that the behaviour you described regarding the window aggregation
>> is due to bugs. The good news is that the bugs have been already fixed.
>>
>> The relevant bug reports are
>> https://issues.apache.org/jira/browse/KAFKA-7895
>> https://issues.apache.org/jira/browse/KAFKA-8204
>>
>> The fixes for both bugs have been already merged to the 2.2 branch.
>>
>> Could you please build from the 2.2 branch and confirm that the fixes
>> solve your problem?
>>
>> Best,
>> Bruno
>>
>>
>> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
>> tagliapietra.alessandro@gmail.com> wrote:
>>
>> > Thanks Matthias, one less thing to worry about in the future :)
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matthias@confluent.io>
>> > wrote:
>> >
>> > > Just a side note. There is currently work in progress on
>> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
>> > > configuration problem for Serdes.
>> > >
>> > > -Matthias
>> > >
>> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
>> > > > Hi Bruno,
>> > > > thanks a lot for checking the code. Regarding the SpecificAvroSerde,
>> > > > I've found that using
>> > > >
>> > > > final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
>> > > > final Map<String, String> serdeConfig =
>> > > >     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
>> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
>> > > >
>> > > > and then in aggregate()
>> > > >
>> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
>> > > >
>> > > > fixed the issue.
>> > > >
>> > > > Thanks in advance for the windowing help, very appreciated.
>> > > > In the meantime I'll try to make some progress on the rest.
>> > > >
>> > > > Have a great weekend
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > >
>> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <bruno@confluent.io>
>> > > > wrote:
>> > > >
>> > > >> Hi Alessandro,
>> > > >>
>> > > >> I had a look at your code. Regarding your question whether you use the
>> > > >> SpecificAvroSerde correctly, take a look at the following documentation:
>> > > >>
>> > > >> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>> > > >>
>> > > >> I haven't had the time yet to take a closer look at your problems with
>> > > >> the aggregation. I will have a look next week.
>> > > >>
>> > > >> Have a nice weekend,
>> > > >> Bruno
>> > > >>
>> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
>> > > >> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>
>> > > >>> So I've started with a new app with the archetype:generate as in
>> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
>> > > >>>
>> > > >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
>> > > >>> The avro schemas are a Metric with 2 fields (timestamp and production)
>> > > >>> and a MetricList with a list of records (Metric), to be able to
>> > > >>> manually do the aggregation.
>> > > >>> Right now the aggregation is simple, just for the purpose of the
>> > > >>> sample repo and to easily see if we're getting wrong values.
>> > > >>>
>> > > >>> What I wanted to achieve is:
>> > > >>>  - have a custom generator that generates 1 message per second with
>> > > >>> production = 1, with 1 or more separate message keys, which in my case
>> > > >>> are the sensor IDs generating the data
>> > > >>>  - a filter that removes out-of-order messages by having a state that
>> > > >>> stores key (sensorID) -> last timestamp (a sketch follows this list)
>> > > >>>  - a window operation that for this example just sums the values in
>> > > >>> each 10-second window
>> > > >>>
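>> > > >>> A rough sketch of what I mean by the filter (not the exact
>> > > >>> TimestampIncrementalFilter from the gist; it assumes a Metric value
>> > > >>> with a getTimestamp() accessor and a key-value store named
>> > > >>> "last-timestamp" registered on the topology):
>> > > >>>
>> > > >>> public class TimestampFilter
>> > > >>>         implements ValueTransformerWithKey<String, Metric, Metric> {
>> > > >>>     private KeyValueStore<String, Long> store;
>> > > >>>
>> > > >>>     @Override
>> > > >>>     @SuppressWarnings("unchecked")
>> > > >>>     public void init(ProcessorContext context) {
>> > > >>>         store = (KeyValueStore<String, Long>) context.getStateStore("last-timestamp");
>> > > >>>     }
>> > > >>>
>> > > >>>     @Override
>> > > >>>     public Metric transform(String sensorId, Metric metric) {
>> > > >>>         Long last = store.get(sensorId);
>> > > >>>         if (last != null && metric.getTimestamp() <= last) {
>> > > >>>             return null; // out of order, dropped by a later filter()
>> > > >>>         }
>> > > >>>         store.put(sensorId, metric.getTimestamp());
>> > > >>>         return metric;
>> > > >>>     }
>> > > >>>
>> > > >>>     @Override
>> > > >>>     public void close() {}
>> > > >>> }
>> > > >>>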
>> > > >>> To show where I'm having issues I've set up multiple branches for
>> > > >>> the repo:
>> > > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
>> > > >>> the one I had initially ("Failed to flush state store
>> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003") that I tried to solve using
>> > > >>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>> > > >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
>> > > >>> the one after I've tried to solve the above problem with the
>> > > >>> materializer (maybe the SpecificAvroSerde is wrong?)
>> > > >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>*:
>> > > >>> after fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(),
>> > > >>> new SpecificAvroSerde<>()))) everything seems to be working; if you let
>> > > >>> both the producer and the stream run, you'll see that the stream
>> > > >>> receives 10 messages (with the timestamp incrementing 1 second for
>> > > >>> each message) like this:
>> > > >>>
>> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
>> > > >>>
>> > > >>> and at each 10-second interval something like:
>> > > >>>
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
>> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
>> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
>> > > >>>
>> > > >>> and so on...
>> > > >>> Now there are two problems after stopping and restarting the stream
>> > > >>> processor (by sending SIGINT via IntelliJ, since I start the class'
>> > > >>> main with it):
>> > > >>>  - sometimes the aggregated count is wrong: if I have it start
>> > > >>> windowing for 7 seconds (e.g. seconds 11-17) and restart the stream,
>> > > >>> after the restart it might just emit a value for the 3 missing seconds
>> > > >>> (seconds 18-20), and the aggregated value is 3, not 10
>> > > >>>  - sometimes the window outputs twice; in the example where I restart
>> > > >>> the stream processor I might get as output:
>> > > >>>
>> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
>> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
>> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>> > > >>>
>> > > >>> as you can see, the window for timestamp 160000 is duplicated
>> > > >>>
>> > > >>> Is this because the window state isn't persisted across restarts?
>> > > >>> My ultimate goal is to have the window part emit only once and resume
>> > > >>> processing across restarts, while avoiding processing out-of-order
>> > > >>> data (that's the purpose of the TimestampIncrementalFilter).
>> > > >>>
>> > > >>> Thank you in advance
>> > > >>>
>> > > >>> --
>> > > >>> Alessandro Tagliapietra
>> > > >>>
>> > > >>>
>> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
>> > > >>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>
>> > > >>>> Hi Bruno,
>> > > >>>>
>> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
>> > > >>>> Anyway I'll try to make a small reproduction repo with all the
>> > > >>>> different cases soon.
>> > > >>>>
>> > > >>>> Thank you
>> > > >>>>
>> > > >>>> --
>> > > >>>> Alessandro Tagliapietra
>> > > >>>>
>> > > >>>>
>> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <bruno@confluent.io>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> Hi Alessandro,
>> > > >>>>>
>> > > >>>>> What version of Kafka do you use?
>> > > >>>>>
>> > > >>>>> Could you please give a more detailed example for the issues with
>> > > >>>>> the two keys you see?
>> > > >>>>>
>> > > >>>>> Could the following bug be related to the duplicates you see?
>> > > >>>>>
>> > > >>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>> > > >>>>>
>> > > >>>>> How do you restart the processor?
>> > > >>>>>
>> > > >>>>> Best,
>> > > >>>>> Bruno
>> > > >>>>>
>> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
>> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>
>> > > >>>>>> Thank you Bruno,
>> > > >>>>>>
>> > > >>>>>> I'll look into those; however, the average is just a simple thing
>> > > >>>>>> I'm trying right now just to get an initial windowing flow working.
>> > > >>>>>> In the future I'll probably still need the actual values for other
>> > > >>>>>> calculations. We won't have more than 60 elements per window for
>> > > >>>>>> sure.
>> > > >>>>>>
>> > > >>>>>> So far, to not manually serialize/deserialize the array list, I've
>> > > >>>>>> created an Avro model with an array field containing the values.
>> > > >>>>>> I had issues with suppress as explained here
>> > > >>>>>>
>> > > >>>>>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>> > > >>>>>>
>> > > >>>>>> but I got that working.
>> > > >>>>>> So far everything seems to be working, except for a couple of
>> > > >>>>>> things:
>> > > >>>>>>  - if I generate data with 1 key, I correctly get a value every 10
>> > > >>>>>> seconds; if I later start generating data with another key (while
>> > > >>>>>> key 1 is still generating), the windowing emits a value only after
>> > > >>>>>> the timestamp of key 2 reaches the last generated window
>> > > >>>>>>  - while generating data, if I restart the processor as soon as it
>> > > >>>>>> starts, it sometimes generates 2 aggregates for the same window,
>> > > >>>>>> even if I'm using the suppress
>> > > >>>>>>
>> > > >>>>>> Anyway, I'll look into your link and try to find out the cause of
>> > > >>>>>> these issues, probably starting from scratch with a simpler example.
>> > > >>>>>>
>> > > >>>>>> Thank you for your help!
>> > > >>>>>>
>> > > >>>>>> --
>> > > >>>>>> Alessandro Tagliapietra
>> > > >>>>>>
>> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <bruno@confluent.io>
>> > > >>>>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Hi Alessandro,
>> > > >>>>>>>
>> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing averages
>> > > >>>>>>> without using an ArrayList:
>> > > >>>>>>>
>> > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?
>> > > >>>>>>>
>> > > >>>>>>> The advantage of this pattern over the ArrayList approach is the
>> > > >>>>>>> reduced space needed to compute the aggregate. Note that you will
>> > > >>>>>>> still need to implement a SerDe. However, the SerDe should be a bit
>> > > >>>>>>> easier to implement than a SerDe for an ArrayList.
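>> > > >>>>>>>
>> > > >>>>>>> As a minimal sketch of the idea (SumCount, its serde sumCountSerde,
>> > > >>>>>>> metricSerde and the input stream are placeholders you would have to
>> > > >>>>>>> provide yourself):
>> > > >>>>>>>
>> > > >>>>>>> // hypothetical aggregate: running sum and count for one window
>> > > >>>>>>> public class SumCount {
>> > > >>>>>>>     public long sum;
>> > > >>>>>>>     public long count;
>> > > >>>>>>> }
>> > > >>>>>>>
>> > > >>>>>>> KTable<Windowed<String>, Double> averages = input
>> > > >>>>>>>     .groupByKey(Grouped.with(Serdes.String(), metricSerde))
>> > > >>>>>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>> > > >>>>>>>     .aggregate(
>> > > >>>>>>>         SumCount::new,
>> > > >>>>>>>         (key, metric, agg) -> {
>> > > >>>>>>>             agg.sum += metric.getProduction();
>> > > >>>>>>>             agg.count += 1;
>> > > >>>>>>>             return agg;
>> > > >>>>>>>         },
>> > > >>>>>>>         Materialized.with(Serdes.String(), sumCountSerde))
>> > > >>>>>>>     // only the final step turns sum and count into the average
>> > > >>>>>>>     .mapValues(agg -> (double) agg.sum / agg.count);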
>> > > >>>>>>>
>> > > >>>>>>> Hope that helps.
>> > > >>>>>>>
>> > > >>>>>>> Best,
>> > > >>>>>>> Bruno
>> > > >>>>>>>
>> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
>> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> Sorry but it seemed harder than I thought,
>> > > >>>>>>>>
>> > > >>>>>>>> to have the custom aggregation working I need to get an ArrayList
>> > > >>>>>>>> of all the values in the window; so far my aggregate DSL method
>> > > >>>>>>>> creates an ArrayList in the initializer and adds each value to the
>> > > >>>>>>>> list in the aggregator.
>> > > >>>>>>>> Then I think I'll have to provide a serde to change the output
>> > > >>>>>>>> type of that method.
>> > > >>>>>>>> I was looking at
>> > > >>>>>>>>
>> > > >>>>>>>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>> > > >>>>>>>>
>> > > >>>>>>>> but that seems more towards a list of longs and already uses
>> > > >>>>>>>> longSerde.
>> > > >>>>>>>> I'm currently trying to implement another avro model that has a
>> > > >>>>>>>> field of type array so I can use the regular avro serializer to
>> > > >>>>>>>> implement this.
>> > > >>>>>>>> Should I create my own serdes instead, or is this the right way?
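>> > > >>>>>>>>
>> > > >>>>>>>> For illustration, what I have in mind is roughly this (assuming
>> > > >>>>>>>> MetricList is my Avro-generated class with a records array field
>> > > >>>>>>>> and metricListSerde a configured SpecificAvroSerde for it):
>> > > >>>>>>>>
>> > > >>>>>>>> stream
>> > > >>>>>>>>     .groupByKey(Grouped.with(Serdes.String(), metricSerde))
>> > > >>>>>>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>> > > >>>>>>>>     .aggregate(
>> > > >>>>>>>>         // start each window with an empty list of records
>> > > >>>>>>>>         () -> new MetricList(new ArrayList<>()),
>> > > >>>>>>>>         (key, metric, list) -> {
>> > > >>>>>>>>             list.getRecords().add(metric);
>> > > >>>>>>>>             return list;
>> > > >>>>>>>>         },
>> > > >>>>>>>>         // the Avro serde handles (de)serializing the whole list
>> > > >>>>>>>>         Materialized.with(Serdes.String(), metricListSerde));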
>> > > >>>>>>>>
>> > > >>>>>>>> Thank you in advance
>> > > >>>>>>>>
>> > > >>>>>>>> --
>> > > >>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>
>> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
>> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Thank you Bruno and Matthias,
>> > > >>>>>>>>>
>> > > >>>>>>>>> I've modified the transformer to implement the
>> > > >>>>>>>>> ValueTransformerWithKey interface and everything is working fine.
>> > > >>>>>>>>> I now have to window the data and manually aggregate each
>> > > >>>>>>>>> window's data, since I have to do some averages and sums of
>> > > >>>>>>>>> differences.
>> > > >>>>>>>>> So far I'm just having some issues with message types, since I'm
>> > > >>>>>>>>> changing the data type when aggregating the window, but I think
>> > > >>>>>>>>> it's an easy problem.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thank you again
>> > > >>>>>>>>> Best
>> > > >>>>>>>>>
>> > > >>>>>>>>> --
>> > > >>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
>> > > >>>>>>>>> bruno@confluent.io> wrote:
>> > > >>>>>>>>>
>> > > >>>>>>>>>> Hi Alessandro,
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> the `TransformSupplier` is internally wrapped with a
>> > > >>>>>>>>>> `ProcessorSupplier`, so the statement
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> `transform` is essentially equivalent to adding the Transformer
>> > > >>>>>>>>>> via Topology#addProcessor() to your processor topology
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> is correct.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> If you do not change the key, you should definitely use one of
>> > > >>>>>>>>>> the overloads of `transformValues` to avoid internal data
>> > > >>>>>>>>>> redistribution. In your case the overload with
>> > > >>>>>>>>>> `ValueTransformerWithKeySupplier` as suggested by Matthias
>> > > >>>>>>>>>> would fit.
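>> > > >>>>>>>>>>
>> > > >>>>>>>>>> The wiring would look roughly like this (just a sketch; the
>> > > >>>>>>>>>> store name, topic, serdes and your TimestampFilter class stand
>> > > >>>>>>>>>> in for whatever you actually use):
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> StreamsBuilder builder = new StreamsBuilder();
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> // register the state store the transformer will use
>> > > >>>>>>>>>> builder.addStateStore(Stores.keyValueStoreBuilder(
>> > > >>>>>>>>>>     Stores.persistentKeyValueStore("last-timestamp"),
>> > > >>>>>>>>>>     Serdes.String(), Serdes.Long()));
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> KStream<String, Metric> filtered = builder
>> > > >>>>>>>>>>     .stream("sensor-data", Consumed.with(Serdes.String(), metricSerde))
>> > > >>>>>>>>>>     // the key is passed read-only, so no repartitioning is needed
>> > > >>>>>>>>>>     .transformValues(TimestampFilter::new, "last-timestamp")
>> > > >>>>>>>>>>     // drop the nulls the transformer returns for old values
>> > > >>>>>>>>>>     .filter((key, value) -> value != null);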
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Best,
>> > > >>>>>>>>>> Bruno
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
>> > > >>>>>>>>>> matthias@confluent.io> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
>> > > >>>>>>>>>>> read-only access to the key.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> -Matthias
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>> > > >>>>>>>>>>>> Hi Bruno,
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Thank you for the quick answer.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> I'm actually trying to do that, since it seems there is really
>> > > >>>>>>>>>>>> no way to have it use `Processor<K, V>`.
>> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use the
>> > > >>>>>>>>>>>> Processor in both DSL and non-DSL pipelines.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Anyway, regarding `transformValues()`, I don't think I can use
>> > > >>>>>>>>>>>> it, as I need the message key, since that is the discriminating
>> > > >>>>>>>>>>>> value for the filter (I want to exclude old values per sensor
>> > > >>>>>>>>>>>> ID, so per message key).
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Right now I have this
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> and I'm using it with `transform()`.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> One thing I've found confusing is this
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> transform is essentially equivalent to adding the Transformer
>> > > >>>>>>>>>>>>> via Topology#addProcessor() to your processor topology
>> > > >>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformSupplier while
>> > > >>>>>>>>>>>> `addProcessor` uses a ProcessorSupplier?
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Thank you again for your help
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> --
>> > > >>>>>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
>> > > >>>>>>>>>>>> bruno@confluent.io> wrote:
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hi Alessandro,
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in your
>> > > >>>>>>>>>>>>> case you should use `transformValues()`) instead of
>> > > >>>>>>>>>>>>> `.process()`? `transform()` and `transformValues()` are
>> > > >>>>>>>>>>>>> stateful operations similar to `.process`, but they return a
>> > > >>>>>>>>>>>>> `KStream`. On a `KStream` you can then apply a windowed
>> > > >>>>>>>>>>>>> aggregation.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hope that helps.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Best,
>> > > >>>>>>>>>>>>> Bruno
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
>> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Hi there,
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
>> > > >>>>>>>>>>>>>> stream processor that, in multiple stages:
>> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only messages
>> > > >>>>>>>>>>>>>> with a higher timestamp get processed
>> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute, giving e.g.
>> > > >>>>>>>>>>>>>> the avg of those metrics in that minute
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> The message is simple: the key is the sensor ID and the
>> > > >>>>>>>>>>>>>> value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv store
>> > > >>>>>>>>>>>>>> and filter old messages:
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Then I was trying to implement windowing. I saw very nice
>> > > >>>>>>>>>>>>>> windowing examples for the DSL but none for the Processor
>> > > >>>>>>>>>>>>>> API (only a small reference to the windowed store); can
>> > > >>>>>>>>>>>>>> someone point me in the right direction?
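>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> The closest I could piece together for the Processor API is
>> > > >>>>>>>>>>>>>> something like this (names made up, and I'm not sure it's
>> > > >>>>>>>>>>>>>> right):
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> // register a windowed store next to the processor
>> > > >>>>>>>>>>>>>> topology.addStateStore(
>> > > >>>>>>>>>>>>>>     Stores.windowStoreBuilder(
>> > > >>>>>>>>>>>>>>         Stores.persistentWindowStore("metrics-window",
>> > > >>>>>>>>>>>>>>             Duration.ofHours(1),    // retention
>> > > >>>>>>>>>>>>>>             Duration.ofMinutes(1),  // window size
>> > > >>>>>>>>>>>>>>             false),                 // retainDuplicates
>> > > >>>>>>>>>>>>>>         Serdes.String(), Serdes.Integer()),
>> > > >>>>>>>>>>>>>>     "my-processor");
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> // in the Processor's init():
>> > > >>>>>>>>>>>>>> windowStore = (WindowStore<String, Integer>)
>> > > >>>>>>>>>>>>>>     context.getStateStore("metrics-window");
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> // in process(): put the value into the record's window
>> > > >>>>>>>>>>>>>> windowStore.put(key, value, context.timestamp());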
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example, I tried to use
>> > > >>>>>>>>>>>>>> the DSL but haven't found a way to use my processor with it.
>> > > >>>>>>>>>>>>>> I saw this
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> but it explains mostly transformers, not processors. I also
>> > > >>>>>>>>>>>>>> saw after that the example usage of the processor, but
>> > > >>>>>>>>>>>>>> `.process(...)` returns void, so I cannot have a KStream
>> > > >>>>>>>>>>>>>> from a processor?
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Thank you all in advance
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> --
>> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>>>>>>
