flink-user mailing list archives

From simon peyer <simon.pe...@soom-it.ch>
Subject No key found restore States
Date Wed, 01 Jun 2016 13:06:05 GMT
Hi all,

I implemented a small pipeline that performs some stateful computation.

It contains a function which extends RichFlatMapFunction and Checkpointed.

The state is stored in the field:

  private var state_item: ValueState[Option[Pathsection]] = null


  override def open(conf: Configuration):Unit = {
    log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
    state_item = getRuntimeContext.getState(
      new ValueStateDescriptor(
        "State of Pathsection",
        (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
        None))
  }


  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
    log.debug("Snapshot State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp)
    removeOldEntries(checkpointTimestamp)
    state_item.value()
  }

  override def restoreState(state: Option[Pathsection]): Unit = {
    if (state == null) {
      log.debug("Restore Snapshot: Null")
    }
    else if (state == None) {
      log.debug("Restore Snapshot: None")
    }
    else if (state_item == null) {
      log.debug("State Item not initialized")
    }
    else {
      state_item.update(state)
    }
  }

But when I run this computation and force the program to fail, I get the following error during recovery:


java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Failed to restore state to function: No key available.
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
	... 3 more
Caused by: java.lang.RuntimeException: No key available.
	at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
	at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)

What am I missing?

Thanks 
Simon

