flink-issues mailing list archives

From "Flink Jira Bot (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
Date Sun, 30 May 2021 23:15:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-22497:
-----------------------------------
    Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community
manage its development. I see this issue has been marked as Major but is unassigned, and neither
it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major"
label to the issue. If this ticket is a Major, please either assign yourself or give an update.
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-22497
>                 URL: https://issues.apache.org/jira/browse/FLINK-22497
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>         Environment: hadoop-2.8.4,Flink-1.11.2
>            Reporter: ChangjiGuo
>            Priority: Major
>              Labels: stale-major
>         Attachments: 1.png
>
>
> I have a question that came up while testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by a timer on procTimeService.
> If the rolling interval has not yet been reached when the timer fires, the roll is delayed until
> the next timer trigger point (60s later), so rolling is not real-time and does not honor the
> maximum duration. For example, if the checkpoint period is set to 60s, the file should be
> converted to finished at the second checkpoint, but it is delayed until the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the Bucket.write
> method, the file is set to finished at the second checkpoint, as expected.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     // Proposed change: also check the processing-time condition on every write.
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
> Could this check replace the periodic detection?
> Is my understanding correct, and would this change be acceptable?
> Thanks! ^_^
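
The delay described above can be illustrated with a small standalone sketch. It is not Flink code: the class and method names are hypothetical, times are in seconds, and it assumes the timer fires at multiples of 60s, one element arrives per second, the part file is created at t=5, and checkpoints run at t=10, 70, 130. It only models the age check against the 60s rolling interval, not the inactivity interval or file size.

```java
class RollingDelaySketch {
    static final long ROLLOVER_INTERVAL = 60; // rolling interval from the report, in seconds
    static final long TIMER_PERIOD = 60;      // assumed period of the processing-time timer

    // Age check as described in the report: roll once the file is at least 60s old.
    static boolean shouldRollOnProcessingTime(long creationTime, long now) {
        return now - creationTime >= ROLLOVER_INTERVAL;
    }

    // Timer-only behaviour: the age is checked only when the timer fires.
    static long closeTimeTimerOnly(long creationTime) {
        long t = (creationTime / TIMER_PERIOD + 1) * TIMER_PERIOD; // next timer firing
        while (!shouldRollOnProcessingTime(creationTime, t)) {
            t += TIMER_PERIOD;
        }
        return t;
    }

    // Proposed behaviour: the age is also checked on every write (one write per second here).
    static long closeTimeWithWriteCheck(long creationTime) {
        long t = creationTime + 1;
        while (!shouldRollOnProcessingTime(creationTime, t)) {
            t++;
        }
        return t;
    }

    // 1-based index of the first checkpoint at or after closeTime.
    static long checkpointIndex(long closeTime, long firstCheckpoint, long period) {
        if (closeTime <= firstCheckpoint) {
            return 1;
        }
        return (closeTime - firstCheckpoint + period - 1) / period + 1;
    }

    public static void main(String[] args) {
        long created = 5; // hypothetical creation time of the part file
        long timerOnly = closeTimeTimerOnly(created);
        long writeCheck = closeTimeWithWriteCheck(created);
        System.out.println("timer only:  closed at " + timerOnly
                + "s, checkpoint #" + checkpointIndex(timerOnly, 10, 60));
        System.out.println("write check: closed at " + writeCheck
                + "s, checkpoint #" + checkpointIndex(writeCheck, 10, 60));
    }
}
```

Under these assumptions the timer-only variant closes the file at 120s (picked up by the third checkpoint), while the write-time check closes it at 65s (picked up by the second checkpoint). If the file happens to be created exactly on a timer boundary, both variants close it at the same time.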



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
