flink-issues mailing list archives

From "Maximilian Michels (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-12653) Keyed state backend fails to restore during rescaling
Date Tue, 28 May 2019 14:34:00 GMT
Maximilian Michels created FLINK-12653:
------------------------------------------

             Summary: Keyed state backend fails to restore during rescaling
                 Key: FLINK-12653
                 URL: https://issues.apache.org/jira/browse/FLINK-12653
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.8.0, 1.7.2, 1.6.4
         Environment: Beam 2.12.0 or any other Beam version
Flink >= 1.6
            Reporter: Maximilian Michels


The Flink Runner includes a test which verifies that checkpoints/savepoints work correctly with
Beam on Flink. When adding additional tests for scale-up/scale-down [1], I came across a bug
when restoring the keyed state backend. After a fair amount of debugging the Beam code and checking
for potential issues with serializers, I think this could be a Flink issue.
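For context on what a rescaled restore has to do: each subtask owns a contiguous range of key groups, and after a scale-up/scale-down a new subtask must read the key groups in its range from every old subtask whose range overlaps it. The sketch below is my own re-implementation of what I understand to be the range arithmetic in Flink's {{KeyGroupRangeAssignment}}; the class and method names ({{RescaleOverlap}}, {{keyGroupRange}}, {{sourcesForNewSubtask}}) are hypothetical, and it does not reproduce the bug.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of key-group arithmetic during rescaling.
 * Not Flink code; it only mirrors the range formula as I understand it.
 */
public class RescaleOverlap {

    /** Inclusive [start, end] key-group range owned by one subtask. */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Old subtask indices whose key-group ranges overlap the given new subtask's range. */
    static List<Integer> sourcesForNewSubtask(
            int maxParallelism, int oldParallelism, int newParallelism, int newIndex) {
        int[] target = keyGroupRange(maxParallelism, newParallelism, newIndex);
        List<Integer> sources = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int[] r = keyGroupRange(maxParallelism, oldParallelism, old);
            // Two inclusive ranges overlap if neither ends before the other starts.
            if (r[0] <= target[1] && target[0] <= r[1]) {
                sources.add(old);
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default, as noted below
        int oldParallelism = 3;   // example scale-down from 3 ...
        int newParallelism = 2;   // ... to 2 subtasks
        for (int i = 0; i < newParallelism; i++) {
            int[] r = keyGroupRange(maxParallelism, newParallelism, i);
            System.out.println("new subtask " + i + " key groups [" + r[0] + ", " + r[1]
                    + "] restores from old subtasks "
                    + sourcesForNewSubtask(maxParallelism, oldParallelism, newParallelism, i));
        }
    }
}
```

With maxParallelism 128, scaling from 3 to 2 subtasks, new subtask 0 owns key groups [0, 63] and needs state from old subtasks 0 and 1, while new subtask 1 owns [64, 127] and needs old subtasks 1 and 2.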

Steps to reproduce: 

1. {{git clone https://github.com/mxm/beam}}
2. {{cd beam && git checkout savepoint-problem}}
3. {{./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}

Error:

{noformat}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for
DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
	... 5 more
Caused by: java.lang.RuntimeException: Invalid namespace string: ''
	at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	... 7 more
{noformat}

The {{maxParallelism}} can be changed to other values. The following values lead to failure:
{noformat}
    options.setMaxParallelism(128); // default value
    options.setMaxParallelism(64);
    options.setMaxParallelism(118);
{noformat}

The following work fine:
{noformat}
    options.setMaxParallelism(110);
    options.setMaxParallelism(63);
    options.setMaxParallelism(24);
{noformat}
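To compare these values, here is a minimal sketch of how key groups {{[0, maxParallelism)}} are split across 4 subtasks (the failing subtask in the trace is {{(4/4)}}). It assumes the operator-index formula {{keyGroupId * parallelism / maxParallelism}}, which is my understanding of Flink's {{KeyGroupRangeAssignment}}; the class and method names are hypothetical, and the sketch does not by itself reproduce or explain the failure.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: number of key groups each subtask owns
 * for the maxParallelism values tried above.
 */
public class KeyGroupSplit {

    /** Subtask index that owns a key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Counts how many key groups land on each subtask. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            counts[operatorIndexForKeyGroup(maxParallelism, parallelism, kg)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int[] failing = {128, 64, 118};
        int[] working = {110, 63, 24};
        for (int max : failing) {
            System.out.println("fails  maxParallelism=" + max + " -> "
                    + Arrays.toString(keyGroupsPerSubtask(max, parallelism)));
        }
        for (int max : working) {
            System.out.println("works  maxParallelism=" + max + " -> "
                    + Arrays.toString(keyGroupsPerSubtask(max, parallelism)));
        }
    }
}
```

For example, 118 gives [30, 29, 30, 29] and 63 gives [16, 16, 16, 15], while 128 and 24 both split evenly, so the uniformity of the split alone does not separate the failing values from the working ones.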

[1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690

Everything works fine with the RocksDB state backend; the failing restore above goes through the heap-based {{HeapKeyedStateBackend}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
