spark-issues mailing list archives

From "Sean Owen (JIRA)" <>
Subject [jira] [Commented] (SPARK-17621) Accumulator value is doubled when using DataFrame.orderBy()
Date Fri, 23 Sep 2016 12:18:20 GMT


Sean Owen commented on SPARK-17621:

Yes, in that sense, the result is correct. The code increments the accumulator on every call
to the map function. The possible surprise here is that the function is executed twice, because
of the internal details of other operations.

I think it's generally not reliable to use accumulators in transformations; see the docs:

bq. For accumulator updates performed inside actions only, Spark guarantees that each task’s
update to the accumulator will only be applied once, i.e. restarted tasks will not update
the value. In transformations, users should be aware of that each task’s update may be applied
more than once if tasks or job stages are re-executed.

Even without this, it's possible for work to be executed more than once: because of a task
failure, speculative execution, or recomputation of a partition, for example after a cached
partition is evicted.
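
To illustrate the guarantee quoted from the docs, here is a minimal sketch that updates an accumulator inside an action (foreach) instead of inside map. The object name, the accumulator name, the local master setting and the generated 100-row input are assumptions made for the example; they are not taken from the report.

```scala
import org.apache.spark.sql.SparkSession

object CountInAction {
  // Counts rows by updating the accumulator inside foreach, which is an action.
  // For updates made inside actions, Spark applies each task's update only once.
  def countRows(spark: SparkSession): Long = {
    val recordsRead = spark.sparkContext.longAccumulator("recordsRead")

    // Hypothetical input: 100 generated rows standing in for the real source.
    val rows = spark.range(0, 100).toDF("id").rdd

    rows.foreach(_ => recordsRead.add(1L))
    recordsRead.value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")           // assumption: single-process local run
      .appName("count-in-action")   // hypothetical application name
      .getOrCreate()

    println(s"recordsRead = ${countRows(spark)}")
    spark.stop()
  }
}
```

The trade-off is that the count comes from a separate pass over the data rather than as a side effect of the job that produces the output.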

You might get better semantics by caching the result of map, because then it's (almost) sure
to be computed only once. But for the reasons above, this is probably the wrong use of
accumulators if an exact count is needed.
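
A rough sketch of the caching suggestion follows. It mirrors the shape of the reported code but replaces the JSON file with an in-memory single-row DataFrame, uses longAccumulator rather than sc.accumulator, and sets a local master; those choices, and the object and method names, are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession

object CacheBeforeOrderBy {
  // Returns the accumulator value after an orderBy + collect over a cached, mapped RDD.
  def countWithCache(spark: SparkSession): Long = {
    import spark.implicits._

    val recordsRead = spark.sparkContext.longAccumulator("recordsRead")

    // Hypothetical stand-in for the single-row users.json from the report.
    val usersDF = Seq("sreelal").toDF("name")

    // cache() lets a later job reuse the mapped partitions instead of re-running map.
    val counted = usersDF.rdd.map { row => recordsRead.add(1L); row }.cache()
    val counterDF = spark.createDataFrame(counted, usersDF.schema)

    counterDF.orderBy("name").collect().foreach(println)
    recordsRead.value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")               // assumption: single-process local run
      .appName("cache-before-orderBy")  // hypothetical application name
      .getOrCreate()

    println(s"recordsRead = ${countWithCache(spark)}")
    spark.stop()
  }
}
```

In an uneventful local run this is expected to report 1, but caching only makes a single computation likely, not guaranteed: an evicted partition or a retried task would still re-run map. If only the exact number of rows is needed, calling count() on the DataFrame avoids the accumulator altogether.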

> Accumulator value is doubled when using DataFrame.orderBy()
> -----------------------------------------------------------
>                 Key: SPARK-17621
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, SQL
>    Affects Versions: 2.0.0
>         Environment: Development environment (Eclipse, single process)
>            Reporter: Sreelal S L
>            Priority: Minor
> We are tracing the records read by our source using an accumulator. We do an orderBy
> on the DataFrame before the output operation. When the job completes, the accumulator
> value is double the expected value.
> Below is the sample code I ran.
> {code} 
>  val sqlContext = SparkSession.builder() 
>       .config("spark.sql.retainGroupColumns", false).config("spark.sql.warehouse.dir",
>       .getOrCreate()
>     val sc = sqlContext.sparkContext
>     val accumulator1 = sc.accumulator(0, "accumulator1")
>     val usersDF = sqlContext.read.json("C:\\users.json") //  single row {"name":"sreelal"}
>     val usersDFwithCount = usersDF.rdd.map(x => { accumulator1 += 1; x });
>     val counterDF = sqlContext.createDataFrame(usersDFwithCount, usersDF.schema);
>     val oderedDF = counterDF.orderBy("name")
>     val collected = oderedDF.collect()
>     collected.foreach { x => println(x) }
>     println("accumulator1 : " + accumulator1.value)
>     println("Done");
> {code}
> I have only one row in the users.json file. I expect accumulator1 to have the value 1, but
> it's coming out as 2.
> In the Spark SQL UI, I see two jobs being generated for this code.
