kafka-users mailing list archives

From Patrik Kleindl <pklei...@gmail.com>
Subject RocksDB not closed on error during CachingKeyValueStore.flush?
Date Tue, 23 Oct 2018 13:24:33 GMT
Hello

Can someone please verify whether my assumption is correct?
In CachingKeyValueStore, if an exception is thrown during flush(), close()
never reaches underlying.close(), so the underlying store is not closed
properly.

@Override
public void flush() {
    lock.writeLock().lock();
    try {
        cache.flush(cacheName);
        underlying.flush();
    } finally {
        lock.writeLock().unlock();
    }
}

@Override
public void close() {
    flush();                 // if this throws, the two calls below are skipped
    underlying.close();
    cache.close(cacheName);
}

Here is an exception that leads to this. Note that another store is already
closed and therefore not available:
2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
    at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

If the store is not closed, we have observed that the RocksDB lock is not
released properly, which can lead to:

2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at org.rocksdb.RocksDB.open(Native Method)
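
To illustrate the assumption, here is a minimal, self-contained sketch of how
a close() could still close the underlying store when flush() throws, by
moving underlying.close() into a finally block. This is not the actual Kafka
Streams code: Store, RecordingStore, CachingStore and closeWithFailingFlush
are hypothetical stand-ins, the failure is simulated with an
IllegalStateException, and the cache.close(cacheName) step is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these types are Kafka Streams classes.
class SafeCloseSketch {

    /** Minimal store contract assumed for this sketch. */
    interface Store {
        void flush();
        void close();
    }

    /** Inner store that records calls so the close behaviour can be inspected. */
    static class RecordingStore implements Store {
        final List<String> calls = new ArrayList<>();

        @Override
        public void flush() {
            calls.add("flush");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Caching wrapper whose cache flush can be made to fail. */
    static class CachingStore implements Store {
        private final Store underlying;
        private final boolean failCacheFlush;

        CachingStore(Store underlying, boolean failCacheFlush) {
            this.underlying = underlying;
            this.failCacheFlush = failCacheFlush;
        }

        @Override
        public void flush() {
            if (failCacheFlush) {
                // Simulates an exception thrown while the cache is being flushed.
                throw new IllegalStateException("simulated failure during cache flush");
            }
            underlying.flush();
        }

        @Override
        public void close() {
            try {
                flush();
            } finally {
                // Runs even if flush() throws, so the inner store can release its resources.
                underlying.close();
            }
        }
    }

    /** Closes a wrapper whose flush fails and returns the calls the inner store saw. */
    static List<String> closeWithFailingFlush() {
        RecordingStore inner = new RecordingStore();
        try {
            new CachingStore(inner, true).close();
        } catch (IllegalStateException expected) {
            // The flush failure still propagates to the caller.
        }
        return inner.calls;
    }

    public static void main(String[] args) {
        System.out.println(closeWithFailingFlush()); // prints [close]
    }
}
```

With the close() shown earlier in this message, the same scenario would leave
the inner store without any close call, which matches the leftover LOCK file
we observed.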

best regards

Patrik
