kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Artur Mrozowski <art...@gmail.com>
Subject Re: caching permantly disabled in Kafka Streams
Date Thu, 14 Dec 2017 18:47:52 GMT
Hi Gouzhang,
thank you for the answer. Indeed the value is being populated now, however
the application behaves oddly and not how it used to. I suspect that
disabling caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 has been
persisited somehow.

It seems as log compaction has been disabled permanently. What I observe
now is rather log append. For each run the output will be more and more
duplicates.

Normally I would have hard time to reproduce duplication with that number
of records, 3 in this case. I am trying to implement same idea as you
described in KIP 150. Normally I would not observe duplicates until
aggregation in line 495

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L425

I could get rid of large number of duplicates using exactly once semantics
but not anymore. I run on version 0.11 of Kafka Streams. What do you think
could be causing it? Is version 1.0 more stable in this aspect?
Best regards
Artur

On Thu, Dec 14, 2017 at 6:42 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Artur,
>
> This is because Properties#getProperty() is expecting a String value, and
> hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
>
>
> Guozhang
>
> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <artmro@gmail.com>
> wrote:
>
> > Sure.
> >
> > Another observation I've made is that before I started modifying these
> > properties I could spot quite a few duplicates in the state store. Then I
> > applied exactly once semantics which removed most of the duplicates.
> > Finally I disabled cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to
> 0
> > which duplicates each record. Since then I've been trying to reenable it.
> >
> >  StreamsConfig config = new StreamsConfig(getProperties());
> >
> >
> > System.out.println(getProperties().getProperty(StreamsConfig.PROCESSING_
> > GUARANTEE_CONFIG));
> >
> > System.out.println(getProperties().getProperty(
> > StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
> >
> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_
> > CONFIG));
> >
> > exactly_once
> > null
> > /tmp/customerStoreLocal6
> >
> >
> > private static Properties getProperties() {
> >         Properties settings = new Properties();
> >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
> >         settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092");
> >         settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> > "localhost:2181");
> >         settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> >         settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> >
> > settings.put(StreamsConfig.STATE_DIR_CONFIG,"/tmp/customerStoreLocal6");
> >         settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > WallclockTimestampExtractor.class);
> >
> > settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");
> >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"
> earliest");
> >         settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10 *
> > 1024 * 1024L);
> >         return settings;
> >     }
> >
> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > Could you show us the testing code snippet that shows how you set the
> > > configs and how you read from it for verification?
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <artmro@gmail.com>
> > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > I print out some values that I assign to StreamsConfig in the
> console,
> > > but
> > > > the CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled
> > caching
> > > by
> > > > setting it to 0 today, and it seems to have the expected effect.
> > > > But after this I am not able to assign any value to it, it is always
> > nul.
> > > >
> > > > Best Regards
> > > > Artur
> > > >
> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Artur,
> > > > >
> > > > > What do you mean exactly by "It simply returns null no matter what
> > > value
> > > > I
> > > > > provide."?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <artmro@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi Bill,
> > > > > > No, but I'll be happy to generate it. How do I generate logs
for
> > > > > > StreamsConfig?
> > > > > >
> > > > > > Thanks,
> > > > > > Artur
> > > > > >
> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <bill@confluent.io>
> > > > wrote:
> > > > > >
> > > > > > > H Artur,
> > > > > > >
> > > > > > > Do you have any log files you can share for this issue?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bill
> > > > > > >
> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <
> > artmro@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Actually I can see all other properties being set,
except for
> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG that is null.
> > > > > > > > I use 0.11.0.2 Kafka Streams.
> > > > > > > > Has anyone encountered this issue?
> > > > > > > >
> > > > > > > > /Artur
> > > > > > > >
> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <
> > > artmro@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I played around with caching on Confluent platform
3.3 by
> > first
> > > > > > > > disabling,
> > > > > > > > > setting to zero. Now, it seems I can not enable
it again.
> It
> > > > simply
> > > > > > > > returns
> > > > > > > > > null no matter what value I provide.
> > > > > > > > >
> > > > > > > > > e.g
> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_
> > > > > > > > > CONFIG,10*1024*1024L);
> > > > > > > > >
> > > > > > > > > How can I enable it again? It generates a lot
of
> duplicates.
> > > > > > > > >
> > > > > > > > > Best Regards
> > > > > > > > > Artur
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message