flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Periodically evicting internal states when using mapWithState()
Date Tue, 07 Jun 2016 09:05:08 GMT
Hi Jack,
right now this is not possible except by writing a custom operator. We
are working on support for a time-to-live setting on state, which should
solve your problem.

For writing a custom operator, check out DataStream.transform() and
StreamMap, which is the operator implementation for Map. Please let me know
if you have any further questions.
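
To make the eviction part a bit more concrete, here is a rough sketch of the
bookkeeping a custom operator would need: remember when each session was last
updated and drop the ones that have been idle longer than some time-to-live.
This is plain Scala and deliberately uses no Flink APIs. SessionStore,
lastSeen and ttlMillis are names made up for illustration, and Session here
is an immutable variant of the class in your mail with an extra timestamp.

```scala
// Plain-Scala sketch of TTL-based eviction; no Flink APIs are used here.
case class Event(sessionId: Int, data: String)
case class Session(id: Int, events: List[Event], lastSeen: Long)

// Hypothetical helper: holds sessions and evicts those idle longer than ttlMillis.
class SessionStore(ttlMillis: Long) {
  private var sessions = Map.empty[Int, Session]

  // Append the event to its session and record when the session was last touched.
  def add(event: Event, now: Long): Session = {
    val old = sessions.getOrElse(event.sessionId, Session(event.sessionId, Nil, now))
    val updated = old.copy(events = old.events :+ event, lastSeen = now)
    sessions += (event.sessionId -> updated)
    updated
  }

  // Drop every session not updated within the TTL and return what was evicted.
  def evict(now: Long): List[Session] = {
    val (expired, live) = sessions.partition { case (_, s) => now - s.lastSeen > ttlMillis }
    sessions = live
    expired.values.toList
  }

  def size: Int = sessions.size
}
```

Inside an operator you would call something like add() for each incoming
element and evict() periodically. How the timestamps are obtained and how the
map is checkpointed is left open here.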


On Tue, 7 Jun 2016 at 03:05 Jack Huang <jackhuang@mz.com> wrote:

> Hi all,
> I have an incoming stream of event objects, each with its session ID. I am
> writing a task that aggregates the events by session. The general logic
> looks like
> case class Event(sessionId: Int, data: String)
> case class Session(id: Int, var events: List[Event])
>
> val events = ... // some source
> events
>     .keyBy((event: Event) => event.sessionId)
>     .mapWithState((event: Event, state: Option[Session]) => {
>         val session = state.getOrElse(Session(id = event.sessionId, events = List()))
>         session.events = session.events :+ event
>         (session, Some(session))
>     })
> The problem is that there is no reliable way of knowing the end of a
> session, since events are likely to get lost. If I keep this process
> running, the number of stored sessions will keep growing until it fills up
> the disk.
> Is there a recommended way of periodically evicting sessions that are too
> old (e.g. a day old)?
> Thanks,
> Jack
