flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4829) Accumulators are not thread safe
Date Fri, 14 Oct 2016 09:56:21 GMT
Till Rohrmann created FLINK-4829:
------------------------------------

             Summary: Accumulators are not thread safe
                 Key: FLINK-4829
                 URL: https://issues.apache.org/jira/browse/FLINK-4829
             Project: Flink
          Issue Type: Bug
          Components: Local Runtime
    Affects Versions: 1.2.0
            Reporter: Till Rohrmann
             Fix For: 1.2.0


Flink's {{Accumulators}} are not thread safe. With the introduction of live accumulator snapshots
that are sent to the {{JobManager}}, we introduced concurrent access to accumulators
without properly guarding them against concurrent modification. If a snapshot
is taken of an accumulator while it is being modified, serialization can fail with a {{ConcurrentModificationException}},
as reported by a user:

{code}
WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
        at java.util.TreeMap.writeObject(TreeMap.java:2436)
        at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        at java.util.HashMap.writeObject(HashMap.java:1362)
        at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
        at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
{code}
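
The trace above shows a {{TreeMap}} being serialized while another thread is apparently still mutating it. One possible way to guard against this is sketched below. It is only an illustration and not necessarily the fix that will go into Flink: {{GuardedHistogram}} and its methods are hypothetical names, not Flink classes. The idea is that the task thread updates the map under a lock, and the snapshot thread copies the map under the same lock and then serializes the private copy outside of it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.TreeMap;

/**
 * Hypothetical accumulator (not a Flink class): a value histogram backed by a
 * TreeMap, guarded so that snapshots never observe a map that is being modified.
 */
public class GuardedHistogram {

    private final Object lock = new Object();
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Called by the task thread for every record. */
    public void add(int value) {
        synchronized (lock) {
            counts.merge(value, 1, Integer::sum);
        }
    }

    /** Number of distinct values seen so far. */
    public int size() {
        synchronized (lock) {
            return counts.size();
        }
    }

    /**
     * Called by the snapshot (heartbeat) thread. The map is copied while the
     * lock is held; Java serialization then runs on the private copy, so the
     * task thread is blocked only for the duration of the copy.
     */
    public byte[] serializedSnapshot() throws IOException {
        TreeMap<Integer, Integer> copy;
        synchronized (lock) {
            copy = new TreeMap<>(counts);
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(copy);
        }
        return bytes.toByteArray();
    }
}
```

Copying under the lock keeps the critical section short, at the cost of one extra map copy per snapshot. Serializing the original map while holding the lock would avoid the copy but stall the task thread for the whole serialization.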



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
