kafka-users mailing list archives

From Artur Mrozowski <art...@gmail.com>
Subject Re: caching permanently disabled in Kafka Streams
Date Thu, 14 Dec 2017 19:21:48 GMT
hm, strange. It keeps appending records, even in the state store. The
number of records grows with each run.

/Artur
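
One possibility consistent with this symptom, though not confirmed in the
thread: state accumulating across runs when the same application.id and state
directory are reused without a reset. A minimal sketch of clearing local state
between test runs (the builder variable is assumed from the surrounding
application; cleanUp() must be called before start()):

    // Deletes this application's local state directory before starting.
    // Suitable for tests or reprocessing, not for normal production restarts.
    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(getProperties()));
    streams.cleanUp();
    streams.start();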

On Thu, Dec 14, 2017 at 8:18 PM, Artur Mrozowski <artmro@gmail.com> wrote:

> Ok I see. What was the default value before I changed it?
>
> On Thu, Dec 14, 2017 at 7:47 PM, Artur Mrozowski <artmro@gmail.com> wrote:
>
>> Hi Guozhang,
>> thank you for the answer. Indeed the value is being populated now;
>> however, the application behaves oddly, not how it used to. I suspect
>> that disabling caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0
>> has been persisted somehow.
>>
>> It seems as if log compaction has been disabled permanently. What I observe
>> now is append-only behavior: with each run the output contains more and
>> more duplicates.
>>
>> Normally I would have a hard time reproducing duplication with that number
>> of records, 3 in this case. I am trying to implement the same idea as you
>> described in KIP 150. Normally I would not observe duplicates until the
>> aggregation in line 495
>>
>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L425
>>
>> I could get rid of a large number of duplicates using exactly-once
>> semantics, but not anymore. I run version 0.11 of Kafka Streams. What do
>> you think could be causing it? Is version 1.0 more stable in this respect?
>> Best regards
>> Artur
>>
>> On Thu, Dec 14, 2017 at 6:42 PM, Guozhang Wang <wangguoz@gmail.com>
>> wrote:
>>
>>> Artur,
>>>
>>> This is because Properties#getProperty() expects a String value, and
>>> hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
>>>
>>>
>>> Guozhang
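
A self-contained sketch of the behavior described above, assuming only
java.util.Properties: getProperty() returns null whenever the stored value is
not a String, even though the entry is present (the literal key
"cache.max.bytes.buffering" is used here in place of the StreamsConfig
constant):

    import java.util.Properties;

    public class PropertiesGetPropertyDemo {
        public static void main(String[] args) {
            Properties settings = new Properties();

            // put() accepts any Object, but getProperty() only returns String values.
            settings.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);
            System.out.println(settings.getProperty("cache.max.bytes.buffering")); // null
            System.out.println(settings.get("cache.max.bytes.buffering"));         // 10485760

            // Stored as a String, the value is visible to getProperty() again.
            settings.put("cache.max.bytes.buffering", "10485760");
            System.out.println(settings.getProperty("cache.max.bytes.buffering")); // 10485760
        }
    }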
>>>
>>> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <artmro@gmail.com>
>>> wrote:
>>>
>>> > Sure.
>>> >
>>> > Another observation I've made is that before I started modifying these
>>> > properties I could spot quite a few duplicates in the state store. Then I
>>> > applied exactly-once semantics, which removed most of the duplicates.
>>> > Finally I disabled caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG
>>> > to 0, which duplicates each record. Since then I've been trying to
>>> > re-enable it.
>>> >
>>> > StreamsConfig config = new StreamsConfig(getProperties());
>>> >
>>> > System.out.println(getProperties().getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
>>> > System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
>>> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_CONFIG));
>>> >
>>> > exactly_once
>>> > null
>>> > /tmp/customerStoreLocal6
>>> >
>>> > private static Properties getProperties() {
>>> >     Properties settings = new Properties();
>>> >     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
>>> >     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>> >     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>> >     settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >     settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >     settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/customerStoreLocal6");
>>> >     settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
>>> >     settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
>>> >     settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >     settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
>>> >     return settings;
>>> > }
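
Worth noting, as a hedged aside: the null above comes from calling
getProperty() on the raw Properties object, while StreamsConfig itself parses
non-String values, so the Long may still be applied. A sketch of checking the
parsed value instead (via AbstractConfig#getLong, which StreamsConfig
inherits):

    StreamsConfig config = new StreamsConfig(getProperties());
    // Prints the parsed value (10485760) even when getProperty() returns null.
    System.out.println(config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));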
>>> >
>>> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangguoz@gmail.com>
>>> > wrote:
>>> >
>>> > > Could you show us the testing code snippet that shows how you set the
>>> > > configs and how you read them back for verification?
>>> > >
>>> > >
>>> > >
>>> > > Guozhang
>>> > >
>>> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <artmro@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hello Guozhang,
>>> > > >
>>> > > > I print out some of the values that I assign to StreamsConfig in the
>>> > > > console, but CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I
>>> > > > disabled caching by setting it to 0 today, and that seems to have had
>>> > > > the expected effect. But since then I am not able to assign any value
>>> > > > to it; it is always null.
>>> > > >
>>> > > > Best Regards
>>> > > > Artur
>>> > > >
>>> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>> > > >
>>> > > > > Hello Artur,
>>> > > > >
>>> > > > > What do you mean exactly by "It simply returns null no matter what
>>> > > > > value I provide"?
>>> > > > >
>>> > > > >
>>> > > > > Guozhang
>>> > > > >
>>> > > > >
>>> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <artmro@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hi Bill,
>>> > > > > > No, but I'll be happy to generate them. How do I generate logs for
>>> > > > > > StreamsConfig?
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > > Artur
>>> > > > > >
>>> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <bill@confluent.io> wrote:
>>> > > > > >
>>> > > > > > > Hi Artur,
>>> > > > > > >
>>> > > > > > > Do you have any log files you can share for this issue?
>>> > > > > > >
>>> > > > > > > Thanks,
>>> > > > > > > Bill
>>> > > > > > >
>>> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <artmro@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > > > Actually I can see all the other properties being set, except for
>>> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG, which is null.
>>> > > > > > > > I use Kafka Streams 0.11.0.2.
>>> > > > > > > > Has anyone encountered this issue?
>>> > > > > > > >
>>> > > > > > > > /Artur
>>> > > > > > > >
>>> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <artmro@gmail.com> wrote:
>>> > > > > > > >
>>> > > > > > > > > Hi,
>>> > > > > > > > > I played around with caching on Confluent Platform 3.3 by first
>>> > > > > > > > > disabling it, i.e. setting it to zero. Now, it seems I cannot
>>> > > > > > > > > enable it again. It simply returns null no matter what value I
>>> > > > > > > > > provide,
>>> > > > > > > > >
>>> > > > > > > > > e.g.
>>> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
>>> > > > > > > > >
>>> > > > > > > > > How can I enable it again? With caching disabled I get a lot of
>>> > > > > > > > > duplicates.
>>> > > > > > > > >
>>> > > > > > > > > Best Regards
>>> > > > > > > > > Artur
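
For reference against Guozhang's reply further up the thread: the value can be
passed as a String so that Properties#getProperty can read it back
(10 * 1024 * 1024 = 10485760); a minimal sketch of the adjusted line:

    settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760");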
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > --
>>> > > > > -- Guozhang
>>> > > > >
>>> > > >
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > -- Guozhang
>>> > >
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>
