kafka-users mailing list archives

From Ara Ebrahimi <ara.ebrah...@argyledata.com>
Subject Re: Initializing StateStores takes *really* long for large datasets
Date Wed, 30 Nov 2016 19:04:42 GMT
+1 on this.

Ara.

> On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak <mathieu.fenniak@replicon.com> wrote:
>
> I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
> usage.  I was also surprised by the amount of non-JVM-heap memory being
> used and had to tune the 100 MB default down considerably.  It's also
> unfortunate that it's hard to estimate the memory requirements for a KS app
> because of this.  If you have ten stores, then assuming the default config,
> you'd need a GB of memory for the rocksdb caches if you run one app
> instance, but only half a GB if you run two instances, because the stores
> will be distributed between them.
>
> It would be much nicer to be able to give KS a fixed amount of memory in a
> config that it divided among the active stores on a node.  Configure it
> with N GB; if a rebalance adds more tasks and stores, they each get less
> RAM; if a rebalance removes tasks and stores, the remaining stores get more
> RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> interface, because it doesn't receive any state about the KS topology on
> which to base such decisions; and these are arguably tuning / performance
> decisions rather than config.
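(A sketch of the idea above, purely hypothetical: no such config exists in Kafka Streams today, and the class and method names here are made up for illustration. The point is the arithmetic of dividing a fixed off-heap budget over however many stores an instance currently hosts.)

```java
// Hypothetical sketch only: Kafka Streams does not expose the number of
// active stores to RocksDBConfigSetter, so "storeCount" would have to be
// supplied by the framework after each rebalance.
public class FixedBudgetExample {

    /** Divide a fixed off-heap budget evenly across N active stores. */
    public static long perStoreCacheBytes(long totalBudgetBytes, int storeCount) {
        if (storeCount <= 0) {
            throw new IllegalArgumentException("storeCount must be positive");
        }
        return totalBudgetBytes / storeCount;
    }

    public static void main(String[] args) {
        long budget = 1024L * 1024 * 1024; // 1 GB total, as in the example above
        // 10 stores on one instance -> ~100 MB each; after a rebalance that
        // moves half the stores away, the remaining 5 get ~200 MB each.
        System.out.println(perStoreCacheBytes(budget, 10));
        System.out.println(perStoreCacheBytes(budget, 5));
    }
}
```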
>
> Mathieu
>
>
>
> On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu <flyaruu@gmail.com> wrote:
>
>> I'll write an update on where I am now.
>>
>> I've got about 40 'primary' topics, some small, some up to about 10M
>> messages,
>> and about 30 internal topics, divided over 6 stream instances, all running
>> in a single
>> app, talking to a 3 node Kafka cluster.
>>
>> I use a single thread per stream instance, as my prime concern is now to
>> get it
>> to run stable, rather than optimizing performance.
>>
>> My biggest issue was that after a few hours my application started to slow
>> down
>> to ultimately freeze up or crash. It turned out that RocksDb consumed all
>> my
>> memory, which I overlooked as it was off-heap.
>>
>> I was fooling around with RocksDb settings a bit but I had missed the most
>> important
>> one:
>>
>>        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
>>        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
>>        tableConfig.setBlockSize(BLOCK_SIZE);
>>        options.setTableFormatConfig(tableConfig);
>>
>> The block cache size defaults to a whopping 100 MB per store, and that gets
>> expensive
>> fast. I reduced it to a few megabytes. My data size is so big that I doubt
>> it is very effective
>> anyway. Now it seems more stable.
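(For readers landing here from search: the snippet above lives inside a RocksDBConfigSetter implementation, roughly like the following. The class name and the concrete sizes are mine, not a recommendation; the interface shown is the one from the release line discussed in this thread.)

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class SmallCacheConfigSetter implements RocksDBConfigSetter {
    // Much smaller than the 100 MB-per-store default; tune to taste.
    private static final long BLOCK_CACHE_SIZE = 2 * 1024 * 1024L; // 2 MB
    private static final long BLOCK_SIZE = 4096L;                  // 4 KB

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
        tableConfig.setBlockSize(BLOCK_SIZE);
        options.setTableFormatConfig(tableConfig);
    }
}
```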
>>
>> I'd say that a smaller default makes sense, especially because the failure
>> case is so opaque: everything runs fine in tests, but with a serious
>> dataset it dies slowly.
>>
>> Another thing I see is that while starting all my instances, some are quick
>> and some take time (makes sense, as the data sizes vary greatly). But as
>> more instances start up, they use so much CPU, I/O and network that the
>> initialization of the bigger ones takes even longer, increasing the chance
>> that one of them exceeds MAX_POLL_INTERVAL_MS_CONFIG, and then all hell
>> breaks loose. Maybe we can separate the 'initialize' and 'start' steps
>> somehow.
>>
>> In this case we could log better: if initialization takes longer than the
>> timeout, the task ends up being reassigned (in my case to the same
>> instance), which then errors out on being unable to lock the state dir.
>> That message isn't very informative, as the timeout is the actual problem.
>>
>> regards, Frank
>>
>>
>> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>
>>> Hello Frank,
>>>
>>> How many instances do you have in your app, and how many threads do you
>>> use per instance? Note that besides the topology complexity (i.e. number
>>> of state stores, number of internal topics, etc.) the (re-)initialization
>>> process depends on the underlying consumer's membership protocol, and
>>> hence its rebalance latency could be longer with larger groups.
>>>
>>> We have been optimizing our rebalance latency due to state store
>>> migration and restoration in the latest release, but for now the
>>> re-initialization latency still depends largely on 1) topology complexity
>>> with regard to state stores and 2) the number of input topic partitions
>>> and instances / threads in the application.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy <damian.guy@gmail.com>
>> wrote:
>>>
>>>> Hi Frank,
>>>>
>>>> If you are running on a single node then the RocksDB state should be
>>>> re-used by your app. However, this relies on the app being cleanly shut
>>>> down and on the existence of ".checkpoint" files in the state directory
>>>> for the store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint.
>>>> If the file doesn't exist then the entire state will be restored from
>>>> the changelog, which could take some time. I suspect this is what is
>>>> happening?
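(A quick way to check this by hand before starting the app: look for the .checkpoint files Damian mentions under each task directory. This is just a sketch against the default state-directory layout, not anything Streams itself provides.)

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CheckpointCheck {

    /** True if every task directory under the app's state dir has a .checkpoint file. */
    public static boolean allTasksCheckpointed(Path appStateDir) throws IOException {
        if (!Files.isDirectory(appStateDir)) {
            return false; // no local state at all -> full restore from changelogs
        }
        try (Stream<Path> tasks = Files.list(appStateDir)) {
            return tasks.filter(Files::isDirectory)
                        .allMatch(task -> Files.exists(task.resolve(".checkpoint")));
        }
    }

    public static void main(String[] args) throws IOException {
        // Default location: <state.dir>/<application.id>, e.g.:
        Path dir = Paths.get("/tmp/kafka-streams/application-id");
        System.out.println(allTasksCheckpointed(dir)
                ? "checkpoints present, local state should be reused"
                : "missing checkpoints, expect restore from changelogs");
    }
}
```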
>>>>
>>>> As for the RocksDB memory settings, yes the off heap memory usage does
>>>> sneak under the radar. There is a memory management story for Kafka
>>> Streams
>>>> that is yet to be started. This would involve limiting the off-heap
>>> memory
>>>> that RocksDB uses.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Fri, 25 Nov 2016 at 21:14 Frank Lyaruu <flyaruu@gmail.com> wrote:
>>>>
>>>>> I'm running all on a single node, so there is no 'data mobility'
>>>> involved.
>>>>> So if Streams does not use any existing data, I might as well wipe
>> the
>>>>> whole RocksDb before starting, right?
>>>>>
>>>>> As for the RocksDb tuning, I am using a RocksDBConfigSetter, to
>> reduce
>>>> the
>>>>> memory usage a bit:
>>>>>
>>>>> options.setWriteBufferSize(3000000);
>>>>> options.setMaxBytesForLevelBase(30000000);
>>>>> options.setMaxBytesForLevelMultiplier(3);
>>>>>
>>>>> I needed to do this as my 16 GB machine would die otherwise, but I
>>>>> honestly was just reducing values more or less randomly until it
>>>>> wouldn't fall over.
>>>>> I have to say this is a big drawback of Rocks: I monitor Java memory
>>>>> usage, but this just sneaks under the radar as it is off-heap, and it
>>>>> isn't very clear what the implications of different settings are, as I
>>>>> can't say something like the Xmx heap setting, meaning: take whatever
>>>>> you need up to this maximum. Also, if I get this right, in the long
>>>>> run, as the data set changes and grows, I can never be sure it won't
>>>>> take too much memory.
>>>>>
>>>>> I get the impression I'll be better off with an external store,
>>>> something I
>>>>> can monitor, tune and restart separately.
>>>>>
>>>>> But I'm getting ahead of myself. I'll wipe the data before I start and
>>>>> see if that gets me any stability.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy <damian.guy@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hi Frank,
>>>>>>
>>>>>> If you have run the app before with the same applicationId,
>>> completely
>>>>> shut
>>>>>> it down, and then restarted it again, it will need to restore all
>> of
>>>> the
>>>>>> state which will take some time depending on the amount of data you
>>>> have.
>>>>>> In this case the placement of the partitions doesn't take into
>>> account
>>>>> any
>>>>>> existing state stores, so it might need to load quite a lot of data
>>> if
>>>>>> nodes assigned certain partitions don't have that state-store (this
>>> is
>>>>>> something we should look at improving).
>>>>>>
>>>>>> As for RocksDB tuning - you can provide an implementation of
>>>>>> RocksDBConfigSetter via the config
>>>>>> StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS. It has a single method:
>>>>>>
>>>>>> public void setConfig(final String storeName, final Options options,
>>>>>>                       final Map<String, Object> configs)
>>>>>>
>>>>>> in this method you can set various options on the provided Options
>>>>> object.
>>>>>> The options that might help in this case are:
>>>>>> options.setWriteBufferSize(..)  - default in streams is 32 MB
>>>>>> options.setMaxWriteBufferNumber(..) - default in streams is 3
>>>>>>
>>>>>> However, I'm no expert on RocksDB and I suggest you have a look at
>>>>>> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for
>>>>>> more info.
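(Putting those pointers together, a minimal setter might look like the one below. The concrete values are illustrative only, and the registration is shown as a comment because it belongs in the application's startup code; the full config key constant in the API is StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.)

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class TunedRocksDBConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Illustrative values only; the streams defaults mentioned above
        // are 32 MB write buffers and 3 buffers per store.
        options.setWriteBufferSize(8 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(2);
    }

    // Registration (in the application's startup code):
    // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
    //           TunedRocksDBConfigSetter.class);
}
```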
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>>>>
>>>>>> On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu <flyaruu@gmail.com>
>> wrote:
>>>>>>
>>>>>>> @Damian:
>>>>>>>
>>>>>>> Yes, it ran before, and it has that 200gb blob worth of Rocksdb
>>> stuff
>>>>>>>
>>>>>>> @Svante: It's on a pretty high-end SAN in a managed private cloud.
>>>>>>> I'm unsure what the ultimate storage is, but I doubt there is a
>>>>>>> performance problem there.
>>>>>>>
>>>>>>> On Fri, 25 Nov 2016 at 13:37, Svante Karlsson <
>>>> svante.karlsson@csi.se>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What kind of disk are you using for the rocksdb store, i.e. spinning
>>>>>>>> or SSD?
>>>>>>>>
>>>>>>>> 2016-11-25 12:51 GMT+01:00 Damian Guy <damian.guy@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Frank,
>>>>>>>>>
>>>>>>>>> Is this on a restart of the application?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu <flyaruu@gmail.com
>>>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi y'all,
>>>>>>>>>>
>>>>>>>>>> I have a reasonably simple KafkaStream application, which merges
>>>>>>>>>> about 20 topics a few times.
>>>>>>>>>> The thing is, some of those topic datasets are pretty big, about
>>>>>>>>>> 10M messages. In total I've got about 200Gb worth of state in
>>>>>>>>>> RocksDB, the largest topic is 38 Gb.
>>>>>>>>>>
>>>>>>>>>> I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
>>>>>>>>>> initialization time, but that does not seem nearly enough; I'm
>>>>>>>>>> looking at more than two hour startup times, and that starts to be
>>>>>>>>>> a bit ridiculous.
>>>>>>>>>>
>>>>>>>>>> Any tips / experiences on how to deal with this case? Move away
>>>>>>>>>> from Rocks and use an external data store? Any tuning tips on how
>>>>>>>>>> to tune Rocks to be a bit more useful here?
>>>>>>>>>>
>>>>>>>>>> regards, Frank
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>
>
>
> ________________________________
>
> This message is for the designated recipient only and may contain privileged, proprietary,
or otherwise confidential information. If you have received it in error, please notify the
sender immediately and delete the original. Any other use of the e-mail by you is prohibited.
Thank you in advance for your cooperation.
>
> ________________________________



