spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Shixiong Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe
Date Mon, 12 Sep 2016 17:33:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484720#comment-15484720
] 

Shixiong Zhu commented on SPARK-17463:
--------------------------------------

[~joshrosen] I think we can just leave LongAccum as it is. It's no worse than Spark 1.6. In
Spark 1.6, `sum` and `count` are different accumulators and have the same inconsistent issue.

We definitely should fix CollectionAccumulator and SetAccumulator. I will submit a PR to add
the necessary `synchronized` for them.

By the way, I didn't notice that AccumulatorV2 sends the whole object back to the driver.
Do you know any special reason? I remember previously we only send the values of accumulators
back to driver.

> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
>                 Key: SPARK-17463
>                 URL: https://issues.apache.org/jira/browse/SPARK-17463
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>            Priority: Critical
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2,
HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>     ... 13 more
> {code}
> Even though accumulators aren't thread-safe they can be concurrently read while serializing
executor heartbeats and modified while tasks are running, leading to ConcurrentModificationException
errors (thereby leading to missing heartbeats) or leading to inconsistent data (since individual
fields of a multi-field object might be serialized at different points in time, leading to
inconsistencies in accumulators like LongAccum).
> This seems like a pretty serious issue but I'm not sure what's the best way to fix this.
An obvious fix would be to properly synchronize all accesses to the fields of our accumulators
and to synchronize the writeObject and writeKryo methods, but this may have an adverse performance
impact



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message