flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Checkpointing large RocksDB state to S3 - tips?
Date Wed, 26 Oct 2016 15:54:13 GMT
Hi Josh,
Might the bandwidth to S3 be shared by all the running nodes? (I'm not sure
how that is set up, so I'm just guessing here.)

If you're on 1.2-SNAPSHOT you should also get fully elastic jobs in about a
week. (I'm talking about the ability to restart from a savepoint with a
different parallelism here.) Keyed state and the Kafka source are already
properly elastic; the only missing bit is the timers (for the window
operator, mostly), but we're currently working on making those elastic as
well.


On Tue, 25 Oct 2016 at 16:43 Josh <jofo90@gmail.com> wrote:

> Hi Aljoscha,
> Thanks for the reply!
> I found that my stateful operator (with parallelism 10) wasn't equally
> split between the task managers on the two nodes (it was split 9/1) - so I
> tweaked the task manager / slot configuration until Flink allocated them
> equally with 5 instances of the operator on each node. (Just wondering if
> there's a better way to get Flink to allocate this specific operator
> equally between nodes, regardless of the number of slots available on
> each?) Having split the stateful operator equally between 2 nodes, I am
> actually able to checkpoint 18.5GB of state in ~4 minutes, which indicates
> an overall throughput of ~77MB/sec (38.5MB/sec per node).
> I did what you said and tried uploading a large file from one of those VMs
> to S3 using the AWS command line tool. It uploaded at ~76MB/sec, which is
> nearly double 38MB/sec, but at least it's not orders of magnitude out.
> Does that sound ok? I guess there's more that goes on when Flink takes a
> checkpoint than just uploading anyway... I upgraded my cluster to Flink
> 1.2-SNAPSHOT yesterday, so yes, it should be using the fully async mode.
> I'll have a proper look in the logs if I see it crash again, and for now
> will just add more nodes whenever we need to speed up the checkpointing.
> Thanks,
> Josh
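
The throughput figures in the message above can be checked with a quick calculation. This is only a sketch: it takes the state size as 18.5 GB (which is what the quoted ~77MB/sec implies), assumes decimal units (1 GB = 1000 MB), and assumes the state is split evenly across the two nodes. The function name is made up for illustration.

```python
def checkpoint_throughput_mb_s(state_gb, minutes, nodes=1):
    """Average upload rate per node in MB/s.

    Assumes 1 GB = 1000 MB and an even split of state across nodes
    (both are assumptions, not something Flink guarantees).
    """
    total_mb_s = state_gb * 1000 / (minutes * 60)
    return total_mb_s / nodes


overall = checkpoint_throughput_mb_s(18.5, 4)            # whole cluster
per_node = checkpoint_throughput_mb_s(18.5, 4, nodes=2)  # each of the 2 nodes
raw_cli = 76.0  # MB/s measured with the AWS CLI from one VM (from the message)

print(f"overall:  {overall:.1f} MB/s")
print(f"per node: {per_node:.1f} MB/s")
print(f"per-node rate relative to raw CLI upload: {per_node / raw_cli:.0%}")
```

The per-node checkpoint rate comes out at roughly half the raw upload rate measured from a single VM, which matches the "nearly double" observation.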
> On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
> Regarding upload speed, yes, each instance of an operator is responsible
> for uploading its state so if state is equally distributed between
> operators on TaskManagers that would mean that each TaskManager would
> upload roughly the same amount of state. It might be interesting to see
> what the raw upload speed is when you have those two VMs upload to S3; if it
> is a lot larger than the speed you're seeing, something would be wrong and
> we should investigate. One last thing: are you using the "fully async" mode
> of RocksDB? I think I remember that you do, just checking.
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning but after that, I don't know yet.
> Cheers,
> Aljoscha
> On Mon, 24 Oct 2016 at 19:06 Josh <jofo90@gmail.com> wrote:
> Hi all,
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so have now decreased the checkpoint frequency to every 10
> minutes.
> I was just wondering if anyone has any tips about how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
> Thanks,
> Josh
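
The "add more instances" idea from the thread can be turned into a rough estimate. This is a back-of-the-envelope sketch, not a description of how Flink schedules uploads: it assumes every node sustains the same upload rate, that the state is split evenly, and that neither S3 nor a shared link is the limit (the thread leaves that open). The function name and the 15 MB/s per-node figure (taken from the original question) are illustrative.

```python
import math


def nodes_needed(state_gb, per_node_mb_s, target_minutes):
    """Smallest node count that uploads state_gb within target_minutes.

    Assumes 1 GB = 1000 MB, an even split of state, and linear scaling
    of aggregate upload rate with node count (all assumptions).
    """
    required_mb_s = state_gb * 1000 / (target_minutes * 60)
    return math.ceil(required_mb_s / per_node_mb_s)


# ~10 GB of state at ~15 MB/s per node, as in the original question.
print(nodes_needed(10, 15, target_minutes=5))  # finish within a 5-minute interval
print(nodes_needed(10, 15, target_minutes=2))  # finish within 2 minutes
```

If the measured checkpoint time does not drop roughly in proportion when nodes are added, the per-machine upload rate is probably not the bottleneck.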
