flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: data loss after implementing checkpoint
Date Mon, 31 Jul 2017 15:27:17 GMT
Hi Sridhar,
sorry for not getting back to you earlier and tbh, I'm no expert in this field 
either.

I don't see this enabling/disabling of rules in the CEP library overview at 
[1]. How do you do this?

You'll probably have to create a stateful operator [2] to store this state in 
Flink. Perhaps Kostas (cc'd) can shed some more light on this topic or has 
another workaround.
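
To make the stateful-operator idea a bit more concrete, here is a rough, 
framework-free sketch of the state such an operator would need to keep. It 
assumes rules are identified by string IDs; the class name `EnabledRules` and 
its methods are made up for illustration and are not part of Flink or the CEP 
library. The `snapshotState`/`restoreState` pair only mirrors the shape of 
Flink's `ListCheckpointed` interface; hooking it into an actual operator is 
not shown.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical holder for the set of enabled rules; not a Flink or CEP class. */
class EnabledRules implements Serializable {
    private static final long serialVersionUID = 1L;

    // IDs of the rules that are currently switched on.
    private final Set<String> enabled = new HashSet<>();

    /** Apply an enable/disable command, e.g. one that originated from the REST layer. */
    public void setEnabled(String ruleId, boolean on) {
        if (on) {
            enabled.add(ruleId);
        } else {
            enabled.remove(ruleId);
        }
    }

    public boolean isEnabled(String ruleId) {
        return enabled.contains(ruleId);
    }

    /** Same shape as ListCheckpointed#snapshotState: return a copy of the state to persist. */
    public List<String> snapshotState(long checkpointId, long timestamp) {
        return new ArrayList<>(enabled);
    }

    /** Same shape as ListCheckpointed#restoreState: rebuild the set after a recovery. */
    public void restoreState(List<String> state) {
        enabled.clear();
        enabled.addAll(state);
    }
}
```

In a real job the enable/disable commands would presumably have to arrive as 
a stream of their own so that every parallel instance of the operator sees 
them; the sketch does not cover that part.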


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
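
For the externalized checkpoints mentioned in the quoted reply below, a 
minimal sketch of how this could be enabled in 1.3, reusing the interval and 
pause values from the original question. The class and method names 
(`RetainedCheckpointSetup`, `configure`) are placeholders, and the sketch 
assumes `state.checkpoints.dir` is set in flink-conf.yaml, which 1.3 needs for 
the checkpoint metadata.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Placeholder class name; only the CheckpointConfig calls matter here. */
public class RetainedCheckpointSetup {

    /** Enables exactly-once checkpointing and keeps the latest checkpoint on cancellation. */
    public static void configure(StreamExecutionEnvironment env) {
        // Checkpoint every 5 seconds (interval is in milliseconds).
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        config.setMinPauseBetweenCheckpoints(500);

        // Without this, checkpoints are deleted when the job is cancelled.
        // Assumption: state.checkpoints.dir is configured in flink-conf.yaml.
        config.enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```

A retained checkpoint is not picked up automatically when the job is 
submitted again; the job has to be started from it explicitly, e.g. with 
`bin/flink run -s <checkpointMetaDataPath> ...`.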

On Wednesday, 19 July 2017 06:38:17 CEST Sridhar Chellappa wrote:
> A follow-up question on this. I have a complex event processor implemented
> using the CEP library (1.3.0). It runs a variety of rules that are
> configured (enable/disable rule) via REST APIs.
> 
> Now, if my application crashes and recovers (or is cancelled and
> restarted), will my configuration (as to which rules are enabled) still
> hold? Or do I have to persist the info into a backend?
> 
> On Mon, Jul 10, 2017 at 7:36 PM, Nico Kruber <nico@data-artisans.com> wrote:
> > Hi Aftab,
> > looks like what you want is either an externalized checkpoint with
> > RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].
> > 
> > Ordinary checkpoints are deleted when the job is cancelled and only serve
> > as a fault tolerance layer in case something goes wrong, e.g. machines
> > fail, so that the job can be restarted automatically based on the restart
> > policy.
> > 
> > 
> > Nico
> > 
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> > 
> > On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:
> > > Hi,
> > > > I am new to Flink. I am facing an issue implementing checkpointing.
> > > 
> > > checkpoint related code:
> > > 
> > > > long checkpointInterval = 5000; // milliseconds
> > > > 
> > > > StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> > > > // specify backend
> > > > // env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
> > > > env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> > > > 
> > > > // enable checkpoint
> > > > env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
> > > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> > > 
> > > 
> > > > When I run the code, I can see flink-state being written on my local
> > > > machine. But when I stop the job, wait for a few minutes and restart
> > > > it, it does not pick up from where it left off; it starts from when I
> > > > started the job.
> > > > 
> > > > Could you point out what I am doing wrong? I am testing it locally from
> > > > IntelliJ IDEA. Below is what I see from localhost. Any help would be
> > > > appreciated. Thanks
> > > [image: Inline images 1]
> > > Br,

