kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: caching permantly disabled in Kafka Streams
Date Thu, 14 Dec 2017 18:30:24 GMT
StreamsConfig does accept the LONG type: it takes the properties as a
`Map<?, ?>` and casts each value internally based on its specified type.

Note that the code snippet does not actually read from StreamsConfig; it
reads only from Properties, whose getProperty() returns only String values.

If you run the slightly modified code from Artur:
https://gist.github.com/guozhangwang/8e69612f04d15e51f2951fc7c0685761

The results are:

null
10485760
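
For readers without access to the gist, here is a minimal JDK-only sketch of the same effect. It is not the gist itself. The class name, the `asLong` helper, and the literal key string are assumptions made for illustration; `asLong` only imitates the kind of type-based casting described above and is not Kafka code.

```java
import java.util.Properties;

public class PropertiesLookupDemo {
    // Stand-in for StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG (assumed key string).
    public static final String CACHE_KEY = "cache.max.bytes.buffering";

    // Stores the cache size as a Long, as in the original snippet.
    public static Properties buildSettings() {
        Properties settings = new Properties();
        settings.put(CACHE_KEY, 10 * 1024 * 1024L);
        return settings;
    }

    // Hypothetical helper: converts a raw config value to long based on its runtime type.
    public static long asLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong(((String) value).trim());
        }
        throw new IllegalArgumentException("Not a long value: " + value);
    }

    public static void main(String[] args) {
        Properties settings = buildSettings();
        // getProperty() only returns values that are Strings, so a Long yields null.
        System.out.println(settings.getProperty(CACHE_KEY));
        // get() returns the raw Object, which can then be cast by type.
        System.out.println(asLong(settings.get(CACHE_KEY)));
    }
}
```

The value is present in the Properties object all along; only the String-typed accessor hides it. Storing "10485760" as a String would make getProperty() return it as well.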


On Thu, Dec 14, 2017 at 9:57 AM, Ted Yu <yuzhihong@gmail.com> wrote:

> In StreamsConfig.java, CACHE_MAX_BYTES_BUFFERING_CONFIG is defined as:
>
>             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
>                     Type.LONG,
>                     10 * 1024 * 1024L,
>
> I think a numeric value should be accepted (as shown by the Demo.java
> classes).
>
> On Thu, Dec 14, 2017 at 9:42 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Artur,
> >
> > This is because Properties#getProperty() is expecting a String value, and
> > hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
> >
> >
> > Guozhang
> >
> > On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <artmro@gmail.com>
> > wrote:
> >
> > > Sure.
> > >
> > > Another observation I've made is that before I started modifying these
> > > properties I could spot quite a few duplicates in the state store. Then I
> > > applied exactly once semantics which removed most of the duplicates.
> > > Finally I disabled cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0
> > > which duplicates each record. Since then I've been trying to reenable it.
> > >
> > >  StreamsConfig config = new StreamsConfig(getProperties());
> > >
> > >
> > > System.out.println(getProperties().getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
> > >
> > > System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
> > >
> > > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_CONFIG));
> > >
> > > exactly_once
> > > null
> > > /tmp/customerStoreLocal6
> > >
> > >
> > > private static Properties getProperties() {
> > >         Properties settings = new Properties();
> > >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
> > >         settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> > >         settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > >         settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > >         settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/customerStoreLocal6");
> > >         settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
> > >         settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
> > >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >         settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> > >         return settings;
> > >     }
> > >
> > > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > >
> > > > Could you show us the testing code snippet that shows how you set the
> > > > configs and how you read from it for verification?
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <artmro@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Guozhang,
> > > > >
> > > > > I print out some values that I assign to StreamsConfig in the console,
> > > > > but the CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled
> > > > > caching by setting it to 0 today, and it seems to have the expected effect.
> > > > > But after this I am not able to assign any value to it; it is always null.
> > > > >
> > > > > Best Regards
> > > > > Artur
> > > > >
> > > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello Artur,
> > > > > >
> > > > > > What do you mean exactly by "It simply returns null no matter what
> > > > > > value I provide."?
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <artmro@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Bill,
> > > > > > > No, but I'll be happy to generate it. How do I generate logs for
> > > > > > > StreamsConfig?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Artur
> > > > > > >
> > > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <bill@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Artur,
> > > > > > > >
> > > > > > > > Do you have any log files you can share for this issue?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Bill
> > > > > > > >
> > > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <artmro@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Actually I can see all other properties being set, except for
> > > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG that is null.
> > > > > > > > > I use 0.11.0.2 Kafka Streams.
> > > > > > > > > Has anyone encountered this issue?
> > > > > > > > >
> > > > > > > > > /Artur
> > > > > > > > >
> > > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <artmro@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > I played around with caching on Confluent platform 3.3 by first
> > > > > > > > > > disabling it (setting it to zero). Now it seems I cannot enable it
> > > > > > > > > > again. It simply returns null no matter what value I provide.
> > > > > > > > > >
> > > > > > > > > > e.g
> > > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10*1024*1024L);
> > > > > > > > > >
> > > > > > > > > > How can I enable it again? It generates a lot of duplicates.
> > > > > > > > > >
> > > > > > > > > > Best Regards
> > > > > > > > > > Artur
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
