kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: What makes a KStream app exit?
Date Fri, 16 Dec 2016 17:10:57 GMT
I've started seeing this issue with another KStream-based app. Digging
through the logs, I ran across the error shown below.

When I've seen it before, it has reliably killed the application. At the end
of the snippet you can see the exit process starting.


2016-12-16 17:04:51,507 [StreamThread-1] DEBUG
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating
new standby task 0_0

2016-12-16 17:04:51,507 [StreamThread-1] INFO
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating
new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]

2016-12-16 17:04:51,508 [StreamThread-1] INFO
o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state
stores

2016-12-16 17:04:51,508 [StreamThread-1] DEBUG
o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
committed offsets for partitions: [rtdetail_breakout-0]

2016-12-16 17:04:51,819 [StreamThread-1] ERROR
o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
RtDetailBreakoutProcessor failed on partition assignment

java.lang.UnsupportedOperationException: null
        at org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

2016-12-16 17:04:51,820 [StreamThread-1] DEBUG
o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
committed offsets for partitions: [rtdetail_breakout-2,
rtdetail_breakout-1, rtdetail_breakout-6, rtdetail_breakout-5,
rtdetail_breakout_filtered-1]

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-2 to the committed offset 1989670807

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-1 to the committed offset 1991427117

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-6 to the committed offset 1986565752

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-5 to the committed offset 1982149459

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout_filtered-1 to the committed offset 92917

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at
54.154.234.110:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at
54.194.192.105:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at
54.171.236.113:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at
54.154.115.41:9092.

2016-12-16 17:04:51,821 [StreamThread-1] INFO
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting
down

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5,
0_6]] and standby tasks [[]]
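
The trace above suggests that the exception thrown inside the rebalance
listener is what takes StreamThread-1 down. Below is a minimal sketch, in
plain Java with no Kafka dependency, of how a Thread.UncaughtExceptionHandler
can record why a thread died. The class ThreadDeathProbe and the method
runAndCapture are hypothetical names used only for illustration. KafkaStreams
offers setUncaughtExceptionHandler, which accepts the same handler type, so a
similar handler could presumably be registered before start() to log the
cause of an otherwise quiet exit.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Kafka: shows how an uncaught exception
// handler observes the throwable that terminates a thread.
class ThreadDeathProbe {

    // Runs body on a new thread and returns the throwable that killed it,
    // or null if the thread finished normally.
    static Throwable runAndCapture(String name, Runnable body) {
        AtomicReference<Throwable> cause = new AtomicReference<>();
        Thread t = new Thread(body, name);
        // The handler is invoked on the dying thread, before it terminates.
        t.setUncaughtExceptionHandler((thread, e) -> cause.set(e));
        t.start();
        try {
            t.join(); // returns only after the handler has run
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        return cause.get();
    }

    public static void main(String[] args) {
        // Simulate a worker that fails the way the standby task did above.
        Throwable cause = runAndCapture("StreamThread-1", () -> {
            throw new UnsupportedOperationException("simulated failure");
        });
        System.out.println("thread died with: " + cause);
    }
}
```

In a real app the handler would log the throwable rather than store it, so
the reason for the thread's death lands in the application log.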

On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jon.yeargers@cedexis.com>
wrote:

> FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
> called unknowingly.
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     try {
>         LOGGER.warn("ShutdownHook");
>         kafkaStreams.close();
>     } catch (Exception e) {
>         // ignored
>     }
> }));
>
>
> Ran another test and the app closed after ~40min. The above message
> appears 3rd from the end (several seconds after the shutdown process has
> commenced).
>
> (attaching log section)
>
> This has *got* to be something that I've set up improperly... I just can't
> seem to see it.
>
> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> wrote:
>
>> I'm seeing instances where my apps are exiting (gracefully, mind you)
>> without any obvious errors or cause. I have debug logs from many instances
>> of this and have yet to find a reason to explain what's happening.
>>
>> - nothing in the app log
>> - nothing in /var/log/messages (i.e. not OOM killed)
>> - not being closed via /etc/init.d
>> - nothing in the broker logs
>>
>> Running 0.10.1.0
>>
>> example log:
>>
>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing
>>
>
>
