flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3368) Kafka 0.8 consumer fails to recover from broker shutdowns
Date Tue, 01 Mar 2016 19:26:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174251#comment-15174251 ]

Stephan Ewen commented on FLINK-3368:
-------------------------------------

In Flink 1.0, that is pretty much how it works: if brokers rebalance, the affected partition
is marked for re-assignment and picked up by a different broker connection.
This works quite quickly, and the job does not notice anything.

We also tested it on Kafka installations where the brokers were out of sync concerning their
metadata, so the cluster "rebalanced" many times in quick succession.
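
For readers unfamiliar with the mechanism, the re-assignment described above can be
sketched roughly as follows. This is a minimal illustration only, not the actual Flink 1.0
code: the class PartitionReassignmentSketch and all of its methods are hypothetical names,
and the Kafka metadata lookup is replaced by a plain map passed in by the caller.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch; none of these names are taken from the Flink code base. */
public class PartitionReassignmentSketch {

    /** Last known leader broker per partition id. */
    private final Map<Integer, String> leaderByPartition = new HashMap<>();

    /** Partitions currently served by each broker connection. */
    private final Map<String, List<Integer>> partitionsByBroker = new HashMap<>();

    /** Partitions whose leader moved and that wait for a new connection. */
    private final Queue<Integer> unassigned = new ArrayDeque<>();

    /** Attaches a partition to the connection of the given broker. */
    public void assign(int partition, String broker) {
        leaderByPartition.put(partition, broker);
        partitionsByBroker.computeIfAbsent(broker, b -> new ArrayList<>()).add(partition);
    }

    /** Called when a fetch reports that the broker no longer leads the partition. */
    public void markForReassignment(int partition) {
        String oldBroker = leaderByPartition.remove(partition);
        if (oldBroker != null) {
            // Remove by value, not by index.
            partitionsByBroker.get(oldBroker).remove(Integer.valueOf(partition));
        }
        unassigned.add(partition);
    }

    /** Hands each waiting partition to its new leader, as reported by a metadata lookup. */
    public void reassignPending(Map<Integer, String> freshLeaders) {
        int pending = unassigned.size();
        for (int i = 0; i < pending; i++) {
            int partition = unassigned.poll();
            String newLeader = freshLeaders.get(partition);
            if (newLeader == null) {
                unassigned.add(partition); // leader still unknown, try again later
            } else {
                assign(partition, newLeader);
            }
        }
    }

    public List<Integer> partitionsOf(String broker) {
        return partitionsByBroker.getOrDefault(broker, new ArrayList<>());
    }

    public int pendingCount() {
        return unassigned.size();
    }
}
```

The point of the sketch is that a lost leader only moves the partition into a waiting queue
instead of failing the whole job; the partition is fetched again once a new leader is known.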

> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, offset=-915623761776}, FetchPartition {topic=d, partition=13, offset=-915623761776}, FetchPartition {topic=e, partition=13, offset=-915623761776}, FetchPartition {topic=f, partition=13, offset=-915623761776}, FetchPartition {topic=g, partition=13, offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> 	at java.lang.Class.newInstance(Class.java:442)
> 	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> 	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
