kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Re: kafka steams with TimeWindows ,incorrect result
Date Thu, 26 Apr 2018 17:49:26 GMT
If you're talking about which store to use in your transform function, it
should be a windowed store.

You can create such a store with the `Stores` factory. If your old code
has `windowedBy(TimeWindows.of(60000))`, you can do:

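`
TimeWindows windows = TimeWindows.of(60000);

// keySerde and valueSerde are placeholders for the serdes of your own
// key and value types (signatures as of the Kafka Streams 1.x API).
Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
                "Counts",
                windows.maintainMs(),   // retention period
                windows.segments,       // number of segments
                windows.size(),         // window size
                true),                  // retain duplicates
        keySerde,
        valueSerde);
`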
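
To illustrate what the transform would then do with such a store, here is a
minimal plain-Java sketch, deliberately free of any Kafka dependency. The
class `OncePerWindowSketch` and its methods `add` and `punctuate` are
hypothetical names, and the in-memory `TreeMap` only stands in for the
windowed store; it is not the Kafka Streams API. It accumulates updates per
window and emits each window exactly once, after the window has ended.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a transform that buffers windowed aggregates
// and emits each window once, from a scheduled punctuate call.
class OncePerWindowSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // windowStart -> (key -> running sum); the TreeMap keeps windows in time order.
    private final TreeMap<Long, Map<String, Long>> buffer = new TreeMap<>();

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Called per record: update the aggregate, but emit nothing yet.
    void add(String key, long timestampMs, long delta) {
        buffer.computeIfAbsent(windowStart(timestampMs), w -> new TreeMap<>())
              .merge(key, delta, Long::sum);
    }

    // Called periodically: emit and drop every window whose end is <= nowMs.
    List<String> punctuate(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> window = it.next();
            long end = window.getKey() + WINDOW_SIZE_MS;
            if (end > nowMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                out.add(e.getKey() + "@" + window.getKey() + "/" + end + "=" + e.getValue());
            }
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        OncePerWindowSketch sketch = new OncePerWindowSketch();
        sketch.add("page", 1520601300000L, 12179L);
        sketch.add("page", 1520601310000L, 17821L);
        System.out.println(sketch.punctuate(1520601359999L)); // window still open
        System.out.println(sketch.punctuate(1520601360000L)); // window has ended
    }
}
```

In a real topology the buffer would be the windowed store, `add` would be
the body of the transform, and `punctuate` would be scheduled on the
processor context.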

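For the other option mentioned earlier in this thread (quoted below),
enlarging the cache, a rough sketch of the settings could look like this.
The class name `StreamsCacheConfigSketch` and both values are illustrative
assumptions only. `commit.interval.ms` is included because the cache is also
flushed on every commit, so a short commit interval limits how much the
cache can compact.

```java
import java.util.Properties;

// Hypothetical helper that builds only the cache-related settings.
class StreamsCacheConfigSketch {
    static Properties cacheTuning() {
        Properties props = new Properties();
        // Larger record cache (50 MB here, an arbitrary example value), so more
        // updates to the same key are overwritten before going downstream.
        props.setProperty("cache.max.bytes.buffering", String.valueOf(50L * 1024 * 1024));
        // Commit (and therefore flush the cache) once per minute; also an example value.
        props.setProperty("commit.interval.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(cacheTuning());
    }
}
```

These properties would be merged into the configuration passed to
`KafkaStreams`. This only reduces the number of intermediate updates; it
does not guarantee a single output per window.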

Guozhang



On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <funkyyj@live.com> wrote:

> I'm back.
> Which StateStore could I use for this problem?
> Another idea: I could send a 'flush' message into this topic and, when
> that message is received, update the results in the db.
> Would that work?
>
> ________________________________
> funkyyj@live.com
>
> From: Guozhang Wang<mailto:wangguoz@gmail.com>
> Date: 2018-03-12 03:58
> To: users<mailto:users@kafka.apache.org>
> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
> If you want to strictly "only have one output per window", then for now
> you'd probably implement that logic using a lower-level "transform"
> function in which you can schedule a punctuate function to send all the
> results at the end of a window.
>
> If you just want to reduce the amount of data to your sink, but your sink
> can still handle overwritten records of the same key, you can enlarge the
> cache size via the cache.max.bytes.buffering config.
>
> https://kafka.apache.org/documentation/#streamsconfigs
>
> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funkyyj@live.com> wrote:
>
> > Thanks for your reply!
> > I see that it is designed to operate on an infinite, unbounded stream of
> > data.
> > Now I want to process an unbounded stream, but divided into time
> > intervals.
> > What can I do to achieve this?
> >
> > ________________________________
> > funkyyj@live.com
> >
> > From: Guozhang Wang<mailto:wangguoz@gmail.com>
> > Date: 2018-03-10 02:50
> > To: users<mailto:users@kafka.apache.org>
> > Subject: Re: kafka steams with TimeWindows ,incorrect result
> > Hi Jie,
> >
> > This is by design in Kafka Streams; please read this doc for more
> > details (search for "outputs of the Wordcount application is actually a
> > continuous stream of updates"):
> >
> > https://kafka.apache.org/0110/documentation/streams/quickstart
> >
> > Note that these semantics apply to both windowed and un-windowed tables.
> >
> >
> > Guozhang
> >
> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funkyyj@live.com> wrote:
> >
> > > Hi,
> > > I used TimeWindows to aggregate data in Kafka.
> > >
> > > This is the code snippet:
> > >
> > >   view.flatMap(new MultipleKeyValueMapper(client))
> > >         .groupByKey(Serialized.with(Serdes.String(),
> > >                 Serdes.serdeFrom(new CountInfoSerializer(),
> > >                         new CountInfoDeserializer())))
> > >         .windowedBy(TimeWindows.of(60000))
> > >         .reduce(new Reducer<CountInfo>() {
> > >             @Override
> > >             public CountInfo apply(CountInfo value1, CountInfo value2) {
> > >                 return new CountInfo(value1.start + value2.start,
> > >                         value1.active + value2.active,
> > >                         value1.fresh + value2.fresh);
> > >             }
> > >         })
> > >         .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
> > >             @Override
> > >             public String apply(Windowed<String> key, CountInfo value) {
> > >                 return key.key();
> > >             }
> > >         })
> > >         .print(Printed.toSysOut());
> > >
> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
> > >                 KStreamReducer.getConf());
> > >         streams.start();
> > >
> > > I tested with 30000 records in Kafka and printed each key and value:
> > >
> > >
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000],
> > > CountInfo{start=12179, active=12179, fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> > > CountInfo{start=12179, active=12179, fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000],
> > > CountInfo{start=30000, active=30000, fresh=30000}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> > > CountInfo{start=30000, active=30000, fresh=30000}
> > > Why are two results printed for a single window instead of one?
> > >
> > > ________________________________
> > > funkyyj@live.com
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
