flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
Date Thu, 23 Jun 2016 09:20:16 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346141#comment-15346141

ASF GitHub Bot commented on FLINK-3974:

Github user aljoscha commented on the issue:

    Thanks for the thorough review, @tillrohrmann!
    Your points are valid, maybe I'll have to change this PR but let me first explain my reasoning.
    The shallow copy is performed in the one place that all code paths have to go through
because it is the point right before control is passed to the operator. Putting it in different
place would mean placing it in `BroadcastingOutputCollector`, as you mentioned, as well as
in https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java,
which is used when the user does a split()/select() operation (`DataStream.split()`). The
number of places where we have to put this might evolve in the future.
    Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput` would mean that
we always do two copies per record for the common case of having object-copying enabled (which
is the default).
    About the ITCase. I also don't like having that in there because we are approaching the
2h mark on Travis but I think in this case it's valid. This test really verifies that the
whole system works correctly when the user uses a certain feature (I would also add a test
for split()/select() now that I thought about it). 

> enableObjectReuse fails when an operator chains to multiple downstream operators
> --------------------------------------------------------------------------------
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> input
>     .map(MapFunction<A,C>...)
>     ‚Äč.addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of {{"java.lang.ClassCastException:
B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}
which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which mutates
the value stored in the StreamRecord<>.  
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the
{{StreamRecord<A>}} to the second map operation it is actually a {{StreamRecord<B>}}
and behaves as if the two map operations were serial instead of parallel.

This message was sent by Atlassian JIRA

View raw message