flink-issues mailing list archives

From "Darcy Lin (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
Date Wed, 01 Jan 2020 10:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006361#comment-17006361 ]

Darcy Lin edited comment on FLINK-15406 at 1/1/20 10:12 AM:
------------------------------------------------------------

Hi, [~klion26] 

[^CountWord.java] This is the demo I wrote.
{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint at "file:///tmp/flink/savepoint". Then
run the following commands to reproduce the problem.
{code:java}
nc -lk 12345   # listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream    # checkpoint fails
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream1   # checkpoint succeeds
{code}
 


was (Author: lintingbin):
Hi, [~klion26] 

[^CountWord.java] This is the demo I wrote.

{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint at "file:///tmp/flink/savepoint". Then
run the following commands to reproduce the problem.

{code:java}
nc -lk 12345   # listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # checkpoint fails
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # checkpoint succeeds
{code}
 

> The savepoint is writted by "State Processor API" can't be restore by map or flatmap
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-15406
>                 URL: https://issues.apache.org/jira/browse/FLINK-15406
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.9.1
>            Reporter: Darcy Lin
>            Priority: Major
>         Attachments: CountWord.java
>
>
> A savepoint written by the "State Processor API" can't be restored by map or flatMap, but it can be restored by KeyedProcessFunction.
> The error message is as follows:
> {code:java}
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
> 	at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
> 	at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> 	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> 	at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
> 	... 19 more
> {code}
>   
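
The trace above shows the NullPointerException being raised by a `checkNotNull` guard inside the `InternalTimersSnapshot` constructor while the timer service is written to the checkpoint. One possible reading, which is an assumption and not something the report confirms, is that a timer service was restored from the savepoint but a required field (for example a serializer) was never initialized, because a map or flatMap function never starts that service. The sketch below only illustrates this failure pattern. `TimerSnapshotSketch`, `TimersSnapshot`, `TimerService`, and `snapshotFails` are hypothetical stand-ins, not Flink classes, and plain strings stand in for serializers.

```java
import java.util.Objects;

public class TimerSnapshotSketch {

    /** Hypothetical stand-in for a snapshot object whose constructor rejects null fields. */
    static final class TimersSnapshot {
        final String keySerializer;
        final String namespaceSerializer;

        TimersSnapshot(String keySerializer, String namespaceSerializer) {
            // Similar in spirit to the checkNotNull guard in the trace:
            // a null argument fails fast with a NullPointerException.
            this.keySerializer = Objects.requireNonNull(keySerializer, "keySerializer");
            this.namespaceSerializer = Objects.requireNonNull(namespaceSerializer, "namespaceSerializer");
        }
    }

    /** Hypothetical stand-in for a timer service that can exist without ever being started. */
    static final class TimerService {
        private String keySerializer;        // stays null until start() is called
        private String namespaceSerializer;  // stays null until start() is called

        void start(String keySerializer, String namespaceSerializer) {
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
        }

        TimersSnapshot snapshot() {
            return new TimersSnapshot(keySerializer, namespaceSerializer);
        }
    }

    /** Returns true if snapshotting the given service throws a NullPointerException. */
    static boolean snapshotFails(TimerService service) {
        try {
            service.snapshot();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TimerService neverStarted = new TimerService();
        TimerService started = new TimerService();
        started.start("keySerializer", "namespaceSerializer");

        System.out.println("never started fails: " + snapshotFails(neverStarted)); // true
        System.out.println("started fails: " + snapshotFails(started));           // false
    }
}
```

If this reading is right, it would be consistent with the KeyedProcessFunction job checkpointing successfully, since that kind of function can use timers, but the attached CountWord.java is the authoritative reproduction.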



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
