samza-dev mailing list archives

From Jacob Maes <jacob.m...@gmail.com>
Subject Re: Per task/topic checkpoint?
Date Wed, 01 Nov 2017 16:28:24 GMT
Ahh, sorry, I misunderstood the problem.

Does the application use InMemoryKeyValueStore to accumulate the deltas? If
so, I would think you could enable the changelog on that store and commit
as often as you like, because having the deltas backed up durably should
allow you to decouple commit() from send().
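
To illustrate the idea, here is a minimal, Samza-free sketch in Python. All
names in it (DurableStore, Task, flush_every) are hypothetical and are not
Samza APIs. It assumes the deltas live in a store whose writes are mirrored to
a durable log, and it uses message counts as a stand-in for the 2-minute and
15-minute windows. Because each delta is durable before the offset is
committed, the task can commit on every message and still send each topic's
batch on its own schedule.

```python
from dataclasses import dataclass, field


@dataclass
class DurableStore:
    """Hypothetical stand-in for a changelog-backed key-value store."""
    data: dict = field(default_factory=dict)       # in-memory view
    changelog: list = field(default_factory=list)  # durable log of writes

    def put(self, key, value):
        self.changelog.append((key, value))
        self.data[key] = value

    def delete(self, key):
        self.changelog.append((key, None))  # None marks a delete
        self.data.pop(key, None)

    @classmethod
    def restore(cls, changelog):
        """Rebuild the in-memory view by replaying the durable log."""
        store = cls(changelog=list(changelog))
        for key, value in changelog:
            if value is None:
                store.data.pop(key, None)
            else:
                store.data[key] = value
        return store


class Task:
    def __init__(self, store, checkpoint, flush_every):
        self.store = store
        self.checkpoint = dict(checkpoint)  # topic -> last committed offset
        self.flush_every = flush_every      # topic -> batch size before send
        self.sent = []                      # batches written downstream

    def process(self, topic, offset, delta):
        self.store.put((topic, offset), delta)  # delta is durable first
        self.checkpoint[topic] = offset         # so committing now is safe
        pending = [k for k in self.store.data if k[0] == topic]
        if len(pending) >= self.flush_every[topic]:
            self.flush(topic)

    def flush(self, topic):
        keys = sorted(k for k in self.store.data if k[0] == topic)
        self.sent.append((topic, [self.store.data[k] for k in keys]))
        for k in keys:
            self.store.delete(k)


task = Task(DurableStore(), {}, {"configuration": 2, "metrics": 5})
task.process("metrics", 0, "m0")
task.process("configuration", 0, "c0")
task.process("metrics", 1, "m1")
task.process("configuration", 1, "c1")  # configuration batch is sent here

# Simulated restart: state comes back from the changelog and the checkpoint.
recovered = Task(DurableStore.restore(task.store.changelog),
                 task.checkpoint, task.flush_every)
```

After the simulated restart, the two unsent metric deltas are still in the
restored store even though the metrics offset was already committed, so
nothing has to be re-read from the input topic. The sketch ignores the case
where a crash falls between sending a batch and deleting it from the store,
which would resend that batch.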

If that still doesn't fit this case, then I'm afraid a separate job may be
necessary.

On Wed, Nov 1, 2017 at 9:07 AM, Gaurav Agarwal <gauravagarwal4@gmail.com>
wrote:

> Thanks Jake.
>
> In this application we control checkpointing explicitly so that we can
> accumulate a certain amount of deltas in memory before committing them to
> the stores and checkpointing. This reduces the commit count and serves some
> other business cases, such as deduplication of deltas.
>
> The scenario I was trying to fit in is to have two different
> checkpointing frequencies for different topics (with different processing
> cost semantics). Say, for metric messages I want to create in-memory
> batches of 15 minutes of data before writing them out and checkpointing
> (the reason being that metric sdms are cheap to redo and larger batches
> result in much better compression, as with Prometheus-style compression
> techniques), whereas configuration messages are much more expensive to
> process, so I want to commit any changes due to these more frequently,
> say every 2 minutes, and checkpoint the associated topic.
>
> On Wed, 1 Nov 2017 at 9:11 PM, Jacob Maes <jacob.maes@gmail.com> wrote:
>
>> Hey Gaurav,
>>
>> Samza automatically keeps track of the offsets your job has successfully
>> processed for each SSP. When your task requests a checkpoint, Samza will
>> write the offset of the latest successfully-processed message for each SSP
>> that task consumes.
>>
>> So if task0 consumes partition 0 of two topics (configuration, metrics)
>> and has successfully processed:
>> * offset 3 in the configuration topic, partition 0
>> * offset 7 in the metrics topic, partition 0
>>
>> Then Samza will write a checkpoint that looks something like this:
>> task0 -> {
>>   "SystemStreamPartition [kafka, configuration, 0]":
>>     {"system":"kafka","partition":"0","offset":"3","stream":"configuration"},
>>   "SystemStreamPartition [kafka, metrics, 0]":
>>     {"system":"kafka","partition":"0","offset":"7","stream":"metrics"}
>> }
>>
>> Where "kafka" was an arbitrarily-chosen system name.
>>
>> So while you can't explicitly set the checkpoint offset on a per-SSP
>> basis, the checkpoints are actually recorded that way. And if your job
>> consumes multiple topics, the offsets will be granular enough, because
>> they're per-SSP.
>>
>> More details on checkpointing here:
>> http://samza.apache.org/learn/documentation/0.13/container/checkpointing.html
>>
>> -Jake
>>
>>
>> On Wed, Nov 1, 2017 at 5:48 AM, Gaurav Agarwal <gauravagarwal4@gmail.com>
>> wrote:
>>
>> > Thanks, I'll check it out.
>> >
>> > I have a Samza application that consumes many different types of
>> > messages. These messages are related to each other but do not require a
>> > join; think of them as different configuration and metric information
>> > for virtual machines, which modify some central states like databases,
>> > time-series stores, etc. We have used a single Kafka topic so far, with
>> > partitions for parallelism.
>> >
>> > Now, there is a message type (metrics) for which I want to perform
>> > larger "batching" for cost reasons.
>> >
>> > Hence I was exploring ways to put those messages on a separate Kafka
>> > topic but keep using the same Samza application that we have been using
>> > so far, instead of creating a new one. Some state (caches etc.) is
>> > shared between messages, so it would be wasteful to launch an
>> > independent application.
>> >
>> > If I could control the checkpointing per topic independently, this
>> > approach could work.
>> >
>> > Please let me know if this sounds like a reasonable approach for this?
>> >
>> > On Sat, Oct 28, 2017 at 8:41 PM, Jagadish Venkatraman
>> > <jagadish1989@gmail.com> wrote:
>> >
>> > > In Samza, the logical unit of processing (and hence, checkpointing)
>> > > is a task. Hence, you cannot selectively checkpoint SSPs within a task.
>> > >
>> > > However, you can configure how you group your SSPs into tasks by
>> > > choosing a Grouper. If you want to control checkpointing at the
>> > > granularity of an SSP, then you can choose the
>> > > org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory.
>> > >
>> > > Config reference:
>> > > https://samza.apache.org/learn/documentation/0.10/jobs/configuration-table.html
>> > >
>> > > What are you trying to do? Maybe, there's a simpler way to achieve it?
>> > >
>> > >
>> > >
>> > > On Sat, Oct 28, 2017 at 4:09 AM, Gaurav Agarwal
>> > > <gauravagarwal4@gmail.com> wrote:
>> > >
>> > >> Hi All,
>> > >>
>> > >> If I had Samza tasks that were consuming messages from multiple
>> > >> topics, how would checkpoint/commit work in that case? On calling
>> > >> taskCoordinator.commit(), would the current offsets of all topics be
>> > >> saved for the caller task (only the partitions assigned to the caller
>> > >> task)? Is there a way to control this behavior more granularly, where
>> > >> I can request Samza to commit the offset for only a given task/topic
>> > >> combination?
>> > >>
>> > >> --
>> > >> thanks,
>> > >> gaurav
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Jagadish V,
>> > > Graduate Student,
>> > > Department of Computer Science,
>> > > Stanford University
>> > >
>> >
>>
> --
> -gaurav Sent from my iPhone
>
