flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
Date Wed, 15 Mar 2017 16:53:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926550#comment-15926550
] 

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3549#discussion_r106221016
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
---
    @@ -240,6 +240,7 @@ public void open(Configuration configuration) {
     		}
     
     		if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled())
{
    +			System.out.println(((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled());
    --- End diff --
    
    This looks like a debug message


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each invoke() and
decremented on each callback, used to check if the producer needs to sync on pending callbacks
on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff after flushing
the {{pendingRecords == 0}} and {{asyncException == null}} (currently, we’re only checking
{{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the {{snapshotState}}
method both before and after flushing and {{pendingRecords}} becomes 0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message