kafka-users mailing list archives

From 杰 杨 <funk...@live.com>
Subject Re: Re: kafka steams with TimeWindows ,incorrect result
Date Fri, 27 Apr 2018 08:28:13 GMT
I also checked the WindowStore interface and found that it has a put method but no get method.
Within one second, the stream contains records with the same key and different values,
so I must update the value stored for that key.
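
On the read side, `WindowStore` extends `ReadOnlyWindowStore`, so lookups go through `fetch(key, timeFrom, timeTo)` rather than a `get` method. A minimal sketch of the read-then-write update for one key within a window, in plain Java with no Kafka dependency; `WindowedCounts` and its methods are hypothetical stand-ins for the store, and the `{start, active, fresh}` array is an assumed layout for the CountInfo fields:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a windowed store; not Kafka Streams API.
final class WindowedCounts {
    private final long windowSizeMs;
    // Store key is "<recordKey>@<windowStart>", value is {start, active, fresh}.
    private final Map<String, int[]> store = new HashMap<>();

    WindowedCounts(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Start of the tumbling window that contains the timestamp.
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Read the current value for (key, window), add the new counts, write it back.
    void add(String key, long timestampMs, int start, int active, int fresh) {
        String storeKey = key + "@" + windowStart(timestampMs);
        int[] current = store.get(storeKey);   // read side (the role of fetch)
        if (current == null) {
            current = new int[] {0, 0, 0};
        }
        current[0] += start;
        current[1] += active;
        current[2] += fresh;
        store.put(storeKey, current);          // write side (the role of put)
    }

    // Returns {start, active, fresh} for (key, window), or null if absent.
    int[] get(String key, long windowStartMs) {
        return store.get(key + "@" + windowStartMs);
    }
}
```

Records with the same key that fall into the same one-second window end up in one entry, while a record in the next second starts a new entry.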


________________________________
funkyyj@live.com

From: funkyyj@live.com<mailto:funkyyj@live.com>
Date: 2018-04-27 16:08
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi,
I don't know how to use the transform function for this.
The stream is already prepared, with records like the one below:
key:
44_14_2018-04-27
value:
CountInfo(start=1,active=0,fresh=0)

There is a large amount of data like that.
How can I aggregate it every 1 second using the transform function?
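
One way to get per-second aggregates is to fold each record into the one-second window that contains its timestamp, and to emit a window only after it has ended, which is the role a scheduled punctuate callback would play inside a transformer. A minimal sketch in plain Java, with no Kafka dependency; `PerSecondAggregator`, `process` and `punctuate` here are hypothetical names rather than the Kafka Streams interfaces, and the `{start, active, fresh}` array is an assumed layout for CountInfo:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the body of a transformer; not Kafka Streams API.
final class PerSecondAggregator {
    static final long WINDOW_MS = 1000L;
    // windowStart -> (key -> {start, active, fresh}); TreeMaps keep output ordered.
    private final TreeMap<Long, TreeMap<String, int[]>> windows = new TreeMap<>();

    // Called once per input record: fold the value into its one-second window.
    void process(String key, int[] value, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);
        TreeMap<String, int[]> perKey = windows.get(windowStart);
        if (perKey == null) {
            perKey = new TreeMap<>();
            windows.put(windowStart, perKey);
        }
        int[] sum = perKey.get(key);
        if (sum == null) {
            sum = new int[3];
            perKey.put(key, sum);
        }
        for (int i = 0; i < 3; i++) {
            sum[i] += value[i];
        }
    }

    // Called periodically: emit and drop every window that has ended by nowMs,
    // so each (key, window) pair is output exactly once.
    List<String> punctuate(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, TreeMap<String, int[]>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, int[]>> w = it.next();
            if (w.getKey() + WINDOW_MS > nowMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, int[]> e : w.getValue().entrySet()) {
                int[] v = e.getValue();
                out.add(e.getKey() + "@" + w.getKey() + " start=" + v[0]
                        + " active=" + v[1] + " fresh=" + v[2]);
            }
            it.remove();
        }
        return out;
    }
}
```

With two records for the same key at 1000 ms and 1999 ms, `punctuate(2000)` returns a single summed row for the window starting at 1000, and a later call returns nothing more for that window.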


________________________________
funkyyj@live.com

From: Guozhang Wang<mailto:wangguoz@gmail.com>
Date: 2018-04-27 01:50
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.
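
A minimal sketch of the control-message idea, in plain Java with no Kafka dependency: data records are only accumulated, and a record carrying a reserved key triggers the write of everything buffered so far. The class name, the `__flush__` key and the single integer count are all hypothetical. One assumption worth checking in a real topology: on a partitioned topic a control record is seen only by the task that reads its partition, so one control record per partition would probably be needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the processing step; not Kafka Streams API.
final class FlushOnControlMessage {
    // Hypothetical reserved key; any key that real data never uses would do.
    static final String FLUSH_KEY = "__flush__";
    private final TreeMap<String, Integer> pending = new TreeMap<>();

    // Returns the rows to write downstream: nothing for a data record,
    // all buffered totals (then cleared) for a control record.
    List<String> onRecord(String key, int count) {
        List<String> out = new ArrayList<>();
        if (FLUSH_KEY.equals(key)) {
            for (Map.Entry<String, Integer> e : pending.entrySet()) {
                out.add(e.getKey() + "=" + e.getValue());
            }
            pending.clear();
        } else {
            pending.merge(key, count, Integer::sum);
        }
        return out;
    }
}
```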

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
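> `
> TimeWindows windows = TimeWindows.of(60000);
>
> // Builder for a persistent window store named "Counts"
> Stores.windowStoreBuilder(
>         Stores.persistentWindowStore("Counts",
>                 windows.maintainMs(),  // retention period
>                 windows.segments,      // number of segments
>                 windows.size(),        // window size
>                 false),                // no duplicates: a put overwrites the same key and window
>         Serdes.String(),
>         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer()));
> `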
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <funkyyj@live.com> wrote:
>
>> I'm back.
>> Which StateStore could I use for this problem?
>> Another idea: I could send a 'flush' message into this topic,
>> and update the results in the db when that message is received.
>> I don't know whether that would work.
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<mailto:wangguoz@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<mailto:users@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funkyyj@live.com> wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided into time intervals.
>> > What can I do to achieve this?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<mailto:wangguoz@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<mailto:users@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more details
>> > (search for "outputs of the Wordcount application is actually a continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note that these semantics apply to both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funkyyj@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindows to aggregate data in Kafka.
>> > >
>> > > This is the code snippet:
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client))
>> > >         .groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000))
>> > >         .reduce(new Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > >                         value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         })
>> > >         .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo value) {
>> > >                 return key.key();
>> > >             }
>> > >         })
>> > >         .print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > >                 KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > I tested with 30000 records in Kafka and printed the keys and values:
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
>> > > Why are two results printed for one window instead of one?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
--
-- Guozhang