I can reply from a user’s perspective; for the semantic guarantees I defer to someone with more experience.
I’ve successfully implemented the following using a custom Accumulable class:
- Created a MapAccumulator with dynamic keys (they are driven by the data coming in), as opposed to creating many discrete accumulators
- The merge operation adds the values on key conflict
- I’m adding K->Vs to this accumulator in a variety of places (maps, flatMaps, transforms and updateStateByKey)
- In a foreachRDD at the end of the transformations I’m reading the accumulator and writing the counters to OpenTSDB
- After this I’m resetting it to the “zero” value (i.e. an empty map)
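To make the merge and reset behaviour above concrete, here is a minimal sketch in plain Python with no Spark dependency. It is not my actual class: MapAccumulatorParam and the counter names are made up for illustration, and the method names zero/addInPlace only mirror the shape of PySpark's AccumulatorParam.

```python
class MapAccumulatorParam:
    """Hypothetical stand-in for a map-valued accumulator; not Spark code."""

    def zero(self, value):
        # The "zero" value is an empty map, whatever the initial value was.
        return {}

    def addInPlace(self, left, right):
        # Merge right into left: new keys are inserted,
        # conflicting keys have their values added.
        for key, count in right.items():
            left[key] = left.get(key, 0) + count
        return left


param = MapAccumulatorParam()

# Partial counts as two different tasks might report them (made-up data).
partial_a = {"parsed": 3, "dropped": 1}
partial_b = {"parsed": 2, "late": 4}

merged = param.addInPlace(param.zero({}), partial_a)
merged = param.addInPlace(merged, partial_b)
print(merged)  # {'parsed': 5, 'dropped': 1, 'late': 4}

# After the counters are written out, start again from the zero value.
after_reset = param.zero(merged)
```

The keys are never declared up front; whatever shows up in the data becomes a counter, which is the reason for using one map instead of many discrete accumulators.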
Everything works as expected in terms of functionality - with 2 caveats:
- On task/job failure you might get duplicate values for the tasks that are retried in the active job, since adding to an Accumulator in a transformation is a side effect that is repeated whenever the task is re-executed
- I’m partially working around this by also referring to the RDD time and overwriting the values in OpenTSDB (idempotent operation)
- If you have stateful transformations and you use checkpointing, the accumulator code becomes really intrusive in your codebase
- You will need to have a global singleton in your driver and call “getInstance” in a foreachRDD or transform, to force code execution on the driver
- This is because on restoring from checkpoint your accumulators will be NULL as the checkpoint recovery makes no attempt to initialize them (See
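The two workarounds above can be sketched roughly as follows, again in plain Python with no Spark or OpenTSDB dependency. Everything here is an assumption for illustration: get_counters plays the role of the “getInstance” singleton, write_batch is a hypothetical helper, and an in-memory dict stands in for OpenTSDB (this is not its API).

```python
_counters = None  # hypothetical module-level holder; empty after a restart


def get_counters():
    # Lazily (re)create the counters on first use, so that code running
    # after a restart never sees an uninitialized value.
    global _counters
    if _counters is None:
        _counters = {}
    return _counters


def write_batch(store, batch_time, counters):
    # Key each data point by (metric, batch time) and overwrite it.
    # Writing the same batch twice leaves the store unchanged, which is
    # what makes a retried write harmless.
    for metric, value in counters.items():
        store[(metric, batch_time)] = value


store = {}           # stand-in for the time-series database
batch_time = 1445857980  # made-up batch timestamp

counters = get_counters()
counters["parsed"] = 5

write_batch(store, batch_time, counters)
write_batch(store, batch_time, counters)  # simulated retry of the same batch
print(store)  # {('parsed', 1445857980): 5}
```

Note that the overwrite only protects the write itself. If a retried task added to the counters twice before the write, the overwritten value is still inflated, which is why I call this a partial workaround.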
Hope this helps,
Monday, October 26, 2015 at 11:13 AM
Accumulators internals and reliability
It seems like there is not much literature about Spark's Accumulators so I thought I'd ask here:
Do Accumulators reside in a Task? Are they serialized with the task? Are they sent back on task completion as part of the ResultTask?
Are they reliable? If so, when? Can I rely on an accumulator's value only after the task has completed successfully (meaning in the driver), or also during task execution (and what about speculative execution)?
What are the limitations on the number (or size) of Accumulators?