samza-dev mailing list archives

From Chris Riccomini <criccom...@apache.org>
Subject Re: More control over checkpoints
Date Fri, 06 Feb 2015 20:28:50 GMT
Hey Thomas,

Ah, I see what you're saying. You want to store your offsets and your state
remotely in an atomic way.

The only way to set offsets right now is to use (or implement) a
CheckpointManager. One hacky solution would be to implement your own
CheckpointManager, and read the offsets from your remote store when
readLastCheckpoint is called.
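One way to picture the "hacky" CheckpointManager approach above is to assume that the task writes its aggregation state and its offsets to the remote store as a single snapshot. On restart, readLastCheckpoint returns the offsets saved with the last snapshot, so the Kafka consumers resume exactly where the saved state left off. This is a self-contained sketch, not Samza's API. Samza's CheckpointManager works with TaskName, Checkpoint and SystemStreamPartition objects, which are simplified here to strings and maps. `Snapshot`, `RemoteSnapshotStore` and `RemoteBackedCheckpointReader` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical snapshot: aggregation state and input offsets saved as one unit,
// so the two cannot disagree after a restart.
final class Snapshot {
    final Map<String, Long> aggregates;   // e.g. key -> running count
    final Map<String, String> offsets;    // "system.stream.partition" -> offset

    Snapshot(Map<String, Long> aggregates, Map<String, String> offsets) {
        this.aggregates = Collections.unmodifiableMap(new HashMap<>(aggregates));
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
    }
}

// Hypothetical remote store, kept in memory here. A real one would need to
// write each snapshot atomically (e.g. one transaction, or one row/document).
final class RemoteSnapshotStore {
    private final Map<String, Snapshot> byTask = new HashMap<>();

    synchronized void put(String taskName, Snapshot snapshot) {
        byTask.put(taskName, snapshot);  // the whole snapshot is replaced in one step
    }

    synchronized Snapshot get(String taskName) {
        return byTask.get(taskName);
    }
}

// Stand-in for a custom CheckpointManager: instead of keeping its own
// checkpoint log, it answers readLastCheckpoint from the remote snapshot.
final class RemoteBackedCheckpointReader {
    private final RemoteSnapshotStore store;

    RemoteBackedCheckpointReader(RemoteSnapshotStore store) {
        this.store = store;
    }

    // Returns the offsets to resume from, or an empty map if the task has
    // never written a snapshot (so it falls back to its default starting point).
    Map<String, String> readLastCheckpoint(String taskName) {
        Snapshot snapshot = store.get(taskName);
        return snapshot == null ? Collections.<String, String>emptyMap() : snapshot.offsets;
    }
}
```

Because state and offsets live in one record, there is no point at which one has been saved without the other. The trade-off is that every commit rewrites the whole snapshot.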

Let me think about this a bit more. Could you elaborate on what specifically
you're trying to do? Part of Samza's goal is to keep state local and
replicate it out through a Kafka changelog. I'm wondering if there might be
an alternative implementation that fits better with Samza's design, but
it's hard to say without knowing more about what you're doing.

Cheers,
Chris

On Fri, Feb 6, 2015 at 9:07 AM, Thomas Bernhardt <
bernhardttom@yahoo.com.invalid> wrote:

> Thank you, that helps with respect to checkpointing/writing. I'm not clear
> yet on how recovery or restart would work. It would need to read the offsets
> from the external state and provide them to the Kafka readers. I don't
> see how an InitableTask can return or set the offsets.
>
> Thank you again,
> Best regards,
> Tom
>
> From: Chris Riccomini <criccomini@apache.org>
> To: Chris Riccomini <criccomini@apache.org>
> Cc: dev@samza.apache.org; Thomas Bernhardt <bernhardttom@yahoo.com>
> Sent: Friday, February 6, 2015 11:47 AM
> Subject: Re: More control over checkpoints
>
> Hey Tom,
>
> > I'm planning to have my external state also keep the topic+partition
> > offsets in the same place that holds the aggregation state.
>
> Also, for this, you'll have to keep track of the SSP->offset mapping in
> your task. This can be done with a Map<SystemStreamPartition, String>.
> Every time an IncomingMessageEnvelope comes in, update this map with the
> envelope's getSystemStreamPartition() and getOffset(). When you go to
> checkpoint your external state, you'll have the latest offset for each SSP
> you've processed, and can attach that to your external state.
>
> Cheers,
> Chris
>
>
>
> On Fri, Feb 6, 2015 at 8:45 AM, Chris Riccomini <criccomini@apache.org>
> wrote:
>
> > Hey Tom,
> >
> > > Can my StreamTask receive a call when a checkpoint takes place so that
> > > it can write its state?
> >
> > The pattern for this is to take control of the checkpointing in your task,
> > rather than using task.commit.ms. If you:
> >
> > # Set task.commit.ms=-1
> > # Set task.window.ms=60000
> > # Implement WindowableTask
> >
> > Your task will then have a window() method, called every task.window.ms,
> > that receives a TaskCoordinator. You can use the TaskCoordinator to call
> > commit(). This gives you full control over when your task commits, so you
> > can manage the committing of your own state in conjunction with it. For
> > example, you could do:
> >
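> > public void window(MessageCollector collector, TaskCoordinator coordinator) {
> >   state.commit();        // first, flush the external aggregation state
> >   coordinator.commit();  // then checkpoint the input offsets
> > }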
> >
> > This guarantees that all external state is committed before your
> > checkpoints are, so after a failure some messages may be reprocessed, but
> > none are lost.
> >
> > Cheers,
> > Chris
> >
> > On Fri, Feb 6, 2015 at 7:11 AM, Thomas Bernhardt <
> > bernhardttom@yahoo.com.invalid> wrote:
> >
> >> I would like to aggregate in memory and, when a checkpoint takes place,
> >> write the aggregation state someplace. I therefore have a few questions:
> >> Can my StreamTask receive a call when a checkpoint takes place so that it
> >> can write its state? Or can the CheckpointManager receive the task for
> >> which to checkpoint, so that the CheckpointManager can write the
> >> StreamTask state? I'm planning to have my external state also keep the
> >> topic+partition offsets in the same place that holds the aggregation
> >> state.
> >>
> >> Thank you,
> >> Best regards,
> >> Tom
> >>
> >>
> >>
> >
>
>
>
>
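Putting together the two suggestions from the quoted replies (track the latest offset per SystemStreamPartition in a map, and commit external state before calling coordinator.commit() from window()), a task could look roughly like this. It is plain Java so it runs without Samza on the classpath. `Coordinator` stands in for Samza's TaskCoordinator, partitions are plain strings rather than SystemStreamPartition objects, and `AggregatingTask` and the list standing in for the external store are hypothetical. It assumes task.commit.ms=-1 and task.window.ms set as described, so window() is the only place commits happen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Samza's TaskCoordinator; only the commit() call matters here.
interface Coordinator {
    void commit();
}

// Hypothetical task: aggregate in memory, track the latest offset per input
// partition, and flush both from window() before checkpointing.
final class AggregatingTask {
    // Running aggregates, e.g. a count per key.
    private final Map<String, Long> counts = new HashMap<>();
    // Latest processed offset per "system.stream.partition" (a real task
    // would key this map on SystemStreamPartition).
    private final Map<String, String> lastOffsets = new HashMap<>();
    // Records each external write in order (stands in for the remote store).
    final List<String> externalWrites = new ArrayList<>();

    // Called once per message, like StreamTask.process(); ssp and offset would
    // come from envelope.getSystemStreamPartition() and envelope.getOffset().
    void process(String ssp, String offset, String key) {
        counts.merge(key, 1L, Long::sum);
        lastOffsets.put(ssp, offset);  // overwrite: keep only the newest offset
    }

    // Called every task.window.ms, like WindowableTask.window().
    void window(Coordinator coordinator) {
        // 1. Write aggregates and offsets together to the external store first.
        externalWrites.add("state " + counts + " offsets " + lastOffsets);
        // 2. Then let Samza checkpoint. If the container dies between the two
        //    steps, it restarts from the older checkpoint and reprocesses some
        //    messages, but no checkpointed message is missing from the state.
        coordinator.commit();
    }

    Map<String, String> lastOffsets() {
        return new HashMap<>(lastOffsets);
    }
}
```

Note that reprocessing after a failure can double-count messages in the in-memory aggregates unless, on restart, the task resumes from the offsets stored alongside its external state.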
