flink-user mailing list archives

From Seth Wiesman <swies...@mediamath.com>
Subject OperatorState partitioning when recovering from failure
Date Thu, 04 May 2017 13:45:54 GMT
I am curious about how operator state is repartitioned to subtasks when a job is resumed from
a checkpoint or savepoint. The reason is that I am having issues with the ContinuousFileReaderOperator
when recovering from a failure.

I consume most of my data from files off S3. I have a custom file monitor that understands
how to walk my directory structure and outputs TimestampedFileSplits downstream in chronological
order to the stock ContinuousFileReaderOperator. The reader consumes those splits and stores
them in a priority queue keyed on their last modified time, ensuring that files are read in
chronological order, which is exactly what I want. The problem is that when recovering, the unread
splits partitioned out to each of the subtasks seem to be heavily skewed in terms of last modified
time.

While each task may have a similar number of files, I find that one or two will have a disproportionate
number of old files. This in turn holds back my watermark (sometimes for several hours depending
on the number of unread splits) which keeps timers from firing, windows from purging, etc.
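
To illustrate the effect, here is a minimal sketch in plain Java. It does not use any Flink
APIs, and the class and method names are hypothetical. It assumes each subtask reads one split
per step in chronological order, that a subtask's watermark is bounded by the timestamp of its
next unread split, and that the downstream watermark is the minimum across subtasks.

```java
import java.util.Arrays;
import java.util.List;

public class WatermarkSkewSketch {

    /**
     * Counts the steps needed before the combined watermark reaches {@code target}.
     * Each inner list holds one subtask's unread split timestamps in ascending order.
     * Simplifying assumption: every subtask consumes exactly one split per step.
     */
    static int stepsUntilWatermark(List<List<Long>> subtasks, long target) {
        int step = 0;
        while (true) {
            long watermark = Long.MAX_VALUE;
            for (List<Long> splits : subtasks) {
                // The next unread split bounds this subtask's watermark;
                // a drained subtask no longer holds anything back.
                long next = step < splits.size() ? splits.get(step) : Long.MAX_VALUE;
                watermark = Math.min(watermark, next);
            }
            if (watermark >= target) {
                return step;
            }
            step++;
        }
    }

    public static void main(String[] args) {
        // Same twelve splits, same count per subtask, different temporal layout.
        List<List<Long>> skewed = Arrays.asList(
                Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L),      // all the old files
                Arrays.asList(7L, 8L, 9L, 10L, 11L, 12L));
        List<List<Long>> interleaved = Arrays.asList(
                Arrays.asList(1L, 3L, 5L, 7L, 9L, 11L),
                Arrays.asList(2L, 4L, 6L, 8L, 10L, 12L));

        System.out.println(stepsUntilWatermark(skewed, 7L));      // prints 6
        System.out.println(stepsUntilWatermark(interleaved, 7L)); // prints 3
    }
}
```

Both layouts give each subtask six splits, but in the skewed layout the watermark takes twice as
long to pass timestamp 7, because one subtask has to work through every old file first.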

I was hoping there was some way I could add a custom partitioner to ensure that splits are
distributed evenly across subtasks with respect to their last modified time, or that someone
had other ideas for how I could mitigate the problem.
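
As a rough sketch of the distribution I have in mind, the following plain Java sorts the
unread splits by last modified time and deals them out round-robin. It is only an illustration:
the class and method names are hypothetical, splits are reduced to their timestamps, and it does
not use any Flink API. Whether Flink exposes a hook where such logic could run on restore is
exactly what I am asking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TemporalRoundRobinSketch {

    /**
     * Deals split timestamps out to {@code parallelism} subtasks so that each
     * subtask receives an interleaved slice of the overall time range.
     */
    static List<List<Long>> distribute(List<Long> modTimes, int parallelism) {
        // Sort a copy so the caller's list is left untouched.
        List<Long> sorted = new ArrayList<>(modTimes);
        sorted.sort(Comparator.naturalOrder());

        List<List<Long>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // The i-th oldest split goes to subtask i mod parallelism.
        for (int i = 0; i < sorted.size(); i++) {
            perSubtask.get(i % parallelism).add(sorted.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<Long> unread = Arrays.asList(5L, 1L, 4L, 2L, 6L, 3L);
        System.out.println(distribute(unread, 2)); // prints [[1, 3, 5], [2, 4, 6]]
    }
}
```

With this layout every subtask holds a similar share of old and new files, so no single subtask
is left to work through the whole backlog of old splits on its own.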

Thank you,

Seth Wiesman
