flink-issues mailing list archives

From "Jark Wu (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-17918) Blink Jobs are loosing data on recovery
Date Tue, 02 Jun 2020 03:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121526#comment-17121526 ]

Jark Wu edited comment on FLINK-17918 at 6/2/20, 3:23 AM:
----------------------------------------------------------

Thanks [~pnowojski] and [~AHeise] for the investigation. After discussing with [~yunta], [~yunta] pointed out that users should take care of deep copying when using the heap state backend; the Javadoc of {{CopyOnWriteStateMap}} says:

{quote}
IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for caching purposes.
{quote}
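
For illustration, here is a hypothetical sketch (not code from this issue; all names are made up) of the kind of pattern that contract forbids: a per-key cache keeps the list obtained from {{MapState}} alive across elements and mutates it in place.

{code:java}
// Hypothetical sketch (not code from this issue): the cached List is the very
// object owned by the heap state backend, so mutating it across
// processElement() calls bypasses CopyOnWriteStateMap's snapshot isolation.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UnsafeCachingFunction extends KeyedProcessFunction<Long, String, String> {

    private transient MapState<Long, List<String>> dataState;
    // Cache that outlives individual processElement() calls.
    private transient Map<Long, List<String>> cache;

    @Override
    public void open(Configuration parameters) {
        dataState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("data", Types.LONG, Types.LIST(Types.STRING)));
        cache = new HashMap<>();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long sortKey = ctx.getCurrentKey();
        List<String> inputs = cache.get(sortKey);
        if (inputs == null) {
            inputs = dataState.get(sortKey);   // reference owned by the heap backend
            if (inputs == null) {
                inputs = new ArrayList<>();
            }
            cache.put(sortKey, inputs);
        }
        // VIOLATION: this mutates the same object that an ongoing
        // copy-on-write snapshot may still be reading, so a checkpoint can
        // capture lost or partially applied updates.
        inputs.add(value);
        dataState.put(sortKey, inputs);
        out.collect(value);
    }
}
{code}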

However, I didn't find this note in the documentation. If this is true, I think we should update the documentation and change {{AppendOnlyTopNFunction}} to put a copied list into {{dataState}} (this will bring additional cost for the RocksDB state backend, which serializes values on every put and therefore does not need the defensive copy). We should also go through all the other operators.
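
A hedged sketch of that direction, reusing the names from the sketch above (not the actual patch; if the elements of the list are themselves mutated in place, they would need to be deep-copied as well):

{code:java}
// Sketch of the proposed fix: hand the state backend a fresh list, so later
// mutations of the cached list cannot leak into a copy-on-write snapshot.
List<String> copyForState = new ArrayList<>(inputs);
dataState.put(sortKey, copyForState);
{code}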

Besides this, I can also reproduce the failure of {{AggregateITCase#testDifferentTypesSumWithRetract}} with [~AHeise]'s commit. But I think this may be another issue, because the simplest case "[LocalGlobal=OFF, MiniBatch=OFF, StateBackend=HEAP]", which uses {{GroupAggFunction}}, also fails. In {{GroupAggFunction}}, the {{accumulators}} object passed to [{{accState.update(accumulators)}}|https://github.com/apache/flink/blob/4ff59a72b4c8532510cca349840fcbe668de911e/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L179] is always a new object.
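
For reference, a simplified paraphrase of the linked update path (not a verbatim excerpt; the method names follow the {{AggsHandleFunction}} interface):

{code:java}
// Simplified paraphrase of the GroupAggFunction code path linked above: the
// object handed to accState.update(..) is produced fresh by the aggs handler
// each call, so this path holds no mutable reference into the heap backend.
RowData accumulators = accState.value();          // null for a new key
if (accumulators == null) {
    accumulators = function.createAccumulators();
}
function.setAccumulators(accumulators);           // load into the code-generated handler
function.accumulate(input);                       // fold in the current row
accumulators = function.getAccumulators();        // returns a new RowData each call
accState.update(accumulators);                    // fresh object given to state
{code}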




> Blink Jobs are loosing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After trying to enable unaligned checkpoints by default, a lot of Blink streaming SQL/Table API tests containing joins or set operations are throwing errors indicating that we are losing some data (full records, without deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures: 
> [ERROR]   JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR]   JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 5,5,5,5)> but was:<List()>
> [ERROR]   SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR]   SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR]   JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
