samza-dev mailing list archives

From Thomas Bernhardt <bernhardt...@yahoo.com.INVALID>
Subject Re: More control over checkpoints
Date Fri, 06 Feb 2015 17:07:20 GMT
Thank you, that helps with respect to checkpointing/writing. I'm not clear yet how recovery or
restart would work. It would need to read the offsets from the external state and provide
these offsets to the Kafka readers. Is there a way for an InitableTask to return or set the offsets?

Thank you again,
Best regards,
Tom

From: Chris Riccomini <criccomini@apache.org>
To: Chris Riccomini <criccomini@apache.org>
Cc: dev@samza.apache.org; Thomas Bernhardt <bernhardttom@yahoo.com>
Sent: Friday, February 6, 2015 11:47 AM
Subject: Re: More control over checkpoints

Hey Tom,

> I'm planning to have my external state also keep the topic+partition
> offsets in the same place that holds the aggregation state.

Also, for this, you'll have to keep track of the SSP->offset mapping in
your task. This can be done with a Map<SystemStreamPartition, String>.
Every time an IncomingMessageEnvelope comes in, you can update this map
with envelope.getSystemStreamPartition() -> envelope.getOffset(). When you
go to checkpoint your external state, you'll have the latest offset for
each SSP that you've processed, and can attach it to your external state.
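A minimal Java sketch of that bookkeeping, under stated assumptions: it needs Java 16+ for records, and to stay self-contained it uses simplified stand-ins for Samza's SystemStreamPartition and IncomingMessageEnvelope (the real classes live in org.apache.samza.system). OffsetTracker and its record()/snapshot() methods are hypothetical helpers, not part of Samza.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.samza.system.SystemStreamPartition
// (assumption: only the fields this sketch needs). Records supply equals/hashCode,
// so instances work as map keys.
record SystemStreamPartition(String system, String stream, int partition) {}

// Simplified stand-in for org.apache.samza.system.IncomingMessageEnvelope.
record IncomingMessageEnvelope(SystemStreamPartition ssp, String offset, Object message) {
    SystemStreamPartition getSystemStreamPartition() { return ssp; }
    String getOffset() { return offset; }
}

// Hypothetical helper: remembers the latest processed offset per input partition.
class OffsetTracker {
    private final Map<SystemStreamPartition, String> lastOffsets = new HashMap<>();

    // Call from process() after the message has been folded into the in-memory aggregate.
    void record(IncomingMessageEnvelope envelope) {
        lastOffsets.put(envelope.getSystemStreamPartition(), envelope.getOffset());
    }

    // Defensive copy to store alongside the aggregation state at checkpoint time.
    Map<SystemStreamPartition, String> snapshot() {
        return new HashMap<>(lastOffsets);
    }
}
```

Because later messages overwrite earlier entries for the same partition, the snapshot always holds exactly one (latest) offset per SSP.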

Cheers,
Chris



On Fri, Feb 6, 2015 at 8:45 AM, Chris Riccomini <criccomini@apache.org>
wrote:

> Hey Tom,
>
> > Can my StreamTask receive a call when a checkpoint takes place so that
> > it can write its state?
>
> The pattern for this is to take control of the checkpointing in your task,
> rather than using task.commit.ms. If you:
>
> 1. Set task.commit.ms=-1
> 2. Set task.window.ms=60000
> 3. Implement WindowableTask
>
> Your task will have a window() method that receives a TaskCoordinator. You
> can use the TaskCoordinator to call commit(). In this way, you have full
> control over when your task commits, and can manage the committing of your
> state in conjunction with that. For example, you could do:
>
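> public void window(MessageCollector collector, TaskCoordinator coordinator) {
>   state.commit();       // first, flush your external state
>   coordinator.commit(); // then checkpoint the input offsets
> }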
>
> This guarantees that you commit all external state before committing
> your checkpoints, so you never lose data.
>
> Cheers,
> Chris
>
> On Fri, Feb 6, 2015 at 7:11 AM, Thomas Bernhardt <
> bernhardttom@yahoo.com.invalid> wrote:
>
>> I would like to aggregate in memory and, when a checkpoint takes place,
>> write the aggregation state somewhere. I therefore have a few questions:
>> Can my StreamTask receive a call when a checkpoint takes place so that it
>> can write its state? Or can the CheckpointManager receive the task for
>> which to checkpoint, so that the CheckpointManager can write the
>> StreamTask state? I'm planning to have my external state also keep the
>> topic+partition offsets in the same place that holds the aggregation state.
>>
>> Thank you,
>> Best regards,
>> Tom
>>
>>
>>
>
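The window()-driven commit ordering described in this thread can be sketched in Java as below. Assumptions: MessageCollector, TaskCoordinator and WindowableTask are reduced stand-ins for the org.apache.samza.task interfaces so the sketch compiles without Samza on the classpath, and TaskCoordinator.commit() is shown with no arguments as in the example above (check the exact signature for your Samza version). ExternalStore, AggregatingTask and apply() are hypothetical names, and partitions are keyed by plain strings for brevity where a real task would key by SystemStreamPartition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Reduced stand-ins for the Samza task interfaces (assumption: only what this sketch needs).
interface MessageCollector {}
interface TaskCoordinator { void commit(); }
interface WindowableTask {
    void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
}

// Hypothetical external store: durably saves the aggregate and the offsets together.
interface ExternalStore {
    void commit(Map<String, Long> aggregate, Map<String, String> offsets) throws Exception;
}

class AggregatingTask implements WindowableTask {
    private final Map<String, Long> counts = new HashMap<>();        // in-memory aggregate
    private final Map<String, String> lastOffsets = new HashMap<>(); // partition -> last offset
    private final ExternalStore store;

    AggregatingTask(ExternalStore store) { this.store = store; }

    // Would be called from process(): fold one message into the aggregate
    // and remember how far this partition has been consumed.
    void apply(String partition, String offset, String key) {
        counts.merge(key, 1L, Long::sum);
        lastOffsets.put(partition, offset);
    }

    @Override
    public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
        // 1. Persist the aggregate and offsets first. If this throws, commit()
        //    below is never reached, so no checkpoint is written.
        store.commit(new HashMap<>(counts), new HashMap<>(lastOffsets));
        // 2. Only then ask Samza to checkpoint the input offsets.
        coordinator.commit();
    }
}
```

If the store write succeeds but the container dies before the checkpoint is written, messages after the previous checkpoint are replayed on restart, so the aggregation should tolerate reprocessing (for example by using the offsets saved in the store to skip messages it has already applied).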
