spark-user mailing list archives

From: Adrian Tanase
Subject: Re: Accumulators internals and reliability
Date: Mon, 26 Oct 2015 10:29:39 GMT

I can reply from a user’s perspective; on the semantic guarantees I defer to someone with
more experience.

I’ve successfully implemented the following using a custom Accumulable class:

  *   Created a MapAccumulator with dynamic keys (they are driven by the data coming in),
as opposed to creating many discrete accumulators
     *   The merge operation adds the values on key conflict
  *   I’m adding K->V pairs to this accumulator in a variety of places (map, flatMap, transform
and updateStateByKey)
  *   In a foreachRDD at the end of the transformations I’m reading the accumulator and
writing the counters to OpenTSDB
     *   After this I’m resetting it to the “zero” value (e.g. an empty map)
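
The merge semantics above can be sketched in plain Java with no Spark dependency. This is only an illustration, not the actual implementation: the class name MapAccumulator comes from the list above, but the String/Long types and the methods add, merge, reset and value are assumptions. In Spark 1.x the same three ideas would roughly correspond to the addAccumulator, addInPlace and zero methods of an AccumulableParam.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the custom accumulator described above.
// It only illustrates the zero / add / merge semantics, outside of Spark.
final class MapAccumulator {
    private final Map<String, Long> counters = new HashMap<>();

    // Add one key -> value pair; if the key already exists, the values are summed.
    void add(String key, long value) {
        counters.merge(key, value, Long::sum);
    }

    // Merge another partial result (for example, from another task) into this one.
    // On key conflict the values are added together.
    void merge(MapAccumulator other) {
        other.counters.forEach((k, v) -> counters.merge(k, v, Long::sum));
    }

    // Reset to the "zero" value, i.e. an empty map.
    void reset() {
        counters.clear();
    }

    // Copy of the current counters, safe to hand to a metrics writer.
    Map<String, Long> value() {
        return new HashMap<>(counters);
    }
}
```

The read, write and reset steps in the last bullet would then amount to calling value(), sending the result to the metrics store, and calling reset().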

Everything works as expected in terms of functionality - with 2 caveats:

  *   On task/job failure you might get duplicate values for the tasks that are retried in
the active job, since adding to an accumulator in a transformation is a side effect
     *   I’m partially working around this by also referring to the RDD time and overwriting
the values in OpenTSDB (an idempotent operation)
  *   If you have stateful transformations and you use checkpointing, the accumulator code
becomes really intrusive in your codebase
     *   You will need to have a global singleton in your driver and call “getInstance” in
a foreachRDD or transform, to force code execution on the driver
     *   This is because on restoring from a checkpoint your accumulators will be null, as the
checkpoint recovery makes no attempt to initialize them (see SPARK-5206)
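
The singleton workaround in the last caveat can be sketched as a lazily initialized holder. This is a plain-Java illustration under stated assumptions: CounterHolder, simulateRestart, add and total are hypothetical names, and the long field stands in for the real accumulator. In an actual job, getInstance would presumably take the SparkContext and create the accumulator from it.

```java
// Hypothetical driver-side holder. After a restart the static reference is
// null again, which mirrors an accumulator that was not restored from the checkpoint.
final class CounterHolder {
    private static volatile CounterHolder instance;

    private long total; // stand-in for the real accumulator state

    private CounterHolder() { }

    // Lazily (re)create the instance on first use, with double-checked locking
    // so that concurrent callers on the driver share a single instance.
    static CounterHolder getInstance() {
        CounterHolder local = instance;
        if (local == null) {
            synchronized (CounterHolder.class) {
                local = instance;
                if (local == null) {
                    local = new CounterHolder();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Illustration hook only: drops the reference, as if the driver had restarted.
    static void simulateRestart() {
        synchronized (CounterHolder.class) {
            instance = null;
        }
    }

    synchronized void add(long n) { total += n; }

    synchronized long total() { return total; }
}
```

Calling getInstance inside foreachRDD or transform, rather than capturing a reference created at startup, is what lets the code find a freshly created instance after recovery instead of a null one.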

Hope this helps,

From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "<>"
Subject: Accumulators internals and reliability

It seems like there is not much literature about Spark's Accumulators, so I thought I'd ask:

Do Accumulators reside in a Task? Are they serialized with the task? Are they sent back on
task completion as part of the ResultTask?

Are they reliable? If so, when? Can I rely on an accumulator's value only after the task has
completed successfully (meaning in the driver), or also during task execution
(and what about speculative execution)?

What are the limitations on the number (or size) of Accumulators?
