kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: What makes a KStream app exit?
Date Fri, 16 Dec 2016 19:53:22 GMT
I think so. With bugs, it's always hard to be 100% sure.

We know about a null-pointer bug in task assignment/creation, so I
assume that's what you are seeing.

-Matthias
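If running on trunk is not an option, one possible stopgap (an assumption on the editor's part, not a fix proposed in this thread) is to turn standby replicas off, because the failing stack trace goes through `StreamThread.createStandbyTask()`. The default for `num.standby.replicas` is already 0, so the app presumably sets it higher. The sketch builds the configuration with plain `java.util.Properties` and string keys so it compiles without the Kafka jars. `StandbyWorkaround` and `streamsConfig` are hypothetical names, the application id is inferred from the consumer group name in the log, and the broker address is a placeholder.

```java
import java.util.Properties;

/**
 * Hypothetical helper: builds a Kafka Streams configuration with standby
 * replicas turned off. Plain string keys are used so this compiles without
 * the Kafka jars.
 */
public class StandbyWorkaround {

    static Properties streamsConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // application.id doubles as the consumer group id in Kafka Streams.
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // 0 is the default; setting it explicitly means no standby tasks
        // should be assigned, so the createStandbyTask() path is not taken.
        props.put("num.standby.replicas", "0");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder, not the cluster from the log.
        Properties props = streamsConfig("RtDetailBreakoutProcessor", "localhost:9092");
        props.list(System.out);
    }
}
```

With kafka-streams on the classpath, the same keys are available as `StreamsConfig.APPLICATION_ID_CONFIG`, `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` and `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG`, and the `Properties` object is what gets passed to the `KafkaStreams` constructor along with the topology. The trade-off is that without standby replicas, state stores must be rebuilt from their changelog topics after a failover, so restores can take longer.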

On 12/16/16 11:19 AM, Jon Yeargers wrote:
> And these bugs would cause the behavior I'm seeing?
> 
> On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> We just discovered a couple of bugs related to standby tasks. Not
>> all of the bug-fix PRs have been merged yet.
>>
>> You can try running on trunk to get those fixes. It should only be a few
>> days until the fixes are merged.
>>
>>
>> -Matthias
>>
>> On 12/16/16 9:10 AM, Jon Yeargers wrote:
>>> I have started having this issue with another KStream-based app. Digging
>>> through the logs, I ran across this message:
>>>
>>> Whenever I've seen it before, it has killed the application. At the end
>>> of the snippet you can see the exit process starting.
>>>
>>>
>>> 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating new standby task 0_0
>>>
>>> 2016-12-16 17:04:51,507 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
>>>
>>> 2016-12-16 17:04:51,508 [StreamThread-1] INFO o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state stores
>>>
>>> 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-0]
>>>
>>> 2016-12-16 17:04:51,819 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group RtDetailBreakoutProcessor failed on partition assignment
>>> java.lang.UnsupportedOperationException: null
>>>         at org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
>>>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
>>>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
>>>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
>>>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>>>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
>>>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>>>         at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>>>
>>> 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-2, rtdetail_breakout-1, rtdetail_breakout-6, rtdetail_breakout-5, rtdetail_breakout_filtered-1]
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-2 to the committed offset 1989670807
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-1 to the committed offset 1991427117
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-6 to the committed offset 1986565752
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-5 to the committed offset 1982149459
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout_filtered-1 to the committed offset 92917
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at 54.154.234.110:9092.
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at 54.194.192.105:9092.
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at 54.171.236.113:9092.
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at 54.154.115.41:9092.
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
>>>
>>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5, 0_6]] and standby tasks [[]]
>>>
>>> On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jon.yeargers@cedexis.com>
>>> wrote:
>>>
>>>> FWIW, I put a LOGGER.warn() call in my shutdown hook to make sure it
>>>> wasn't being called without my knowledge.
>>>>
>>>> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>>>     try {
>>>>         LOGGER.warn("ShutdownHook");
>>>>         kafkaStreams.close();
>>>>     } catch (Exception e) {
>>>>         // ignored
>>>>     }
>>>> }));
>>>>
>>>>
>>>> I ran another test and the app closed after ~40 min. The "ShutdownHook"
>>>> warning appears third from the end of the log (several seconds after the
>>>> shutdown process had begun).
>>>>
>>>> (attaching log section)
>>>>
>>>> This has *got* to be something that I've set up improperly... I just
>>>> can't seem to see it.
>>>>
>>>> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jon.yeargers@cedexis.com>
>>>> wrote:
>>>>
>>>>> I'm seeing instances where my apps exit (gracefully, mind you) without
>>>>> any obvious errors or cause. I have debug logs from many of these
>>>>> occurrences and have yet to find anything that explains what's happening.
>>>>>
>>>>> - nothing in the app log
>>>>> - nothing in /var/log/messages (i.e., not OOM-killed)
>>>>> - not being closed via /etc/init.d
>>>>> - nothing in the broker logs
>>>>>
>>>>> Running 0.10.1.0
>>>>>
>>>>> example log:
>>>>>
>>>>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
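The original symptom (an app exiting "gracefully" with nothing obvious in its log, and the shutdown-hook warning appearing only after shutdown had started) is consistent with the stream thread dying from an exception: once the last non-daemon thread terminates, the JVM exits normally and only then runs its shutdown hooks. The plain-JDK simulation below models that mechanism under stated assumptions: a hypothetical worker thread named `StreamThread-1` throws the same `UnsupportedOperationException` type seen in the log, and a `Thread.UncaughtExceptionHandler` records it. `ThreadDeathDemo` and `runWorker` are illustrative names, and this is a sketch of the general JVM behavior, not Kafka Streams' actual code.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Plain-JDK sketch (no Kafka classes): a worker thread, standing in for a
 * Kafka Streams StreamThread, dies from an exception thrown inside its run
 * loop. The uncaught-exception handler is the only place that sees it.
 */
public class ThreadDeathDemo {

    /** Starts the hypothetical worker, waits for it to die, returns what killed it. */
    static Throwable runWorker() {
        AtomicReference<Throwable> captured = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            // Stand-in for the rebalance callback failing while a standby
            // task is created; nothing inside the thread catches it.
            throw new UnsupportedOperationException("simulated standby-task failure");
        }, "StreamThread-1");

        // Runs on the dying thread itself, just before it terminates.
        // Without it, the JVM's default handler only prints to stderr.
        worker.setUncaughtExceptionHandler((t, e) -> {
            captured.set(e);
            System.err.println("Thread " + t.getName() + " died: " + e);
        });

        worker.start();
        try {
            worker.join(); // join() also makes the captured value visible here
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        // Once main returns and the worker has died, no non-daemon threads
        // remain, so the JVM exits normally and runs this hook.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("ShutdownHook")));
        Throwable cause = runWorker();
        System.out.println("worker died with " + cause.getClass().getSimpleName());
    }
}
```

Running `main` prints the worker's failure first and `ShutdownHook` last, mirroring the ordering observed in the thread. In a real app, the corresponding hook is `KafkaStreams#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)`, registered before `start()`; logging the throwable there, rather than relying on the default handler's stderr output, makes this kind of exit visible in the application log.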

