flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
Date Thu, 21 May 2015 09:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553994#comment-14553994 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/674#discussion_r30787324
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
    @@ -225,7 +226,16 @@ public void close() {
     
     	@Override
     	public void restoreState(long[] state) {
    -		// we maintain the offsets in Kafka, so nothing to do.
    +		if(lastOffsets == null) {
    +			LOG.warn("Restore state called before open() has been called");
    +			return;
    +		}
    +		LOG.info("Restoring state to {}", Arrays.toString(state));
    +		if(lastOffsets.length != state.length) {
    --- End diff --
    
    How about sanity checking before logging that things are going to happen? Usually gives better logging insights...
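
    A minimal sketch of the ordering the comment suggests: validate the restored state first, and only then log that a restore will actually happen. It reuses the lastOffsets field, LOG logger and java.util.Arrays import visible in the diff above; the final arraycopy is an assumption about what the elided method body does, not the actual Flink implementation.

    	@Override
    	public void restoreState(long[] state) {
    		// Sanity-check first: bail out or fail fast before announcing a restore.
    		if (lastOffsets == null) {
    			LOG.warn("restoreState() called before open(); ignoring restored state");
    			return;
    		}
    		if (lastOffsets.length != state.length) {
    			throw new IllegalArgumentException("Restored state has " + state.length
    					+ " offsets, but this source tracks " + lastOffsets.length + " partitions");
    		}
    		// Only now log what is actually going to happen.
    		LOG.info("Restoring state to {}", Arrays.toString(state));
    		// Assumed continuation of the elided body: take over the restored offsets.
    		System.arraycopy(state, 0, lastOffsets, 0, state.length);
    	}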


> Memory leak in presence of failed checkpoints in KafkaSource
> ------------------------------------------------------------
>
>                 Key: FLINK-2004
>                 URL: https://issues.apache.org/jira/browse/FLINK-2004
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Robert Metzger
>            Priority: Critical
>             Fix For: 0.9
>
>
> Checkpoints that fail never send a commit message to the tasks.
> Maintaining a map of all pending checkpoints introduces a memory leak, as entries for failed checkpoints will never be removed.
> Approaches to fix this:
>   - The source cleans up entries from older checkpoints once a checkpoint is committed (simple implementation in a linked hash map; sketched below)
>   - The commit message could include the optional state handle (the source need not maintain the map)
>   - The checkpoint coordinator could send messages for failed checkpoints?
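
A minimal sketch of the first approach above, assuming the source keys pending offset snapshots by checkpoint ID in a LinkedHashMap (insertion order matching checkpoint order) and, on commit, drops every entry up to and including the committed ID, so entries left behind by failed checkpoints are reclaimed as well. Class and method names are illustrative, not Flink's actual API.

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Illustrative helper, not part of Flink: tracks offsets per pending checkpoint. */
    public class PendingCheckpointOffsets {

    	// Insertion order matches checkpoint order, so older entries come first.
    	private final LinkedHashMap<Long, long[]> pendingOffsets = new LinkedHashMap<Long, long[]>();

    	/** Remember the offsets snapshotted for a checkpoint. */
    	public synchronized void addPending(long checkpointId, long[] offsets) {
    		pendingOffsets.put(checkpointId, offsets.clone());
    	}

    	/** On commit, return the committed offsets and drop all entries with an ID
    	 *  up to and including the committed one, covering failed checkpoints too. */
    	public synchronized long[] commitAndCleanUp(long checkpointId) {
    		long[] committed = pendingOffsets.get(checkpointId);
    		Iterator<Map.Entry<Long, long[]>> it = pendingOffsets.entrySet().iterator();
    		while (it.hasNext()) {
    			if (it.next().getKey() <= checkpointId) {
    				it.remove();
    			} else {
    				break; // remaining entries belong to newer, still-pending checkpoints
    			}
    		}
    		return committed;
    	}
    }

Cleaning up on commit keeps the map bounded by the number of concurrently pending checkpoints instead of growing with every failed one.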



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
