flink-issues mailing list archives

From "Maximilian Michels (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4829) Accumulators are not thread safe
Date Tue, 18 Oct 2016 14:06:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585534#comment-15585534 ]

Maximilian Michels commented on FLINK-4829:
-------------------------------------------

To mitigate the problem, we have best-effort reporting for accumulators now.

master: 783dca56eedc95f0a8974a9b50f2b532ca8cf849, d95929e0110b53f03452e1ad453de2522f79a6b8
release-1.1: c1d6b24600e40700fa06caa28bc81788d8e92386, 210230c4ab44b84c28b9a62ff461de0955e67f8f
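
For readers who have not looked at the commits: a minimal sketch of what "best-effort" reporting can look like, assuming it means that a snapshot which fails to serialize is logged and skipped instead of failing the caller. The class and method names ({{BestEffortSnapshotSketch}}, {{trySerialize}}) are hypothetical and not taken from the Flink code base.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the code from the commits above.
class BestEffortSnapshotSketch {

    /**
     * Tries to serialize the accumulator map with Java serialization.
     * Returns the bytes on success, or null if anything goes wrong
     * (including a ConcurrentModificationException thrown mid-write).
     */
    static byte[] trySerialize(Map<String, Object> accumulators) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(accumulators);
            out.flush();
            return bytes.toByteArray();
        } catch (Exception e) {
            // Best effort: report the problem and drop this snapshot.
            System.err.println("Failed to serialize accumulators, skipping this report: " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("records", 42L);
        System.out.println("serializable map reported: " + (trySerialize(ok) != null));

        Map<String, Object> broken = new HashMap<>();
        broken.put("bad", new Object()); // java.lang.Object is not Serializable
        System.out.println("broken map reported: " + (trySerialize(broken) != null));
    }
}
```

A caller would treat a null result as "no update this round" and try again at the next reporting interval. This hides the exception but does not make the accumulators thread safe, so a snapshot that does serialize may still have been taken in the middle of an update.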

> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live accumulator
> snapshots, which are sent to the {{JobManager}}, we introduced concurrent access to accumulators
> without properly guarding them against concurrent modification. If a snapshot is taken of an
> accumulator while that accumulator is being modified, it can cause a {{ConcurrentModificationException}},
> as reported by a user:
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}
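
The top frames of the trace point at the fail-fast iterator that {{TreeMap.writeObject}} uses while walking its entries. The sketch below mimics the problematic interleaving in a single thread so that it is deterministic: the loop stands in for the serializing thread, and the {{put}} inside it stands in for the task thread adding a new key. The class name and map contents are made up for illustration; in the real failure the two steps run on different threads and only collide occasionally.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the failure mode; not Flink code.
class FailFastSketch {

    /**
     * Iterates a TreeMap and inserts a new key between two next() calls.
     * Returns true if the iterator detects the modification and throws.
     */
    static boolean iterationFailsAfterModification() {
        TreeMap<String, Long> accumulatorState = new TreeMap<>();
        accumulatorState.put("a", 1L);
        accumulatorState.put("b", 2L);
        accumulatorState.put("c", 3L);

        try {
            boolean modified = false;
            for (Map.Entry<String, Long> entry : accumulatorState.entrySet()) {
                if (!modified) {
                    // Stand-in for the task thread: inserting a new key is a
                    // structural modification and bumps the map's modCount.
                    accumulatorState.put("d", 4L);
                    modified = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // Thrown by the iterator's next() call after the insert.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("iteration failed: " + iterationFailsAfterModification());
    }
}
```

The exception is only a best-effort diagnostic of the underlying race. Without synchronization between the two threads, a snapshot can also succeed and still contain a partially updated state.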



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
