spark-issues mailing list archives

From "Sunil Rangwani (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe
Date Sun, 01 Jan 2017 18:20:58 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15791469#comment-15791469 ]

Sunil Rangwani commented on SPARK-17463:
----------------------------------------

Hi [~zsxwing]

Below is how I am using the CollectionAccumulator:

{code}
// val spark: SparkSession
// val dataFrame: DataFrame
import org.apache.spark.util.CollectionAccumulator

val updatedRecordKeysAcc = spark.sparkContext.collectionAccumulator[String]

dataFrame.toJSON.foreach(processRecord(_, updatedRecordKeysAcc))

import collection.JavaConverters._
val updatedRecordKeys: String = updatedRecordKeysAcc.value.asScala.mkString("\n")
// write `updatedRecordKeys` to an S3 file using aws-sdk...

def processRecord(recordJSON: String, updatedRecordKeysAcc: CollectionAccumulator[String]): Unit = {
  // do stuff with recordJSON...
  val recordKey: String = getKeyFromJSON(recordJSON) // key-extraction helper
  updatedRecordKeysAcc.add(recordKey)
}
{code}

It works when I am working with smaller datasets of tens of thousands of records, but when I run it
on a dataset of tens of millions it fails with the exception above. I tried scaling the cluster up
to 5 or 6 nodes, but it still gives the same error and the cluster remains underutilized.
 
It also works fine when I comment out the line {noformat}updatedRecordKeysAcc.add(recordKey){noformat}.
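
As a possible workaround I am considering dropping the accumulator entirely and computing the keys as a Dataset transformation instead. A sketch (assuming {{getKeyFromJSON}} is the key-extraction helper from the snippet above, and that the per-record work can run inside the {{map}}):

{code}
// Sketch of an accumulator-free alternative.
import spark.implicits._ // for the String Encoder required by map

val updatedRecordKeys: String = dataFrame.toJSON
  .map { recordJSON =>
    // do stuff with recordJSON, as in processRecord...
    getKeyFromJSON(recordJSON)
  }
  .collect() // only the extracted keys come back to the driver
  .mkString("\n")
{code}

I am not certain this is equivalent for my use case, but since there is no shared mutable collection being updated while tasks run, there should be nothing for the heartbeat thread to race on.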

> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
>                 Key: SPARK-17463
>                 URL: https://issues.apache.org/jira/browse/SPARK-17463
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>            Assignee: Shixiong Zhu
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>     ... 13 more
> {code}
> Even though accumulators aren't thread-safe, they can be read concurrently (while executor heartbeats are serialized) and modified concurrently (while tasks are running), leading to ConcurrentModificationException errors (and thereby to missing heartbeats) or to inconsistent data (since individual fields of a multi-field object might be serialized at different points in time, producing inconsistencies in accumulators like LongAccum).
> This seems like a pretty serious issue, but I'm not sure of the best way to fix it. An obvious fix would be to properly synchronize all accesses to the fields of our accumulators and to synchronize the writeObject and writeKryo methods, but this may have an adverse performance impact.
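
The race the description refers to is easy to reproduce outside Spark: serializing a plain {{java.util.ArrayList}} while another thread mutates it throws the same {{ConcurrentModificationException}}, because {{ArrayList.writeObject}} checks {{modCount}} after writing its elements. A minimal standalone sketch (no Spark involved; thread names are only illustrative):

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.ArrayList

object AccumulatorRaceRepro {
  def main(args: Array[String]): Unit = {
    val list = new ArrayList[Int]()
    for (i <- 0 until 1000000) list.add(i) // big enough that serializing takes a while

    // Stand-in for a running task calling accumulator.add() in a loop.
    val task = new Thread(new Runnable {
      def run(): Unit = while (true) list.add(0)
    })
    task.setDaemon(true)
    task.start()

    // Stand-in for the heartbeat thread serializing accumulator values.
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(list) // typically throws java.util.ConcurrentModificationException
  }
}
{code}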
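
And a minimal sketch of the synchronization idea mentioned at the end of the description (an illustration of the approach, not the actual Spark patch): take a consistent copy of the backing list under the same lock that guards mutation, and serialize the copy instead of the live list:

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.ArrayList

// Hypothetical accumulator-like holder that snapshots before serialization.
class SnapshotList[T] extends Serializable {
  private val backing = new ArrayList[T]()

  def add(v: T): Unit = backing.synchronized(backing.add(v))

  // Copy under the same lock that guards add(), so the copy is consistent
  // and the serializing thread never touches a live, mutating list.
  def snapshot(): ArrayList[T] = backing.synchronized(new ArrayList[T](backing))
}

// Heartbeat-style serialization writes the snapshot, not the live list.
def serializeForHeartbeat(acc: SnapshotList[String]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(acc.snapshot())
  oos.close()
  bytes.toByteArray
}
{code}

The trade-off, as the description notes, is lock contention on every add(), which is presumably the performance concern.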



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
