flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4829) Accumulators are not thread safe
Date Fri, 14 Oct 2016 13:26:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575329#comment-15575329
] 

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2634

    [FLINK-4829] protect user accumulators against concurrent updates

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2634
    
----
commit 922e765f7a8f6870b79c6abb65d6ea681901e80a
Author: Maximilian Michels <mxm@apache.org>
Date:   2016-10-14T13:15:50Z

    [FLINK-4829] protect user accumulators against concurrent updates

----


> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live accumulator
snapshots which are sent to the {{JobManager}}, we've introduced a concurrent access to accumulators
without properly guard them against concurrent modifications. So if an accumulator snapshot
is taken for an accumulator which is at the same time modified, it can cause an {{ConcurrentModificationException}}
as it was reported by an user: 
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize
accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message