flink-issues mailing list archives

From Jürgen Kreileder (JIRA) <j...@apache.org>
Subject [jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
Date Wed, 13 Mar 2019 15:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16791832#comment-16791832 ]

Jürgen Kreileder commented on FLINK-11654:
------------------------------------------

[~aljoscha] There might be an additional issue here. I'm occasionally seeing this problem
with a single sink which has a unique name/uid. Might parallelism be an issue?
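
For context, the fencing itself is standard Kafka transactional-producer behavior, independent of Flink: when a second producer calls initTransactions() with an already-registered transactional.id, the broker bumps the epoch for that id and fences the older producer. Below is a minimal sketch with plain kafka-clients that reproduces the exception from the logs; the broker address, topic, and transactional id are placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingSketch {
    // Build a transactional producer for the given (possibly clashing) id.
    private static KafkaProducer<String, String> producer(String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("transactional.id", transactionalId);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> jobA = producer("mySink-0"); // placeholder id
        jobA.initTransactions();
        jobA.beginTransaction();
        jobA.send(new ProducerRecord<>("demo-topic", "key", "from job A"));

        // A second producer registers the same transactional.id: the broker
        // increments the epoch for that id and the first producer is fenced.
        KafkaProducer<String, String> jobB = producer("mySink-0");
        jobB.initTransactions();

        try {
            jobA.commitTransaction(); // fails: job A's epoch is now stale
        } catch (ProducerFencedException e) {
            System.out.println("job A fenced: " + e.getMessage());
        } finally {
            jobA.close();
            jobB.close();
        }
    }
}
{code}

In this issue, "job A" and "job B" are separate Flink jobs whose sinks happen to derive the same transactional id.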

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Priority: Major
>             Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic from identically
> named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we run into
> a lot of ProducerFencedExceptions and all jobs go into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation
> on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer
> valid. There is probably another producer with a newer epoch. 483 (request epoch), 484
> (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash between
> different Jobs (and Clusters).
>
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
>  	nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>  		NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>  	transactionalIdsGenerator = new TransactionalIdsGenerator(
> +		// the prefix probably should include job id and maybe cluster id
>  		getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>  		getRuntimeContext().getIndexOfThisSubtask(),
>  		getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
>  
>  
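
To make the clash described above concrete, here is a simplified model of the id scheme as I read the 1.7 sources (the real logic is in TransactionalIdsGenerator; the exact counter layout below is an approximation, and the task name and operator hash are made-up example values): every transactional id is the operator-derived prefix plus a counter, and each subtask claims a window of poolSize counters offset by its subtask index. The windows are disjoint within one job, but two jobs whose sinks have identical task names and operator hashes compute the same prefix and therefore claim the same ids.

{code:java}
import java.util.LinkedHashSet;
import java.util.Set;

// Approximation of the id layout, not the actual Flink class.
public class IdClashSketch {
    static Set<String> idsForSubtask(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        // Each subtask takes poolSize consecutive counters offset by its
        // index, so subtasks of a single job never overlap with each other.
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (subtaskIndex * (long) poolSize + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two different jobs with identically named sinks: taskName and
        // operatorUniqueID (a deterministic hash of the operator's position
        // in the topology) are equal, so the prefixes are equal too.
        String jobAPrefix = "Sink: findings" + "-" + "cbc357ccb763df2852fee8c4fc7d55f2";
        String jobBPrefix = "Sink: findings" + "-" + "cbc357ccb763df2852fee8c4fc7d55f2";

        System.out.println(idsForSubtask(jobAPrefix, 0, 5));
        System.out.println(idsForSubtask(jobBPrefix, 0, 5)); // identical set => fencing
    }
}
{code}

Prefixing with the job id (as the diff above suggests) would break this symmetry, since every submitted job gets a fresh job id and hence its own id space.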



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
