flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4829) Accumulators are not thread safe
Date Mon, 17 Oct 2016 12:41:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582140#comment-15582140 ]

ASF GitHub Bot commented on FLINK-4829:
---------------------------------------

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2649

    [FLINK-4829] snapshot accumulators on a best-effort basis

    Heartbeats should not fail when accumulators could not be snapshotted. Instead, we should
    simply skip the reporting of the failed accumulator. Eventually, the accumulator will be reported;
    at the latest, when the job finishes.
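
The best-effort idea described above can be sketched roughly as follows. This is only an illustration and not the code in the pull request: the class BestEffortSnapshot, its methods, and the FailingAccumulator stand-in are hypothetical names made up for this sketch. It assumes each accumulator is serialized on its own, so that a failure in one of them only drops that entry from the report instead of failing the whole heartbeat.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of best-effort snapshotting; not Flink's AccumulatorRegistry. */
public class BestEffortSnapshot {

    /** Stand-in for an accumulator that is being updated while it is serialized. */
    static class FailingAccumulator implements Serializable {
        private static final long serialVersionUID = 1L;

        private void writeObject(ObjectOutputStream out) throws IOException {
            // Simulates the failure from the issue report.
            throw new ConcurrentModificationException("simulated concurrent update");
        }
    }

    /** Serializes every accumulator separately; entries that fail are skipped. */
    static Map<String, byte[]> snapshot(Map<String, Serializable> accumulators) {
        Map<String, byte[]> result = new HashMap<>();
        for (Map.Entry<String, Serializable> entry : accumulators.entrySet()) {
            try {
                result.put(entry.getKey(), serialize(entry.getValue()));
            } catch (IOException | RuntimeException e) {
                // Best effort: leave this accumulator out of the current report.
                // A later snapshot (or the final one) can still pick it up.
            }
        }
        return result;
    }

    /** Plain Java serialization into a fresh buffer per value. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Serializable> accumulators = new HashMap<>();
        accumulators.put("ok", Integer.valueOf(42));
        accumulators.put("busy", new FailingAccumulator());
        // Only the accumulator that serialized cleanly is reported.
        System.out.println(snapshot(accumulators).keySet());
    }
}
```

Using a separate output stream per accumulator keeps a half-written, failed entry from corrupting the bytes of the entries that did succeed.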

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4829

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2649
    
----
commit 2a424a6120b7f1be8f357522a3fe964e77bd4cce
Author: Maximilian Michels <mxm@apache.org>
Date:   2016-10-14T13:15:50Z

    [FLINK-4829] protect user accumulators against concurrent updates

commit f162142b1d274895e16712e42f0f32a43e187db9
Author: Maximilian Michels <mxm@apache.org>
Date:   2016-10-17T12:19:00Z

    [FLINK-4829] snapshot accumulators on a best-effort basis
    
    Heartbeats should not fail when accumulators could not be snapshotted. Instead,
    we should simply skip the reporting of the failed accumulator. Eventually, the
    accumulator will be reported; at the latest, when the job finishes.

----


> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live accumulator
> snapshots, which are sent to the {{JobManager}}, we introduced concurrent access to accumulators
> without properly guarding them against concurrent modification. If a snapshot is taken of
> an accumulator while it is being modified, a {{ConcurrentModificationException}} can be thrown,
> as reported by a user:
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}
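
The top frames of the trace (TreeMap.writeObject iterating its entries) can be reproduced deterministically without threads. The sketch below is an assumption-laden illustration, not Flink code: SnapshotRaceDemo and UpdatingValue are hypothetical names, and the "concurrent" update is simulated by a value that modifies the map from inside its own writeObject, which trips the same fail-fast check that a racing task thread would.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ConcurrentModificationException;
import java.util.TreeMap;

/** Hypothetical demo: a TreeMap that is modified while it is being serialized. */
public class SnapshotRaceDemo {

    /** Stands in for the task thread: it updates the map in the middle of the snapshot. */
    static class UpdatingValue implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient TreeMap<String, Object> target;

        UpdatingValue(TreeMap<String, Object> target) {
            this.target = target;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            // Structural modification while TreeMap.writeObject is iterating.
            target.put("added-during-snapshot", 0);
        }
    }

    /** Returns true if serializing the map hit the iterator's fail-fast check. */
    static boolean snapshotFails() {
        TreeMap<String, Object> accumulator = new TreeMap<>();
        accumulator.put("a", new UpdatingValue(accumulator));
        accumulator.put("b", 1);
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(accumulator);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("snapshot failed: " + snapshotFails());
    }
}
```

With real threads the failure is timing-dependent, which is why it shows up only occasionally in heartbeats; the fail-fast check is also best-effort, so an unguarded concurrent snapshot may silently serialize an inconsistent state instead of throwing.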



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
