flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5083) Race condition in Rolling/Bucketing Sink pending files cleanup
Date Wed, 16 Nov 2016 22:31:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671880#comment-15671880 ]

Kostas Kloudas commented on FLINK-5083:

Thanks for reporting it!

There is a pending pull request here: https://github.com/apache/flink/pull/2797 that removes
the deletion altogether.
The reason is that deleting lingering files does not play well with rescaling, which re-shuffles
the state of individual tasks.

Given that this PR is about to be merged, I expect that it will also resolve this issue.
In addition, I have another PR ready for the RollingSink, to be opened as soon as the aforementioned
one gets merged.

> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>                 Key: FLINK-5083
>                 URL: https://issues.apache.org/jira/browse/FLINK-5083
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Cliff Resnick
> In both the open and restore methods there is code that:
> 1. gets a recursive listing from baseDir
> 2. iterates over the listing and checks each filename against the subtaskIndex and other
> criteria to find pending or in-progress files; any match is deleted.
> The problem is that the recursive listing returns the files of all subtaskIndexes. The
> race occurs when #hasNext is called during iteration: it performs a hidden existence check
> on the "next" file, which may have been deleted by another task after the listing was taken
> but before the iteration reached it, so an error is thrown and the job fails.
> Depending on the number of pending files, this condition may outlast the number of job
> retries, with each retry failing on a different file.
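The race described above can be sketched with a minimal, self-contained analogue (plain java.nio standing in for Hadoop's FileSystem; the class, method, and file names here are illustrative, not Flink's):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.*;

public class ListingRace {

    // Wraps a snapshot listing so that hasNext() performs an existence
    // check on the upcoming file, mimicking the hidden check that the
    // recursive-listing iterator makes, as described in the issue.
    static Iterator<Path> checkedIterator(List<Path> snapshot) {
        return new Iterator<Path>() {
            private int i = 0;
            @Override public boolean hasNext() {
                if (i >= snapshot.size()) return false;
                if (!Files.exists(snapshot.get(i))) {
                    // Hadoop surfaces this as a FileNotFoundException
                    throw new UncheckedIOException(
                        new IOException("vanished: " + snapshot.get(i)));
                }
                return true;
            }
            @Override public Path next() { return snapshot.get(i++); }
        };
    }

    // Returns true if the race is observed: listing taken, one file then
    // deleted by "another subtask", iteration fails on the deleted entry.
    static boolean raceTriggers() throws IOException {
        Path dir = Files.createTempDirectory("buckets");
        Path mine   = Files.createFile(dir.resolve("part-0-0.pending"));
        Path theirs = Files.createFile(dir.resolve("part-1-0.pending"));

        List<Path> snapshot = Arrays.asList(mine, theirs); // step 1: listing
        Files.delete(theirs); // another task cleans up its own file

        try {
            Iterator<Path> it = checkedIterator(snapshot);
            while (it.hasNext()) it.next(); // step 2: iterate, check names
            return false;
        } catch (UncheckedIOException e) {
            return true; // the job would fail here
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("race observed: " + raceTriggers());
    }
}
```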
> A solution would be to use #listStatus instead. The Hadoop FileSystem supports a PathFilter
> in its #listStatus calls, but not in the recursive #listFiles call. Since the cleanup is
> performed from the baseDir, the recursion would have to be implemented in Flink.
> This touches on another issue. Over time the directory listing is bound to get very
> large, and re-listing everything from the baseDir may get increasingly expensive, especially
> if the FileSystem is S3. Maybe we can have a Bucketer callback that returns a list of cleanup
> root directories based on the current file? I'm guessing most people use time-based bucketing,
> so cleanup only matters for a bounded window of directories. If so, this would also solve
> the recursive listing problem above.
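The filtered #listStatus idea can be sketched with a java.nio analogue, where DirectoryStream plays the role of #listStatus with a PathFilter (the part-<subtask>-<count> naming and ".pending" suffix are illustrative, not Flink's actual scheme):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class SubtaskCleanup {

    // Non-recursive, filtered listing of one bucket directory: only
    // pending files belonging to the given subtask are returned, so a
    // task never iterates over (or trips on) another task's files.
    static List<Path> pendingFilesFor(Path bucketDir, int subtaskIndex)
            throws IOException {
        String prefix = "part-" + subtaskIndex + "-";
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(
                bucketDir,
                p -> p.getFileName().toString().startsWith(prefix)
                  && p.getFileName().toString().endsWith(".pending"))) {
            for (Path p : stream) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Files.createFile(dir.resolve("part-0-0.pending"));
        Files.createFile(dir.resolve("part-1-0.pending"));    // other subtask
        Files.createFile(dir.resolve("part-0-1.in-progress")); // wrong suffix

        // Subtask 0 sees only its own pending file.
        System.out.println(pendingFilesFor(dir, 0).size()); // prints 1
    }
}
```

Because the filter runs inside the listing call, files deleted concurrently by other subtasks are simply never candidates, sidestepping the existence-check failure.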

This message was sent by Atlassian JIRA
