kafka-users mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files
Date Fri, 28 Jun 2019 23:29:16 GMT
Hey all,

If you want to figure it out theoretically: print out the topology
description, and you'll see some number of state stores listed in
there. The number of RocksDB instances should just be
(#global_state_stores +
sum(#partitions_of_topic_per_local_state_store)). The number of
stream threads isn't relevant here.
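
As a minimal sketch of that arithmetic (the class and method names are made up for illustration and are not part of Kafka Streams), you would plug in the store and partition counts you read off your own topology description:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Kafka Streams API: it only evaluates the formula
// #global_state_stores + sum(#partitions_of_topic_per_local_state_store).
final class RocksDbInstanceEstimate {

    // partitionsPerLocalStore holds one entry per local state store listed in
    // the topology description: the partition count of that store's topic.
    static int estimate(int globalStateStores, List<Integer> partitionsPerLocalStore) {
        int total = globalStateStores;
        for (int partitions : partitionsPerLocalStore) {
            total += partitions;
        }
        return total;
    }

    public static void main(String[] args) {
        // Example numbers from this thread: 10 local stores with 10 partitions
        // each, and no global stores.
        List<Integer> partitions = Collections.nCopies(10, 10);
        System.out.println(estimate(0, partitions));
    }
}
```

With those example numbers the sketch prints 100, and the stream thread count does not appear anywhere in the calculation.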

You can also figure it out empirically: the first level of
subdirectories in the state dir are Tasks, and then within that, the
next level is Stores. You should see the store directory names match
up with the stores listed in the topology description. The number of
Store directories is exactly the number of RocksDB instances you have.
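
A rough sketch of that count, assuming the layout described above (task directories directly under the directory you pass in, store directories directly under each task). The class name and the depth parameter are my own, not anything Kafka provides; if your state dir has extra levels, adjust the starting directory or the depth:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: counts directories that sit exactly `depth` levels
// below `root`. With depth = 2, root's children are treated as tasks and
// its grandchildren as stores (an assumption about the on-disk layout).
final class StoreDirCounter {

    static long countDirsAtDepth(Path root, int depth) throws IOException {
        if (depth == 0) {
            return Files.isDirectory(root) ? 1 : 0;
        }
        long count = 0;
        try (DirectoryStream<Path> children = Files.newDirectoryStream(root)) {
            for (Path child : children) {
                // Plain files (lock files, checkpoints) are skipped.
                if (Files.isDirectory(child)) {
                    count += countDirsAtDepth(child, depth - 1);
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: StoreDirCounter <directory-containing-task-dirs>");
            return;
        }
        System.out.println(countDirsAtDepth(Paths.get(args[0]), 2));
    }
}
```

Run it against the directory that holds the task folders and compare the result with the store list from the topology description.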

There are also metrics corresponding to each of the state stores, so
you can compute it from what you find in the metrics.

Hope that helps,
-john

On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl <pkleindl@gmail.com> wrote:
>
> Hi Kiran
> Without much research my guess would be "num_stream_threads *
> (#global_state_stores + sum(#partitions_of_topic_per_local_state_store))"
> So 10 stores (regardless of whether explicitly defined or implicitly because of
> some stateful operation) with 10 partitions each should result in 100
> RocksDB instances if you are running at the default of num_stream_threads=1.
>
> As I wrote before, start with 100.
> If the error persists, halve the number, if not, double it ;-) Repeat as
> needed.
>
> If you reach the single-digit-range and the error still shows up, start
> searching for any iterators over a store you might not have closed.
>
> br, Patrik
>
> On Thu, 27 Jun 2019 at 13:11, emailtokirank@gmail.com <
> emailtokirank@gmail.com> wrote:
>
> >
> >
> > On 2019/06/27 09:02:39, Patrik Kleindl <pkleindl@gmail.com> wrote:
> > > Hello Kiran
> > >
> > > First, the value for maxOpenFiles is per RocksDB instance, and the number
> > > of those can get high if you have a lot of topic partitions etc.
> > > Check the directory (state dir) to see how many there are.
> > > Start with a low value (100) and see if that has some effect.
> > >
> > > > Second, because I just found out, you should use
> > > >         BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
> > > >         tableConfig.setBlockCacheSize(100*1024*1024L);
> > > >         tableConfig.setBlockSize(8*1024L);
> > > > instead of creating a new object to prevent accidentally messing up
> > > > references.
> > >
> > > Hope that helps
> > > best regards
> > > Patrik
> > >
> > > On Thu, 27 Jun 2019 at 10:46, emailtokirank@gmail.com <
> > > emailtokirank@gmail.com> wrote:
> > >
> > > >
> > > >
> > > > On 2019/06/26 21:58:02, Patrik Kleindl <pkleindl@gmail.com> wrote:
> > > > > Hi Kiran
> > > > > You can use the RocksDBConfigSetter and pass
> > > > >
> > > > > options.setMaxOpenFiles(100);
> > > > >
> > > > > > to all RocksDBs for the Streams application, which limits how many
> > > > > > files are kept open at the same time.
> > > > >
> > > > > best regards
> > > > >
> > > > > Patrik
> > > > >
> > > > >
> > > > > On Wed, 26 Jun 2019 at 16:14, emailtokirank@gmail.com <
> > > > > emailtokirank@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > > We are using Kafka streams DSL APIs for doing some counter aggregations
> > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub topologies & we
> > > > > > > are using 8 partitions in source topic. When we start pumping more load, we
> > > > > > > start getting RocksDBException stating "too many open files".
> > > > > >
> > > > > > Here are the stack trace samples:
> > > > > >
> > > > > >
> > > >
> > ------------------------------------------------------------------------------------------
> > > > > > Caused by: org.rocksdb.RocksDBException: while open a file for
> > lock:
> > > > > > PPPPPPPPPPP.1512000000/LOCK: Too many open files
> > > > > >         at org.rocksdb.RocksDB.open(Native Method)
> > > > > >         at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > > > >         ... 24 common frames omitted
> > > > > >
> > > > > >
> > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
> > > > Error
> > > > > > while executing flush from store XXXXXXXXXXX.1512000000
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
> > > > > >         ... 10 more
> > > > > > Caused by: org.rocksdb.RocksDBException: While open a file for
> > > > appending:
> > > > > > YYYYYYYYYYYYY.1512000000/000007.dbtmp: Too many open files
> > > > > >         at org.rocksdb.RocksDB.flush(Native Method)
> > > > > >         at org.rocksdb.RocksDB.flush(RocksDB.java:3401)
> > > > > >         at org.rocksdb.RocksDB.flush(RocksDB.java:3361)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:395)
> > > > > >
> > > > > >
> > > >
> > ------------------------------------------------------------------------------------------
> > > > > >
> > > > > > > We tried increasing the open files limit at OS level to some decent
> > > > > > > number.. but still no luck. Obviously we don't want to have boundless open
> > > > > > > files..
> > > > > > >
> > > > > > > We also tried to play with commit interval (kafka.commit.interval.ms) and
> > > > > > > cache size (kafka.cache.max.bytes.buffering) .. but no luck there either.
> > > > > >
> > > > > > KAFKA-3904 talks about it.. but it was resolved long back..
> > > > > >
> > > > > > Any other config tuning we have to do?
> > > > > >
> > > > > > Appreciate any help in this regard!
> > > > > >
> > > > > > Thanks,
> > > > > > Kiran
> > > > > >
> > > > > >
> > > > >
> > > >
> > > > Hi Patrik/All,
> > > >
> > > > Thanks for providing some valuable pointers!
> > > >
> > > > I did that & it doesn't seem to work.
> > > >
> > > > Here is what my custom config setter looks like:
> > > >
> > > >
> > > > ----------------------------------------------------------------------------------------------------
> > > >
> > > > @Override
> > > > public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
> > > >     // See #1 below.
> > > >     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
> > > >
> > > >     tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> > > >     // See #2 below.
> > > >     tableConfig.setBlockSize(16 * 1024L);
> > > >     // See #3 below.
> > > >     tableConfig.setCacheIndexAndFilterBlocks(false);
> > > >     // tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > > >     options.setMaxOpenFiles(-1);
> > > >     options.setTableFormatConfig(tableConfig);
> > > >     // See #4 below.
> > > >     options.setMaxWriteBufferNumber(2);
> > > > }
> > > >
> > > > ----------------------------------------------------------------------------------------------------
> > > > I tried many options with this:
> > > > 1. tableConfig.setCacheIndexAndFilterBlocks(true);  ----> as per docs (
> > > > https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
> > > > ) if we set to true, the max_open_files shouldn't play a role. But I was
> > > > still getting too many open files exception from RocksDB
> > > >
> > > > 2. tableConfig.setCacheIndexAndFilterBlocks(true);
> > > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true); ----> no luck; same
> > > > exception
> > > >
> > > > 3. tableConfig.setCacheIndexAndFilterBlocks(false);  and
> > > >  options.setMaxOpenFiles(50000);    ----->  This also resulted in the same
> > > > exception.. with java process having ~24K open files
> > > >
> > > > 4. tableConfig.setCacheIndexAndFilterBlocks(false);  and
> > > >  options.setMaxOpenFiles(-1);    ----->  This also resulted in the same
> > > > exception.. with java process having ~24K open files. As per the doc, if we
> > > > set to -1, it means infinite and controlled by underlying OS limit.
> > > >
> > > > I am using MacOS Mojave (10.14.4) and OpenJDK 11.0.2.  At OS level, I have
> > > > bumped the open files limit to 1 million.
> > > >
> > > > $ ulimit -a
> > > > core file size          (blocks, -c) 0
> > > > data seg size           (kbytes, -d) unlimited
> > > > file size               (blocks, -f) unlimited
> > > > max locked memory       (kbytes, -l) unlimited
> > > > max memory size         (kbytes, -m) unlimited
> > > > open files                      (-n) 1000000
> > > > pipe size            (512 bytes, -p) 1
> > > > stack size              (kbytes, -s) 8192
> > > > cpu time               (seconds, -t) unlimited
> > > > max user processes              (-u) 1418
> > > > virtual memory          (kbytes, -v) unlimited
> > > >
> > > >
> > > > Am I missing some other config here?
> > > >
> > > > Thanks,
> > > > Kiran
> > > >
> > > >
> > >
> >
> > Hi Patrik,
> >
> > Thanks for the quick response!
> >
> > I just checked the state dir. It has a total of 3152 sub folders and a total
> > of 15451 files (within sub folders..).
> >
> > Given this, what number should I use?
> >
> > How many instances of RocksDB will be used? Any formula to determine that?
> >
> > Thanks again,
> > Kiran
> >
