kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka streams application failed after a long time due to rocks db errors
Date Wed, 13 Sep 2017 00:57:59 GMT
Hi Sachin,

Debugging-wise, unfortunately the RocksDB JNI today does not provide useful
stack traces. However, newer versions of Kafka Streams (0.11.0+) use a newer
version of RocksDB, so it will print the actual error message instead of an
empty "org.rocksdb.RocksDBException:"
or garbage like "org.rocksdb.RocksDBException: N".

Issue-wise, I suspect you are hitting a known state store file-lock issue,
where threads within the same JVM may override each other's file locks and
hence end up accessing the underlying RocksDB files concurrently, causing
the failure (a better error message would help a lot in validating this).
This issue has been fixed in the upcoming 0.11.0.1 release, and I'd
recommend upgrading to that version to see whether the issue goes away.
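
To illustrate why OS file locks alone cannot fence off threads of the same
process: in Java, file locks are held on behalf of the entire JVM, so a
second overlapping lock attempt from another channel in the same process
fails with OverlappingFileLockException rather than blocking. The sketch
below uses only the JDK (the class name JvmFileLockDemo is illustrative;
none of this is Streams or RocksDB code):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

public class JvmFileLockDemo {

    // Returns true if a second lock attempt on the same file, made from
    // a second channel in the same JVM, fails with
    // OverlappingFileLockException while the first lock is still held.
    static boolean secondInJvmLockFails() throws Exception {
        File f = File.createTempFile("lock-demo", ".lock");
        f.deleteOnExit();
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw");
             FileChannel ch = raf.getChannel();
             FileLock first = ch.lock()) {           // lock held by this JVM
            try (RandomAccessFile raf2 = new RandomAccessFile(f, "rw");
                 FileChannel ch2 = raf2.getChannel()) {
                ch2.lock();                          // overlaps the first lock
                return false;                        // not reached on a conforming JVM
            } catch (OverlappingFileLockException e) {
                return true;                         // the JVM rejects the overlap
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("second in-JVM lock failed: " + secondInJvmLockFails());
    }
}
```

So any arbitration between stream threads of one instance has to happen
in-process, on top of the OS lock; the 0.11.0.1 fix tightens exactly that
in-process coordination.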


Guozhang


On Sun, Sep 10, 2017 at 9:46 PM, Sachin Mittal <sjmittal@gmail.com> wrote:

> Hi,
> We have been running a clustered Kafka Streams application, and after
> roughly 3 months of uninterrupted running, a few threads on a couple of
> instances failed.
> We checked the logs and found these two common stack traces pointing to
> the underlying cause: flush and put operations on RocksDB.
>
> Cause 1 - flush
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201709080400
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:114) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.flush(MeteredSegmentedBytesStore.java:111) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:91) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     ...
> Caused by: org.rocksdb.RocksDBException:
>     at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-5.0.1.jar:na]
>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[rocksdbjni-5.0.1.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     ...
>
> Cause 2 - put
> ERROR 2017-09-08 09:40:47,305 [StreamThread-1]:
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key .... and value [...] from store key-table-201709080410
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:257) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:232) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     ...
> Caused by: org.rocksdb.RocksDBException:
>     at org.rocksdb.RocksDB.put(Native Method) ~[rocksdbjni-5.0.1.jar:na]
>     at org.rocksdb.RocksDB.put(RocksDB.java:488) ~[rocksdbjni-5.0.1.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     ...
>
> So I had a few questions:
> 1. Can we tell from the stack traces what caused RocksDB to fail at
> these operations?
> 2. Is there a way to learn more about the failure by looking into some
> RocksDB logs?
> 3. Are these known issues, and will upgrading to 0.11.x fix them?
>
> Thanks
> Sachin
>



-- 
-- Guozhang
