flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?
Date Tue, 17 Apr 2018 18:35:56 GMT
Thanks for reporting this, and also for verifying that snapshots work with
RocksDB and with synchronous checkpoints.

I would assume that the issue lies not in the serializer itself, but in
accidental sharing of serializers during the FsStateBackend's asynchronous snapshots.
Do you know if the issue still exists in Flink 1.4.2?
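
In the meantime, a possible workaround is to keep the filesystem backend but
disable asynchronous snapshots, or to switch to RocksDB. A minimal sketch,
assuming the Flink 1.4.x API (the checkpoint URI is a placeholder):

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// FsStateBackend(checkpointDataUri, asynchronousSnapshots): passing `false`
// forces synchronous snapshots, which you reported to work correctly.
env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints", false))
```

Alternatively, `RocksDBStateBackend` (from the `flink-statebackend-rocksdb`
dependency) avoids the heap copy path entirely.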

On Tue, Apr 17, 2018 at 6:14 PM, joshlemer <joshlemer@gmail.com> wrote:

> Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that
> asynchronous
> snapshots fail when using the Filesystem back-end. Synchronous snapshots
> succeed, and RocksDB snapshots succeed (both async and sync), but async
> Filesystem snapshots fail with this error:
>
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>         at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>         at java.util.ArrayList.set(ArrayList.java:448)
>         at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>         at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>         at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentAssignments.scala:102)
>         at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:218)
>         at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:76)
>         at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:86)
>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:270)
>         at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
>
> This stack trace occurs when I am trying to access the value of a
> `ValueState[scala.collection.mutable.PriorityQueue[(AJavaObjectThatUsesTwitterChillProtoSerialization, Long, scala.collection.mutable.BitSet)]]` while a checkpoint is going on.
>
> I have found similar errors occurring in already-fixed tickets like this
> one:
> https://issues.apache.org/jira/browse/FLINK-7484
> which is part of this umbrella issue:
> https://issues.apache.org/jira/browse/FLINK-7830
>
> However, those tickets are marked as resolved, so maybe the bug has not been
> completely fixed? Or maybe I am making a mistake in my own code? When I get
> the value of the state, I do mutate it, and I also mutate the mutable.BitSet
> before persisting it again. But as far as I know this is perfectly fine with
> Flink, yes?
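
To make the access pattern concrete, here is a hypothetical sketch of what the
poster describes (all names are illustrative, not the actual job code, and a
minimal `ValueStateLike` trait stands in for Flink's `ValueState` so the
snippet is self-contained):

```scala
object StateAccessSketch {
  import scala.collection.mutable

  // Minimal stand-in for Flink's ValueState[T] (value()/update() only).
  trait ValueStateLike[T] {
    def value(): T
    def update(v: T): Unit
  }

  // Stand-in for the (protobuf object, timestamp, bitset) tuple kept in state;
  // a String replaces the Twitter-Chill-serialized Java object.
  type Entry = (String, Long, mutable.BitSet)

  // Order the priority queue by timestamp (illustrative choice).
  implicit val byTimestamp: Ordering[Entry] = Ordering.by(_._2)

  def enqueueSegmentAssignment(state: ValueStateLike[mutable.PriorityQueue[Entry]],
                               e: Entry): Unit = {
    // value() may return null on first access for this key.
    val queue = Option(state.value()).getOrElse(mutable.PriorityQueue.empty[Entry])
    queue.enqueue(e) // mutate the object retrieved from state in place
    e._3 += 1        // mutate the BitSet in place before persisting
    state.update(queue)
  }
}
```

With the heap-based FsStateBackend, the object returned by `value()` is the
live heap copy, which is what makes concurrent access during an asynchronous
snapshot a plausible suspect.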
>
> Thanks for any help or pointers!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
