kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: What makes a KStream app exit?
Date Fri, 16 Dec 2016 19:19:47 GMT
And these bugs would cause the behaviors I'm seeing?
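FWIW, for anyone debugging this pattern later: a StreamThread that dies from an uncaught exception can take the whole app down without an obvious log line. KafkaStreams exposes setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) for surfacing the cause. Below is a minimal, Kafka-free sketch of that handler mechanism — the thread name and the thrown exception are simulated stand-ins, not anything from the real app:

```java
public class ThreadDeathDemo {
    // Captures how a worker thread died via an UncaughtExceptionHandler --
    // the same functional interface you would hand to
    // KafkaStreams#setUncaughtExceptionHandler before calling start().
    static String captureDeath() {
        final StringBuilder seen = new StringBuilder();
        Thread worker = new Thread(() -> {
            // Simulated stand-in for the failure seen in the log below.
            throw new UnsupportedOperationException("simulated failure");
        }, "StreamThread-1");
        worker.setUncaughtExceptionHandler((t, e) ->
                seen.append(t.getName()).append(" died: ")
                    .append(e.getClass().getSimpleName()));
        worker.start();
        try {
            worker.join(); // handler has run by the time join() returns
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(captureDeath());
    }
}
```

Wiring a handler like this into KafkaStreams before start() should log exactly which exception killed which thread, instead of the app just winding down quietly.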

On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> We just discovered a couple of bugs with regard to standby tasks... Not
> all of the bug-fix PRs have been merged yet.
>
> You can try running on trunk to get those fixes. Should only be a few
> days until the fixes get merged.
>
>
> -Matthias
>
> On 12/16/16 9:10 AM, Jon Yeargers wrote:
> > I've started having this issue with another KStream-based app. Digging
> > through logs I ran across this message:
> > When I've seen it before it certainly does kill the application. At the
> > end of the SNIP you can see the exit process starting.
> >
> > 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating new standby task 0_0
> > 2016-12-16 17:04:51,507 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
> > 2016-12-16 17:04:51,508 [StreamThread-1] INFO o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state stores
> > 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-0]
> > 2016-12-16 17:04:51,819 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group RtDetailBreakoutProcessor failed on partition assignment
> > java.lang.UnsupportedOperationException: null
> >         at org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
> >         at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
> >         at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
> >         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
> >         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> >         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> >         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> >         at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
> >         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
> >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-2, rtdetail_breakout-1, rtdetail_breakout-6, rtdetail_breakout-5, rtdetail_breakout_filtered-1]
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-2 to the committed offset 1989670807
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-1 to the committed offset 1991427117
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-6 to the committed offset 1986565752
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-5 to the committed offset 1982149459
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout_filtered-1 to the committed offset 92917
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at 54.154.234.110:9092.
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at 54.194.192.105:9092.
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at 54.171.236.113:9092.
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at 54.154.115.41:9092.
> > 2016-12-16 17:04:51,821 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
> > 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5, 0_6]] and standby tasks [[]]
> >
> > On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> > wrote:
> >
> >> FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
> >> called unknowingly.
> >>
> >> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >>             try {
> >>                 LOGGER.warn("ShutdownHook");
> >>                 kafkaStreams.close();
> >>             } catch (Exception e) {
> >>                 // ignored
> >>             }
> >>         }));
> >>
> >>
> >> Ran another test and the app closed after ~40min. The above message
> >> appears 3rd from the end (several seconds after the shutdown process has
> >> commenced).
> >>
> >> (attaching log section)
> >>
> >> This has *got* to be something that I've set up improperly... I just
> >> can't seem to see it.
> >>
> >> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> >> wrote:
> >>
> >>> I'm seeing instances where my apps are exiting (gracefully, mind you)
> >>> without any obvious errors or cause. I have debug logs from many
> >>> instances of this and have yet to find a reason to explain what's
> >>> happening.
> >>>
> >>> - nothing in the app log
> >>> - nothing in /var/log/messages (i.e. not OOM-killed)
> >>> - not being closed via /etc/init.d
> >>> - nothing in the broker logs
> >>>
> >>> Running 0.10.1.0
> >>>
> >>> example log:
> >>>
> >>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing
> >>>
> >>
> >>
> >
>
>
