spark-issues mailing list archives

From "holdenk (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-12469) Consistent Accumulators for Spark
Date Mon, 21 Dec 2015 21:13:46 GMT
holdenk created SPARK-12469:
-------------------------------

             Summary: Consistent Accumulators for Spark
                 Key: SPARK-12469
                 URL: https://issues.apache.org/jira/browse/SPARK-12469
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: holdenk


Tasks executed on Spark workers are unable to modify values on the driver; accumulators are the one exception to this. Accumulators in Spark are implemented in such a way that when a stage is recomputed (say, after cache eviction) the accumulator is updated a second time. This makes accumulators inside transformations difficult to use for things like counting invalid records (one of the primary use cases for collecting side information during a transformation). However, in some cases counting during re-evaluation is exactly the behaviour we want (say, when tracking total execution time for a particular function). Spark would benefit from a version of accumulators that does not double count even when stages are re-executed.

Motivating example:
{code}
import scala.util.Try

val parseTime = sc.accumulator(0L)     // total time spent parsing, in ms
val parseFailures = sc.accumulator(0L) // number of lines that failed to parse
val parsedData = sc.textFile(...).flatMap { line =>
  val start = System.currentTimeMillis()
  val parsed = Try(parse(line))
  if (parsed.isFailure) parseFailures += 1
  parseTime += System.currentTimeMillis() - start
  parsed.toOption
}
parsedData.cache()

val resultA = parsedData.map(...).filter(...).count()

// some intervening code.  Almost anything could happen here -- some of parsedData may
// get kicked out of the cache, or an executor where data was cached might get lost

val resultB = parsedData.filter(...).map(...).flatMap(...).count()

// now we look at the accumulators
{code}

Here we would want parseFailures to have been incremented only once for each line that failed to parse. Unfortunately, the current Spark accumulator API does not support this use case: if some of parsedData was evicted from the cache and recomputed, it is possible that failures will be double counted.
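One way to picture the semantics being proposed is sketched below. This is a hedged illustration, not the Spark API: ConsistentAccumulator, report(), and the per-partition keying are all hypothetical names and assumptions. The idea is that each task reports its partition's total contribution, and contributions are keyed by partition id, so a recomputed partition overwrites its earlier entry instead of adding to it a second time.

```scala
// Hypothetical sketch of "consistent" accumulator semantics -- NOT Spark API.
// Contributions are keyed by partition id; a retried/recomputed partition
// replaces its previous contribution, so recomputation cannot double count.
import scala.collection.mutable

class ConsistentAccumulator(initial: Long) {
  // per-partition contributions; retries replace, never add
  private val perPartition = mutable.Map.empty[Int, Long]

  def report(partitionId: Int, partitionTotal: Long): Unit =
    perPartition(partitionId) = partitionTotal

  def value: Long = initial + perPartition.values.sum
}

val acc = new ConsistentAccumulator(0L)
acc.report(0, 3L) // partition 0 saw 3 parse failures
acc.report(1, 2L) // partition 1 saw 2
acc.report(0, 3L) // partition 0 recomputed after eviction -- no double count
assert(acc.value == 5L)
```

Under this model the timing use case (where counting re-executions is desired) would keep the existing additive semantics, while counters like parseFailures would opt into the overwrite-per-partition behaviour.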


See the full design document at https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
