flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Cliff Resnick (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-5083) Race condition in Rolling/Bucketing Sink pending files cleanup
Date Wed, 16 Nov 2016 20:18:58 GMT
Cliff Resnick created FLINK-5083:

             Summary: Race condition in Rolling/Bucketing Sink pending files cleanup
                 Key: FLINK-5083
                 URL: https://issues.apache.org/jira/browse/FLINK-5083
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.1.3, 1.2.0
            Reporter: Cliff Resnick

In both Open and Restore methods there is code that:

1. gets a recursive listing from baseDir
2. iterates listing and name checks filenames based on subtaskIndex and other criteria to
find pending or in-progress files. If found delete.

The problem is that the recursive listing gets all files for all subtaskIndexes. The race
error is when #hasNext is called as part of the iteration, a hidden existence check is made
on the "next" file, which was deleted by another task after-listing but pre-iteration, so
an error is thrown and the job fails. 

Depending on the number of pending files, this condition may outlast the number of job retries,
each failing on a different file.

A solution would be use #listStatus instead. The hadoop FileSystem supports a PathFilter in
its #listStatus calls, but not in the recursive #listFiles call. The cleanup is performed
from the baseDir so the recursive listing would have to be in Flink. 

This touches on another issue. Over time, the directory listing is bound to get very large,
and re-listing everything from the baseDir may get increasingly expensive, especially if the
Fs is S3. Maybe we can have a Bucketer callback to return a list of cleanup root directories
based on the current file? I'm guessing most people are using time based bucketing, so there's
only so much of a period where cleanup will matter. If so, then this would solve for the above
recursive listing problem.


This message was sent by Atlassian JIRA

View raw message