flink-issues mailing list archives

From "Sebastian Klemke (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
Date Tue, 07 Jun 2016 17:29:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318949#comment-15318949 ]

Sebastian Klemke commented on FLINK-4015:
-----------------------------------------

For our application, the producer should never drop records. Instead, it should determine
which record failed and resend that record and all subsequent records produced to the same
partition, in order. This would require the producer to keep a per-partition list of records
that were sent but not yet acknowledged. For other applications, dropping records might be
acceptable.
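
To illustrate the bookkeeping this would need, here is a minimal sketch in plain Java. It is
not taken from Flink or the Kafka client: the class PendingRecords and its methods sent,
acknowledged and toResend are hypothetical names, and it assumes the caller assigns records
to partitions itself and learns about success or failure through a per-record callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink or Kafka): tracks records that were
// sent but not yet acknowledged, per partition and in send order.
final class PendingRecords<T> {

    // A tracked record together with the sequence number it was sent under.
    private static final class Entry<T> {
        final long seq;
        final T record;

        Entry(long seq, T record) {
            this.seq = seq;
            this.record = record;
        }
    }

    // One FIFO queue per partition; the oldest unacknowledged record is first.
    private final Map<Integer, Deque<Entry<T>>> pending = new HashMap<>();
    private long nextSeq = 0;

    // Remember a record that was just handed to the client.
    // Returns the sequence number to pass back on acknowledgement or failure.
    synchronized long sent(int partition, T record) {
        long seq = nextSeq++;
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>())
               .addLast(new Entry<>(seq, record));
        return seq;
    }

    // Forget a record once the broker has acknowledged it.
    synchronized void acknowledged(int partition, long seq) {
        Deque<Entry<T>> queue = pending.get(partition);
        if (queue != null) {
            queue.removeIf(e -> e.seq == seq);
        }
    }

    // Return the failed record and every later, still unacknowledged record
    // of the same partition, in the order they were originally sent.
    // The entries stay tracked until they are acknowledged.
    synchronized List<T> toResend(int partition, long failedSeq) {
        List<T> result = new ArrayList<>();
        Deque<Entry<T>> queue = pending.get(partition);
        if (queue != null) {
            for (Entry<T> e : queue) {
                if (e.seq >= failedSeq) {
                    result.add(e.record);
                }
            }
        }
        return result;
    }
}
```

In this sketch, a failure callback for a record would call toResend with that record's
partition and sequence number and send the returned records again, in order. Whether records
that were acknowledged after the failed one should also be repeated is a design decision the
sketch leaves open; here they are not.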

> FlinkKafkaProducer08 fails when partition leader changes
> --------------------------------------------------------
>
>                 Key: FLINK-4015
>                 URL: https://issues.apache.org/jira/browse/FLINK-4015
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.2
>            Reporter: Sebastian Klemke
>
> When the leader for a partition changes, the producer fails with the following exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at OPERATOR.flatMap2(OPERATOR.java:82)
> 	at OPERATOR.flatMap2(OPERATOR.java:16)
> 	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
> 	... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
> 	... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
> 	... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
