kafka-users mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: caching permanently disabled in Kafka Streams
Date Thu, 14 Dec 2017 17:57:50 GMT
In StreamsConfig.java, CACHE_MAX_BYTES_BUFFERING_CONFIG is defined as:

            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    10 * 1024 * 1024L,

I think passing a numeric value should be accepted (as shown by the
Demo.java classes).
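
To separate the two questions (what StreamsConfig accepts versus what the
verification printout shows), here is a minimal sketch of the
java.util.Properties behaviour Guozhang describes in the quoted reply below.
It uses only the JDK, with no Kafka dependency. The class and method names
are made up for illustration, and the key literal is the string that
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG resolves to.

```java
import java.util.Properties;

public class PropertiesLookupSketch {
    // Literal key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    // written out here so the sketch runs without the Kafka jars.
    static final String CACHE_KEY = "cache.max.bytes.buffering";

    // Hypothetical helper: stores the value as a boxed Long, as in the thread.
    static Properties withLongValue() {
        Properties p = new Properties();
        p.put(CACHE_KEY, 10 * 1024 * 1024L);
        return p;
    }

    // Hypothetical helper: stores the same value as a String.
    static Properties withStringValue() {
        Properties p = new Properties();
        p.setProperty(CACHE_KEY, "10485760");
        return p;
    }

    public static void main(String[] args) {
        Properties asLong = withLongValue();
        // getProperty() returns null when the stored value is not a String.
        System.out.println(asLong.getProperty(CACHE_KEY));
        // get() returns the stored object regardless of its type.
        System.out.println(asLong.get(CACHE_KEY));

        Properties asString = withStringValue();
        // With a String value, getProperty() returns it as expected.
        System.out.println(asString.getProperty(CACHE_KEY));
    }
}
```

If this is what is happening, the null in the printout would come from the
getProperty() lookup rather than from the setting being lost; reading the
value back with get() should show whether it is actually present.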

On Thu, Dec 14, 2017 at 9:42 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Artur,
>
> This is because Properties#getProperty() only returns values that were
> stored as Strings, so the Long 10 * 1024 * 1024L comes back as null; you
> can try "10485760".
>
>
> Guozhang
>
> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <artmro@gmail.com>
> wrote:
>
> > Sure.
> >
> > Another observation I've made is that before I started modifying these
> > properties I could spot quite a few duplicates in the state store. Then I
> > applied exactly-once semantics, which removed most of the duplicates.
> > Finally I disabled the cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG
> > to 0, which duplicates each record. Since then I've been trying to
> > re-enable it.
> >
> >  StreamsConfig config = new StreamsConfig(getProperties());
> >
> >
> > System.out.println(getProperties().getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
> >
> > System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
> >
> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_CONFIG));
> >
> > exactly_once
> > null
> > /tmp/customerStoreLocal6
> >
> >
> > private static Properties getProperties() {
> >     Properties settings = new Properties();
> >     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
> >     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> >     settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >     settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >     settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/customerStoreLocal6");
> >     settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
> >     settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
> >     settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >     settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> >     return settings;
> > }
> >
> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > Could you show us the testing code snippet that shows how you set the
> > > configs and how you read from it for verification?
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <artmro@gmail.com>
> > > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > I print out some values that I assign to StreamsConfig in the console,
> > > > but CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled caching
> > > > by setting it to 0 today, and it seems to have the expected effect.
> > > > But after this I am not able to assign any value to it; it is always
> > > > null.
> > > >
> > > > Best Regards
> > > > Artur
> > > >
> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Artur,
> > > > >
> > > > > What do you mean exactly by "It simply returns null no matter what
> > > > > value I provide."?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <artmro@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Bill,
> > > > > > No, but I'll be happy to generate it. How do I generate logs for
> > > > > > StreamsConfig?
> > > > > >
> > > > > > Thanks,
> > > > > > Artur
> > > > > >
> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <bill@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Artur,
> > > > > > >
> > > > > > > Do you have any log files you can share for this issue?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bill
> > > > > > >
> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <artmro@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Actually I can see all other properties being set, except for
> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG, which is null.
> > > > > > > > I use 0.11.0.2 Kafka Streams.
> > > > > > > > Has anyone encountered this issue?
> > > > > > > >
> > > > > > > > /Artur
> > > > > > > >
> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <artmro@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I played around with caching on Confluent Platform 3.3 by first
> > > > > > > > > disabling it, setting it to zero. Now it seems I cannot enable it
> > > > > > > > > again. It simply returns null no matter what value I provide.
> > > > > > > > >
> > > > > > > > > e.g.
> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10*1024*1024L);
> > > > > > > > >
> > > > > > > > > How can I enable it again? It generates a lot of duplicates.
> > > > > > > > >
> > > > > > > > > Best Regards
> > > > > > > > > Artur
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
