samza-dev mailing list archives

From Gaurav Agarwal <gauravagarw...@gmail.com>
Subject Re: Per task/topic checkpoint?
Date Wed, 01 Nov 2017 16:07:03 GMT
Thanks Jake.

In this application we control checkpointing explicitly, so that we can
accumulate a certain amount of delta in memory before committing it to the
stores and checkpointing. This reduces the commit count and serves some
other business cases, such as deduplication of deltas.
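
A minimal sketch of that accumulate-then-commit pattern, in plain Python
rather than the Samza API. The DeltaBatcher class, the count-based
threshold, and the last-write-wins deduplication are all assumptions made
for illustration; the real job may batch and deduplicate differently.

```python
class DeltaBatcher:
    """Hypothetical helper: buffers deltas in memory and flushes in batches."""

    def __init__(self, max_deltas):
        self.max_deltas = max_deltas
        self.pending = {}   # key -> latest delta; a later delta replaces an earlier one
        self.seen = 0       # deltas received since the last flush
        self.store = {}     # stand-in for the external store
        self.commits = 0    # number of store commits performed

    def add(self, key, delta):
        self.pending[key] = delta
        self.seen += 1
        if self.seen >= self.max_deltas:
            self.flush()

    def flush(self):
        # Write the deduplicated batch to the store first. In a real job the
        # checkpoint would be requested only after this write has succeeded.
        self.store.update(self.pending)
        self.pending.clear()
        self.seen = 0
        self.commits += 1


batcher = DeltaBatcher(max_deltas=4)
for key, delta in [("vm1", {"cpu": 1}), ("vm2", {"cpu": 5}),
                   ("vm1", {"cpu": 3}), ("vm3", {"cpu": 2})]:
    batcher.add(key, delta)
```

Four deltas arrive, but only three keys are written, in a single commit:
the second vm1 delta replaces the first before anything reaches the store.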

The scenario I was trying to fit in is to have two different checkpointing
frequencies for different topics (with different processing-cost
semantics). For metric messages, I wanted to build in-memory batches of 15
minutes of data before writing them out and checkpointing (metric sdms are
cheap to redo, and larger batches result in much better compression, using
Prometheus-style compression techniques). Configuration messages are much
more expensive to process, so I want to commit any changes due to them more
frequently, say every 2 minutes, and checkpoint the associated topic.
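
To make the two frequencies concrete, here is a toy model in plain Python
(not the Samza API). It assumes one task per SSP, the arrangement the
per-SSP grouper suggested earlier in this thread would give, so that each
task can commit on its own schedule. ModelTask and all of its fields are
hypothetical names, and time is passed in explicitly to keep the sketch
deterministic.

```python
from dataclasses import dataclass, field


@dataclass
class ModelTask:
    """Toy stand-in for a task that owns a single SSP."""
    name: str
    flush_interval_s: int
    last_processed: dict = field(default_factory=dict)  # (stream, partition) -> offset
    checkpoint: dict = field(default_factory=dict)      # offsets as of the last commit
    batch: list = field(default_factory=list)           # in-memory deltas
    flushed_batches: list = field(default_factory=list) # stand-in for the store
    last_flush_s: int = 0

    def process(self, stream, partition, offset, payload, now_s):
        self.batch.append(payload)
        self.last_processed[(stream, partition)] = offset
        if now_s - self.last_flush_s >= self.flush_interval_s:
            self.commit(now_s)

    def commit(self, now_s):
        # Write the batch out first, then checkpoint every SSP of this task.
        self.flushed_batches.append(list(self.batch))
        self.batch.clear()
        self.checkpoint = dict(self.last_processed)
        self.last_flush_s = now_s


metrics = ModelTask("metrics-0", flush_interval_s=15 * 60)
config = ModelTask("configuration-0", flush_interval_s=2 * 60)

# Ten minutes of traffic: one message per minute on each topic.
for minute in range(1, 11):
    now = minute * 60
    metrics.process("metrics", 0, minute, f"m{minute}", now)
    config.process("configuration", 0, minute, f"c{minute}", now)
```

After ten minutes the configuration task has committed five times and its
checkpoint sits at offset 10, while the metrics task has not committed yet
and still holds all ten messages in memory. On a restart at that point the
metrics task would replay from its last checkpoint, which is the trade-off
accepted above because metric messages are cheap to redo.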

On Wed, 1 Nov 2017 at 9:11 PM, Jacob Maes <jacob.maes@gmail.com> wrote:

> Hey Gaurav,
>
> Samza automatically keeps track of the offsets your job has successfully
> processed for each SSP. When your task requests a checkpoint, Samza will
> write the offset of the latest successfully-processed message for each SSP
> that task consumes.
>
> So if task0 consumes partition 0 of two topics (configuration, metrics) and
> has successfully processed:
> * offset 3 in the configuration topic, partition 0
> * offset 7 in the metrics topic, partition 0
>
> Then Samza will write a checkpoint that looks something like this:
> task0 -> {
>   "SystemStreamPartition [kafka, configuration, 0]":
>     {"system":"kafka","partition":"0","offset":"3","stream":"configuration"},
>   "SystemStreamPartition [kafka, metrics, 0]":
>     {"system":"kafka","partition":"0","offset":"7","stream":"metrics"}
> }
>
> Where "kafka" was an arbitrarily-chosen system name.
>
> So while you can't explicitly set the checkpoint offset on a per SSP basis,
> the checkpoints are actually recorded that way. And if your job consumes
> multiple topics, the offsets will be granular enough, because they're
> per-SSP.
>
> More details on checkpointing here:
>
> http://samza.apache.org/learn/documentation/0.13/container/checkpointing.html
>
> -Jake
>
>
> On Wed, Nov 1, 2017 at 5:48 AM, Gaurav Agarwal <gauravagarwal4@gmail.com>
> wrote:
>
> > Thanks, I'll check it out.
> >
> > I have a Samza application that is consuming a lot of different types of
> > messages (these messages are related to each other but do not require a
> > join; think of them as different configuration and metric information of
> > virtual machines that modify some central state, such as databases,
> > timeseries stores etc). We have used a single Kafka topic so far, with
> > partitions for parallelism.
> >
> > Now, there is a message type (metrics) for which I want to perform larger
> > "batching" for cost reasons.
> >
> > Hence I was exploring ways in which I can put those messages on a
> > separate Kafka topic but use the same Samza application that we have
> > been using so far, instead of creating a new one. There is some state
> > (caches etc) that is shared between messages, and hence it would be
> > wasteful to launch an independent application.
> >
> > If I could control the checkpointing per topic independently, this
> > approach could work.
> >
> > Please let me know if this sounds like a reasonable approach.
> >
> > On Sat, Oct 28, 2017 at 8:41 PM, Jagadish Venkatraman <
> > jagadish1989@gmail.com> wrote:
> >
> > > In Samza, the logical unit of processing (and hence, checkpointing) is
> > > a task. Hence, you cannot selectively checkpoint SSPs within a task.
> > >
> > > However, you can configure how you group your SSPs into tasks by
> > > choosing a Grouper. If you want to control checkpointing at the
> > > granularity of an SSP, then you can choose the
> > > org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory.
> > >
> > > Config reference:
> > > https://samza.apache.org/learn/documentation/0.10/jobs/configuration-table.html
> > >
> > > What are you trying to do? Maybe, there's a simpler way to achieve it?
> > >
> > >
> > >
> > > On Sat, Oct 28, 2017 at 4:09 AM, Gaurav Agarwal
> > > <gauravagarwal4@gmail.com> wrote:
> > >
> > >> Hi All,
> > >>
> > >> If I had Samza tasks that were consuming messages from multiple
> > >> topics, how would checkpoint/commit work in that case? On calling
> > >> taskCoordinator.commit(), would the current offset of all topics be
> > >> saved for the caller task (only the partitions assigned to the caller
> > >> task)? Is there a way to control this behavior more granularly, where
> > >> I can request Samza to commit the offset for only a given task/topic
> > >> combination?
> > >>
> > >> --
> > >> thanks,
> > >> gaurav
> > >>
> > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >
>
-- 
-gaurav Sent from my iPhone
