flink-issues mailing list archives

From "Flink Jira Bot (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-18503) bug occurs when `HeapReducingState#add` method handles null
Date Sat, 22 May 2021 22:53:02 GMT

     [ https://issues.apache.org/jira/browse/FLINK-18503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Flink Jira Bot updated FLINK-18503:
-----------------------------------
      Labels: auto-deprioritized-major  (was: stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being
deprioritized. If this ticket is actually Major, please raise the priority and ask a committer
to assign you the issue, or revive the public discussion.


> bug occurs when `HeapReducingState#add` method handles null
> -----------------------------------------------------------
>
>                 Key: FLINK-18503
>                 URL: https://issues.apache.org/jira/browse/FLINK-18503
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: fanrui
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.14.0
>
>         Attachments: image-2020-07-07-02-20-03-420.png, image-2020-07-07-02-20-57-299.png
>
>
> In our production environment we run advertising billing jobs that are keyed by advertiserId:
after keyBy, a ReduceFunction sums the price for each advertiser within the specified window.
> However, we found that FsStateBackend and RocksDBStateBackend produce different results:
the FsStateBackend result is wrong, while the RocksDBStateBackend result is correct.
> Reading the source code, HeapReducingState#add is implemented as follows: [code link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java#L93]
>  
> {code:java}
> public void add(V value) throws IOException {
>    if (value == null) {
>       clear();
>       return;
>    }
>    try {
>       stateTable.transform(currentNamespace, value, reduceTransformation);
>    } catch (Exception e) {
>       throw new IOException("Exception while applying ReduceFunction in reducing state",
e);
>    }
> }
> {code}
> If value == null, the clear method deletes the entry for the current <key, namespace>
from the StateTable; the ReduceFunction is only invoked when value != null.
> h2. Why is there a bug?
> For a job that calculates cost, if price != null the price is added to the result; if price
== null the result is unchanged.
> Our ReduceFunction handles the price == null case as
follows:
>  
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>    @Override
>    public Long reduce(Long previousState, Long newValue) throws Exception {
>       // if newValue == null,
>       // consider newValue to be 0 and return previousState directly
>       if (newValue == null) {
>          return previousState;
>       }
>       return previousState + newValue;
>    }
> };
> {code}
>  
> However, when HeapReducingState#add receives value == null, it directly calls
the clear method and never invokes the user-defined ReduceFunction at all.
> For example: if the input prices are 17, null, and 11, the state holds 17 after the first
element, is cleared by the null, and then holds 11, so the final result is 11 instead of the
expected 28.
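The trace above can be sketched as a minimal standalone simulation (plain Java, not actual Flink code; the class and method names are hypothetical, chosen only to mirror the heap backend's behavior):

```java
// Minimal sketch (not Flink code) of the heap backend's add() semantics,
// where a null input clears the stored value instead of reaching the ReduceFunction.
public class HeapAddSketch {
    static Long state = null;

    static void add(Long value) {
        if (value == null) {   // mirrors HeapReducingState#add: null clears state
            state = null;
            return;
        }
        state = (state == null) ? value : state + value;
    }

    public static void main(String[] args) {
        add(17L);  // state = 17
        add(null); // state cleared
        add(11L);  // state = 11
        System.out.println(state); // prints 11, not the expected 28
    }
}
```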
> Fortunately, RocksDBStateBackend computes the correct result: RocksDBReducingState#add
does not treat null specially. Its implementation is as follows: [code
link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java#L92]
>  
> {code:java}
> public void add(V value) throws Exception {
>    byte[] key = getKeyBytes();
>    V oldValue = getInternal(key);
>    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
>    updateInternal(key, newValue);
> }
> {code}
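For comparison, here is the same kind of standalone sketch (again not Flink code; names are hypothetical) of the RocksDB-style semantics, where the null input is handed to the user's ReduceFunction from the report:

```java
// Minimal sketch (not Flink code) of the RocksDB backend's add() semantics:
// a null input is passed to the ReduceFunction, which treats it as 0.
public class RocksAddSketch {
    static Long state = null;

    // the user-defined ReduceFunction from this report
    static Long reduce(Long previousState, Long newValue) {
        if (newValue == null) {
            return previousState; // null counts as 0
        }
        return previousState + newValue;
    }

    static void add(Long value) {
        // mirrors RocksDBReducingState#add: no special handling of null
        state = (state == null) ? value : reduce(state, value);
    }

    public static void main(String[] args) {
        add(17L);  // state = 17
        add(null); // reduce(17, null) -> 17
        add(11L);  // reduce(17, 11) -> 28
        System.out.println(state); // prints 28, the expected sum
    }
}
```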
> h2. Flink UT can reproduce this bug
> The existing unit test StateBackendTestBase#testReducingStateAddAndGet can reproduce this bug.
> It needs to be modified as follows:
>  # the user-defined ReduceFunction
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>    @Override
>    public Long reduce(Long previousState, Long newValue) throws Exception {
>       // if newValue ==null,
>       // consider newValue to be 0 and return previousState directly
>       if (newValue == null) {
>          return previousState;
>       }
>       return previousState + newValue;
>    }
> };
> final ReducingStateDescriptor<Long> stateDescr =
>    new ReducingStateDescriptor<>("my-state", sumFunction, Long.class);{code}
>  # add element
> {code:java}
> keyedBackend.setCurrentKey("def");
> assertNull(state.get());
> state.add(17L);
> state.add(null); // new code
> state.add(11L);
> assertEquals(28L, state.get().longValue());{code}
> My code repository commit [link|https://github.com/1996fanrui/flink/commit/645118dd2f95de88580d07e00d88e8783a0f9680]
>  The UT execution output of RocksDBStateBackendTest is as follows:
> !image-2020-07-07-02-20-03-420.png!
>  
>  The UT execution output of FileStateBackendTest & MemoryStateBackendTest is as follows:
> !image-2020-07-07-02-20-57-299.png!
> {code:java}
> java.lang.AssertionError: 
> Expected :28
> Actual   :11{code}
> The above shows that the HeapReducingState#add method has a bug: the semantics provided
by the Flink engine should be consistent regardless of which state backend is chosen, and
should not produce different calculation results.
> h2. My solution
> Remove the value == null handling from HeapReducingState#add. With this change, all UTs
in FileStateBackendTest pass.
> h2. Similar bug
> HeapFoldingState#add & HeapAggregatingState#add
> h2. Question
> Why was the null case handled specially when HeapReducingState#add was designed?
I think the null case should be handled by the user-defined ReduceFunction instead.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
