flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From simon peyer <simon.pe...@soom-it.ch>
Subject Re: No key found restore States
Date Thu, 02 Jun 2016 09:40:46 GMT
Hi

In other words, what's the easiest way to clean up states in flink, if this key may never
arrive again?

--Thanks
Simon

> On 02 Jun 2016, at 10:16, simon peyer <simon.peyer@soom-it.ch> wrote:
> 
> Hi Max
> 
> Thanks for your answer.
> We have some states, on some keys, which we would like to delete after a certain time.
> And since there is no option at the moment to put an "expiriece" date on it, I just use
the snapshot function to test and verify if the current key is still in some threshold.
> 
> So I would like to have an option to perodically check the timestamp, and remove old
entries from the state.
> 
> Therefore I used the KeyedStream to key by a id.
> Can I use the internal function to do the snapshoting, but use the snapshot function
to do some cleanup on the states?
> 
> 
> Thanks
> Simon
> 
> 
>> On 01 Jun 2016, at 16:29, Maximilian Michels <mxm@apache.org> wrote:
>> 
>> Hi Simon,
>> 
>> You don't need to write any code to checkpoint the Keyed State. It is
>> done automatically by Flink. Just remove the `snapshoteState(..)` and
>> `restoreState(..)` methods.
>> 
>> Cheers,
>> Max
>> 
>> On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.peyer@soom-it.ch> wrote:
>>> Hi Max
>>> 
>>> I'm using a keyby but would like to store the state.
>>> 
>>> Thus what's the way to go?
>>> 
>>> How do I have to handle the state in option 2).
>>> 
>>> Could you give an example?
>>> 
>>> Thanks
>>> --Simon
>>> 
>>>> On 01 Jun 2016, at 15:55, Maximilian Michels <mxm@apache.org> wrote:
>>>> 
>>>> Hi Simon,
>>>> 
>>>> There are two types of state:
>>>> 
>>>> 
>>>> 1) Keyed State
>>>> 
>>>> The state you access via `getRuntimeContext().getState(..)` is scoped
>>>> by key. If no key is in the scope, the key is null and update
>>>> operations won't work. Use a `keyBy(..)` before your map function to
>>>> partition the state by key. The state is automatically checkpointed by
>>>> Flink.
>>>> 
>>>> 2) Operator State
>>>> 
>>>> This state is kept per parallel instance of the operator. You
>>>> implement the Checkpointed interface and use the `snapshotState(..)`
>>>> and `restoreState(..)` methods to checkpoint the state.
>>>> 
>>>> 
>>>> I think you want to use one of the two. Although it is possible to use
>>>> both, it looks like you're confusing the two in your example.
>>>> 
>>>> Cheers,
>>>> Max
>>>> 
>>>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.peyer@soom-it.ch>
wrote:
>>>>> Hi together
>>>>> 
>>>>> I did implement a little pipeline, which has some statefull computation:
>>>>> 
>>>>> Conntaing a function which extends RichFlatMapFunction and Checkpointed.
>>>>> 
>>>>> The state is stored in the field:
>>>>> 
>>>>> private var state_item: ValueState[Option[Pathsection]] = null
>>>>> 
>>>>> 
>>>>> override def open(conf: Configuration):Unit = {
>>>>>  log.info("Open a new Checkpointed FlatMap function with configuration:
>>>>> {}", conf)
>>>>>  state_item = getRuntimeContext.getState(new ValueStateDescriptor("State
>>>>> of Pathsection",
>>>>> (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>>> None))
>>>>> }
>>>>> 
>>>>> 
>>>>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>>>>> Option[Pathsection] = {
>>>>>  log.debug("Snapshote State with checkpointId: {} at Timestamp {}",
>>>>> checkpointId, checkpointTimestamp)
>>>>>  removeOldEntries(checkpointTimestamp)
>>>>>  state_item.value()
>>>>> }
>>>>> 
>>>>> override def restoreState(state: Option[Pathsection]):Unit = {
>>>>> 
>>>>> 
>>>>> 
>>>>>  if (state == null){
>>>>>    log.debug("Restore Snapshot: Null")
>>>>>  }
>>>>>  else if(state == None){
>>>>>    log.debug("Restore Snapshot: None")
>>>>>  }
>>>>>  else if (state_item == null){
>>>>>    log.debug("State Item not initialized")
>>>>>  }
>>>>>  else{
>>>>>    state_item.update(state)
>>>>>  }
>>>>> }
>>>>> 
>>>>> But when I do run this computation and get the program to fail, I get
the
>>>>> following error:
>>>>> 
>>>>> 
>>>>> java.lang.Exception: Could not restore checkpointed state to operators
and
>>>>> functions
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.Exception: Failed to restore state to function:
No key
>>>>> available.
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>>>> ... 3 more
>>>>> Caused by: java.lang.RuntimeException: No key available.
>>>>> at
>>>>> org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>>>> at ....Function which has the Checkpointed thingy
>>>>> (CheckpointedIncrAddrPositions.scala:68)
>>>>> 
>>>>> What am I missing?
>>>>> 
>>>>> Thanks
>>>>> Simon
>>>>> 
>>> 
> 


Mime
View raw message