kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Artur Mrozowski <art...@gmail.com>
Subject Re: caching permantly disabled in Kafka Streams
Date Thu, 14 Dec 2017 19:18:47 GMT
Ok I see, what was the default value before I've changed it?

On Thu, Dec 14, 2017 at 7:47 PM, Artur Mrozowski <artmro@gmail.com> wrote:

> Hi Gouzhang,
> thank you for the answer. Indeed the value is being populated now, however
> the application behaves oddly and not how it used to. I suspect that
> disabling caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 has
> been persisited somehow.
>
> It seems as log compaction has been disabled permanently. What I observe
> now is rather log append. For each run the output will be more and more
> duplicates.
>
> Normally I would have hard time to reproduce duplication with that number
> of records, 3 in this case. I am trying to implement same idea as you
> described in KIP 150. Normally I would not observe duplicates until
> aggregation in line 495
>
> https://github.com/afuyo/KStreamsDemo/blob/master/src/
> main/java/kstream.demo/CustomerStreamPipelineHDI.java#L425
>
> I could get rid of large number of duplicates using exactly once semantics
> but not anymore. I run on version 0.11 of Kafka Streams. What do you think
> could be causing it? Is version 1.0 more stable in this aspect?
> Best regards
> Artur
>
> On Thu, Dec 14, 2017 at 6:42 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Artur,
>>
>> This is because Properties#getProperty() is expecting a String value, and
>> hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
>>
>>
>> Guozhang
>>
>> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <artmro@gmail.com>
>> wrote:
>>
>> > Sure.
>> >
>> > Another observation I've made is that before I started modifying these
>> > properties I could spot quite a few duplicates in the state store. Then
>> I
>> > applied exactly once semantics which removed most of the duplicates.
>> > Finally I disabled cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG
>> to 0
>> > which duplicates each record. Since then I've been trying to reenable
>> it.
>> >
>> >  StreamsConfig config = new StreamsConfig(getProperties());
>> >
>> >
>> > System.out.println(getProperties().getProperty(StreamsConfig
>> .PROCESSING_
>> > GUARANTEE_CONFIG));
>> >
>> > System.out.println(getProperties().getProperty(
>> > StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
>> >
>> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_
>> > CONFIG));
>> >
>> > exactly_once
>> > null
>> > /tmp/customerStoreLocal6
>> >
>> >
>> > private static Properties getProperties() {
>> >         Properties settings = new Properties();
>> >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
>> >         settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > "localhost:9092");
>> >         settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> > "localhost:2181");
>> >         settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >         settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >
>> > settings.put(StreamsConfig.STATE_DIR_CONFIG,"/tmp/customerSt
>> oreLocal6");
>> >         settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>> > WallclockTimestampExtractor.class);
>> >
>> > settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");
>> >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earli
>> est");
>> >         settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10
>> *
>> > 1024 * 1024L);
>> >         return settings;
>> >     }
>> >
>> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangguoz@gmail.com>
>> > wrote:
>> >
>> > > Could you show us the testing code snippet that shows how you set the
>> > > configs and how you read from it for verification?
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <artmro@gmail.com>
>> > wrote:
>> > >
>> > > > Hello Guozhang,
>> > > >
>> > > > I print out some values that I assign to StreamsConfig in the
>> console,
>> > > but
>> > > > the CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled
>> > caching
>> > > by
>> > > > setting it to 0 today, and it seems to have the expected effect.
>> > > > But after this I am not able to assign any value to it, it is always
>> > nul.
>> > > >
>> > > > Best Regards
>> > > > Artur
>> > > >
>> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangguoz@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hello Artur,
>> > > > >
>> > > > > What do you mean exactly by "It simply returns null no matter
what
>> > > value
>> > > > I
>> > > > > provide."?
>> > > > >
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > >
>> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <
>> artmro@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi Bill,
>> > > > > > No, but I'll be happy to generate it. How do I generate
logs for
>> > > > > > StreamsConfig?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Artur
>> > > > > >
>> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <bill@confluent.io
>> >
>> > > > wrote:
>> > > > > >
>> > > > > > > H Artur,
>> > > > > > >
>> > > > > > > Do you have any log files you can share for this issue?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Bill
>> > > > > > >
>> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <
>> > artmro@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Actually I can see all other properties being
set, except
>> for
>> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG that is null.
>> > > > > > > > I use 0.11.0.2 Kafka Streams.
>> > > > > > > > Has anyone encountered this issue?
>> > > > > > > >
>> > > > > > > > /Artur
>> > > > > > > >
>> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski
<
>> > > artmro@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi,
>> > > > > > > > > I played around with caching on Confluent
platform 3.3 by
>> > first
>> > > > > > > > disabling,
>> > > > > > > > > setting to zero. Now, it seems I can not
enable it again.
>> It
>> > > > simply
>> > > > > > > > returns
>> > > > > > > > > null no matter what value I provide.
>> > > > > > > > >
>> > > > > > > > > e.g
>> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_
>> > > > > > > > > CONFIG,10*1024*1024L);
>> > > > > > > > >
>> > > > > > > > > How can I enable it again? It generates a
lot of
>> duplicates.
>> > > > > > > > >
>> > > > > > > > > Best Regards
>> > > > > > > > > Artur
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message