flink-user mailing list archives

From 周思华 <summerle...@163.com>
Subject Re:Re: Problem with Flink restoring from checkpoints
Date Thu, 20 Jul 2017 02:46:46 GMT
Hi Fran,

Does the DateTimeBucketer act like an in-memory buffer that isn't managed by Flink's state? If
so, then I think the problem is not with Kafka but with the DateTimeBucketer: Flink won't
include the bucketer's buffered data in a snapshot if it isn't held in any state.
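This point can be illustrated with a toy model (plain Python, not Flink's actual API): state that is registered with the snapshot mechanism survives a restore, while a plain in-memory buffer does not.

```python
# Toy model of checkpoint/restore semantics. Illustrative only --
# this is NOT Flink's API, just the idea behind Sihua's point.
import copy

class Operator:
    def __init__(self):
        self.managed_state = {"count": 0}  # registered for snapshots
        self.buffer = []                   # plain in-memory buffer, NOT snapshotted

    def process(self, event):
        self.managed_state["count"] += 1
        self.buffer.append(event)          # pending output, e.g. not yet flushed to S3

    def snapshot(self):
        # Only the managed state goes into the checkpoint.
        return copy.deepcopy(self.managed_state)

    def restore(self, checkpoint):
        self.managed_state = copy.deepcopy(checkpoint)
        self.buffer = []                   # buffered events are simply gone

op = Operator()
for e in ["a", "b", "c"]:
    op.process(e)
ckpt = op.snapshot()

# Simulated failure, then recovery from the checkpoint:
op = Operator()
op.restore(ckpt)
print(op.managed_state)  # {'count': 3} -- the managed state survives
print(op.buffer)         # [] -- the unflushed events are lost
```

If the source's offsets are part of the checkpoint but the sink's pending buffer is not, the events between the last flush and the failure disappear exactly as described in the original report.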

Sihua Zhou

At 2017-07-20 03:02:20, "Fabian Hueske" <fhueske@gmail.com> wrote:

Hi Fran,

did you observe actual data loss due to the problem you are describing or are you discussing
a possible issue based on your observations?

AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes these in the
checkpoints. In case of a recovery, it does not rely on the offsets which were committed back
to Kafka but only on the offsets it checkpointed itself.
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can give a more
detailed answer.
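Fabian's description can be sketched the same way: on recovery the consumer resumes from the offset stored in its own checkpoint, regardless of what was committed back to Kafka. A toy model in plain Python, not the real consumer:

```python
# Toy model: the consumer checkpoints its own offset and, on recovery,
# deliberately ignores the offset committed back to the broker.
# Illustrative only -- not Flink's Kafka consumer API.
class ToyKafkaConsumer:
    def __init__(self, records):
        self.records = records
        self.offset = 0            # offset tracked by the consumer itself
        self.broker_committed = 0  # offset committed back to "Kafka"

    def poll(self, n):
        batch = self.records[self.offset:self.offset + n]
        self.offset += len(batch)
        return batch

    def commit_to_broker(self):
        self.broker_committed = self.offset

    def snapshot(self):
        return self.offset         # this is what goes into the checkpoint

    def restore(self, checkpointed_offset):
        # The broker-committed offset plays no role in recovery.
        self.offset = checkpointed_offset

consumer = ToyKafkaConsumer(list(range(10)))
consumer.poll(3)
ckpt = consumer.snapshot()    # checkpoint taken at offset 3
consumer.poll(4)
consumer.commit_to_broker()   # the broker now thinks we are at offset 7

consumer.restore(ckpt)
replayed = consumer.poll(4)
print(replayed)               # [3, 4, 5, 6] -- replayed from the checkpointed offset
```

So after recovery the events are re-read from the source; whether they reach S3 then depends on the sink participating correctly in checkpointing.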

Best, Fabian

2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.blaya@hivehome.com>:


We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting the events to
S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 with an inactivity
period of 5 minutes. The rate at which events are written to Kafka in the first place is very
low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka
offsets after the job gets cancelled or the Yarn session is killed.

What we found is that Flink acks Kafka immediately, before even writing to S3. The consequence
of this seems to be that if the job is cancelled before the acked events are flushed to
S3, those events are lost: they don't get written when the job restarts. Flink doesn't seem to
keep in its checkpointed state the fact that it acked those events but never flushed them
to S3. Checkpoints are created every 5 seconds in S3.

We've also tried to configure externalized checkpoints through the "state.checkpoints.dir" configuration
key and "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
in the job, so that they don't automatically get cleaned up when the job gets cancelled or
the Yarn session is killed. We can see the job uses a restored checkpoint upon restart, but we still
get missing events in S3.
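For reference, the cluster-side half of that setup would look roughly like this (the bucket path below is a placeholder, not the one we actually use), with the enableExternalizedCheckpoints call quoted above in the job code:

```yaml
# flink-conf.yaml -- target directory for externalized checkpoint metadata
# (placeholder path; substitute your own bucket)
state.checkpoints.dir: s3://my-bucket/flink/ext-checkpoints
```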

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.



Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and intended only for
the use of the individual(s) to which it is addressed. It may contain information which is
confidential and/or covered by legal professional or other privilege. The views expressed
in this email are not necessarily the views of Centrica plc, and the company, its directors,
officers or employees make no representation or accept any liability for their accuracy or
completeness unless expressly stated to the contrary. 
Centrica Connected Home Limited (company no: 5782908), registered in England and Wales with
its registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.
