I can reply from a user’s perspective; I’ll defer the semantic guarantees to someone with
more experience.
I’ve successfully implemented the following using a custom Accumulable class (a rough sketch follows the list):
* Created a MapAccumulator with dynamic keys (driven by the incoming data),
as opposed to creating many discrete accumulators
* The merge operation adds the values on key conflict
* I’m adding K->Vs to this accumulator in a variety of places (map, flatMap, transform
and updateStateByKey)
* In a foreachRDD at the end of the transformations I’m reading the accumulator and
writing the counters to OpenTSDB
* After this I’m resetting it to the “zero” value (e.g. an empty map)
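Roughly, the accumulator param looked like the sketch below (simplified, and the names such as MapAccumulatorParam are made up for illustration, not my actual code). It is an AccumulableParam over a Map[String, Long] that sums values when keys collide:

import org.apache.spark.AccumulableParam

object MapAccumulatorParam extends AccumulableParam[Map[String, Long], (String, Long)] {

  // Merge a single (key, count) pair into the partial map on an executor
  def addAccumulator(acc: Map[String, Long], kv: (String, Long)): Map[String, Long] =
    acc + (kv._1 -> (acc.getOrElse(kv._1, 0L) + kv._2))

  // Merge two partial maps (e.g. from different partitions), summing values on key conflict
  def addInPlace(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
    m2.foldLeft(m1) { case (acc, (k, v)) => acc + (k -> (acc.getOrElse(k, 0L) + v)) }

  // The "zero" value is an empty map
  def zero(initial: Map[String, Long]): Map[String, Long] = Map.empty[String, Long]
}

// On the driver:
//   val counters = sc.accumulable(Map.empty[String, Long])(MapAccumulatorParam)
// Inside a map/flatMap/transform/updateStateByKey:
//   counters += ("some.metric.key" -> 1L)
// In foreachRDD, on the driver:
//   val snapshot = counters.value              // write these to OpenTSDB
//   counters.setValue(Map.empty[String, Long]) // reset to "zero"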
Everything works as expected in terms of functionality - with 2 caveats:
* On task/job failure you might get duplicate values for the tasks that are retried in
the active job since adding to an Accumulator in a transformation is a side effect
* I’m partially working around this by also using the RDD batch time and overwriting
the values in OpenTSDB (an idempotent operation)
* If you have stateful transformations and you use checkpointing, the accumulator code
becomes really intrusive in your codebase
* You will need to have a global singleton in your driver and call “getInstance” in
a foreachRDD or transform, to force code execution on the driver (see the sketch after this list)
* This is because on restoring from checkpoint your accumulators will be NULL as the
checkpoint recovery makes no attempt to initialize them (See SPARK-5206<https://issues.apache.org/jira/browse/SPARK-5206?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22accumulator%20null%22>)
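To make this concrete, here is a minimal sketch of that lazy singleton (again, MapCounters is a made-up name, and it reuses the illustrative MapAccumulatorParam from above, not my actual code). Because the accumulator is (re-)created lazily on the driver, it comes back after a restart from checkpoint instead of staying null:

import org.apache.spark.{Accumulable, SparkContext}

object MapCounters {

  @volatile private var instance: Accumulable[Map[String, Long], (String, Long)] = null

  // Lazily (re-)create the accumulator on the driver; after a checkpoint restore
  // the first call re-registers it instead of leaving it null
  def getInstance(sc: SparkContext): Accumulable[Map[String, Long], (String, Long)] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulable(Map.empty[String, Long])(MapAccumulatorParam)
        }
      }
    }
    instance
  }
}

// Driver-side usage inside foreachRDD, so it also runs after checkpoint recovery:
//   stream.foreachRDD { (rdd, time) =>
//     val counters = MapCounters.getInstance(rdd.sparkContext)
//     // read counters.value, write to OpenTSDB keyed by `time`, then reset
//   }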
Hope this helps,
-adrian
From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Accumulators internals and reliability
It seems like there is not much literature about Spark's Accumulators so I thought I'd ask
here:
Do Accumulators reside in a Task? Are they serialized with the task? Sent back on
task completion as part of the ResultTask?
Are they reliable? If so, when? Can I rely on an accumulator's value only after the task has
successfully completed (meaning in the driver)? Or also during task execution
(what about speculative execution)?
What are the limitations on the number (or size) of Accumulators ?
Thanks,
Amit