flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Regarding BucketingSink
Date Mon, 19 Feb 2018 15:47:23 GMT
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed setting, and
especially with rescaling of Flink operators, it is quite hard to figure out which
of the pending files you can actually delete and which of them you have to leave because they
will get moved to "final" as part of recovering from a checkpoint on some other parallel instance
of the sink.
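
To make the relationship between pending files and checkpoints more concrete, here is a toy model in plain Java. It is only a sketch and not the BucketingSink code: the class PendingFileModel and all of its methods and fields are hypothetical names, and it ignores parallelism, rescaling and the actual renames on the file system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, NOT the BucketingSink implementation).
public class PendingFileModel {

    // Part files closed since the last snapshot; no checkpoint refers to them yet.
    final List<String> pending = new ArrayList<>();

    // Pending files recorded by each snapshot, keyed by checkpoint id.
    final Map<Long, List<String>> pendingPerCheckpoint = new HashMap<>();

    // Files that have been moved to their final name.
    final List<String> finalized = new ArrayList<>();

    // A part file is closed and becomes pending.
    void closePartFile(String name) {
        pending.add(name);
    }

    // A snapshot records the currently pending files under its checkpoint id.
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Once a checkpoint is confirmed, every pending file recorded by that
    // checkpoint or an earlier one can be finalized.
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                finalized.addAll(entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        PendingFileModel sink = new PendingFileModel();
        sink.closePartFile("part-0-18");
        sink.snapshot(1L);
        sink.checkpointComplete(1L);
        // Closed after the last snapshot: if the job fails now, no
        // checkpoint knows about this file.
        sink.closePartFile("part-0-19");
        System.out.println("finalized: " + sink.finalized);
        System.out.println("pending:   " + sink.pending);
    }
}
```

In this model a file that was closed after the last snapshot is the one that would be left dangling by a failure. The model cannot show the other half of the problem, namely that a pending file may still be referenced by the checkpoint state of a different parallel instance.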

You should only have these dangling pending files after a failure-recovery cycle, as you noticed.
My suggestion would be to periodically clean up older pending files.
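
As a rough sketch of such a periodic cleanup, the following finds pending files older than a cutoff. It is written against java.nio and a locally visible directory purely for illustration; for HDFS you would use the Hadoop FileSystem API instead. The class name PendingFileCleaner, the 7-day threshold and the ".pending" suffix (taken from the listings in this thread) are assumptions, and the threshold should be comfortably longer than any time after which you could still restore from an old checkpoint or savepoint.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
public class PendingFileCleaner {

    // Suffix of pending files as seen in the listings in this thread (assumption).
    static final String PENDING_SUFFIX = ".pending";

    // Returns all regular files under root that end in ".pending" and were
    // last modified more than maxAge before now.
    static List<Path> findStalePending(Path root, Duration maxAge, Instant now)
            throws IOException {
        Instant cutoff = now.minus(maxAge);
        List<Path> stale = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                if (Files.isRegularFile(p)
                        && p.getFileName().toString().endsWith(PENDING_SUFFIX)
                        && Files.getLastModifiedTime(p).toInstant().isBefore(cutoff)) {
                    stale.add(p);
                }
            }
        }
        return stale;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: PendingFileCleaner <bucket-base-dir>");
            return;
        }
        Path root = Paths.get(args[0]);
        // 7 days is an arbitrary example threshold.
        for (Path p : findStalePending(root, Duration.ofDays(7), Instant.now())) {
            // Only report by default; deleting is left as a deliberate step.
            System.out.println("stale pending file: " + p);
            // Files.delete(p);
        }
    }
}
```

The sketch only lists candidates; the delete is commented out so that you can check the output before removing anything.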

Best,
Aljoscha

> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrmann@apache.org> wrote:
> 
> Hi Vishal,
> 
> Pending files should indeed get finalized eventually. This happens on a checkpoint
> complete notification. Thus, what you report does not seem right. Maybe Aljoscha can shed a bit
> more light on the problem.
> 
> In order to further debug the problem, it would be really helpful to get access to DEBUG
> log files of a TM which runs the BucketingSink.
> 
> Cheers,
> Till
> 
> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu.biz@gmail.com> wrote:
> Hi Vishal,
> 
> I have the same concern about savepointing in BucketingSink.
> As for your question, I think that before the pending files get cleared in handleRestoredBucketState,
> they are finalized in notifyCheckpointComplete:
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
> 
> I'm looking into this part of the source code now, since we are experiencing some unclosed
> files after checkpointing.
> It would be great if you could share more if you find something new about your problem,
> which might help with our problem.
> 
> Best regards,
> Mu
> 
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
> 
> 
> This is strange. We had a few retries because of an OOM on one of the TMs, and we see this
> situation: the 2 files on either side were dealt with fine, but there is a dangling .pending
> file. I am sure this is not what is meant to happen. I think we have an edge condition, and looking
> at the code it is not obvious. Maybe someone who wrote the code can shed some light on
> how this can happen.
> 
> 
> 
> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> Without --allowNonRestoredState, on a suspend/resume we do see the length file along
> with the finalized file ( finalized during resume ):
> 
> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
> 
> That makes much more sense.
> 
> I guess we should document --allowNonRestoredState better? It seems it actually drops
> state?
> 
> 
> 
> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> This is 1.4 BTW. I am not sure that I am reading this correctly, but the lifecycle of
> cancel/resume is 2 steps:
> 
> 
> 
> 1. Cancel job with SP
> 
> 
> closeCurrentPartFile
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
> 
> is called from close()
> 
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
> 
> 
> and that moves files to pending state. That, I would presume, is called when one does
> a cancel.
> 
> 
> 
> 2. The restore on resume 
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
> 
> calls 
> 
> handleRestoredBucketState
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
> 
> clears the pending files from state without finalizing them?
> 
> 
> 
> That does not seem to be right. I must be reading the code totally wrong ?
> 
> I am also not sure whether --allowNonRestoredState skips restoring the state. At
> least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
> is not exactly clear about what it does if we add an operator ( GDF I think will add a new operator
> in the DAG without state even if stateful; in my case the Map operator is not even stateful ).
> 
> 
> Thanks and please bear with me if this is all something pretty simple.
> 
> Vishal
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
> What should be the behavior of BucketingSink vis-a-vis state ( pending, in-progress and
> finalization ) when we suspend and resume?
> 
> So I did this
> 
> * I suspended and resumed a pipe writing to hdfs, using
> --allowNonRestoredState because I had added a harmless MapOperator ( stateless ).
> 
> * I see that the file on hdfs that was being written to ( before the cancel with
> savepoint ) goes into a pending state: _part-0-21.pending
> 
> * I see a new file being written to in the resumed pipe    _part-0-22.in-progress.
> 
> What I do not see is the file _part-0-21.pending being finalized ( as in renamed
> to just part-0-21 ). I would have assumed that would be the case in this controlled suspend/resume
> circumstance. Further, it is a rename, and hdfs mv is not an expensive operation.
> 
> 
> Am I understanding the process correctly, and if yes, any pointers?
> 
> Regards,
> 
> Vishal
> 
> 
> 
> 
> 

