samza-dev mailing list archives

From Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com>
Subject Re: Questions about checkpointing and Kinesis
Date Sun, 12 Feb 2017 22:58:06 GMT
That sounds great, Chad! Please let me know how you progress.
I will try to update the references to the latest Samza, try out the code I
have, and then report back.


Best,


Renato M.

2017-02-11 1:35 GMT+01:00 Chad Greenberg <evil_goodness@hotmail.com>:

> Thanks for the link Renato. I was able to incorporate some changes thanks
> to Jagadish and Boris, but I will test out your code. I will update the
> dependencies and take it from there.
>
>
> ________________________________
> From: Renato Marroquín Mogrovejo <renatoj.marroquin@gmail.com>
> Sent: Wednesday, February 8, 2017 5:10:12 PM
> To: dev@samza.apache.org
> Subject: Re: Questions about checkpointing and Kinesis
>
> Hi all,
>
> First of all I'd like to say sorry I have gone missing for such a long time
> but student life hasn't been easy last year and I have neglected the work
> done on this topic.
> Anyway, [1] has the work done during GSoC (I did share it with people,
> but not publicly).
> @Chad I'd be more than happy (and thankful!) to work with you to bring it
> up to date and finally get it into Samza trunk, would you be up for that?
> :)
>
>
> Best,
>
> Renato M.
>
> [1] https://github.com/renato2099/SamzaKinesis
>
> 2017-02-08 2:26 GMT+01:00 Boris S <boryas@gmail.com>:
>
> > If you are implementing your own consumer, on start Samza will call the
> > register method of the consumer and pass it the offsets.
> > If you are using CheckpointListener (like Jagadish mentioned), you will get
> > a callback on each checkpoint (so you can ignore the register call);
> > otherwise you can use the checkpoint passed to you in the register call.
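The two paths described above (an offset handed over at registration, and a callback on each checkpoint) can be sketched in isolation. This is only a minimal illustration, not Samza code: `OffsetTrackingSketch` and its methods are hypothetical stand-ins, and partitions are plain strings where Samza would use `SystemStreamPartition` objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a consumer's offset bookkeeping (not a Samza class).
class OffsetTrackingSketch {
    // Offsets handed to the consumer at start-up, one per partition.
    private final Map<String, String> startingOffsets = new HashMap<>();
    // Offsets reported by the most recent checkpoint callback.
    private final Map<String, String> lastCheckpointed = new HashMap<>();

    // Mirrors the shape of a register(partition, offset) call made on start.
    void register(String partition, String offset) {
        startingOffsets.put(partition, offset);
    }

    // Mirrors the shape of a callback fired on each checkpoint.
    void onCheckpoint(Map<String, String> offsets) {
        lastCheckpointed.putAll(offsets);
    }

    // Resume from the newest checkpoint if one was seen, else from the
    // offset supplied at registration (null if the partition is unknown).
    String resumeOffset(String partition) {
        String checkpointed = lastCheckpointed.get(partition);
        return checkpointed != null ? checkpointed : startingOffsets.get(partition);
    }
}
```

In a real consumer the resume offset would be what gets turned into a position in the source stream; here it is only returned so the two paths can be compared.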
> >
> > On Tue, Feb 7, 2017 at 9:11 AM, Chad Greenberg <evil_goodness@hotmail.com>
> > wrote:
> >
> > > Thanks Jagadish for the reply. A few comments.
> > >
> > >
> > > I was under the impression that using the KCL would mean recording the
> > > offsets in Kinesis/DynamoDB and therefore not in Samza. I am avoiding the
> > > KCL so that I can save state in Samza.
> > >
> > >
> > > I assumed that the checkpoint values were based on the values of the
> > > put() method, but I did not see any explicit documentation to that effect
> > > (I have been reading a ton, so I could have missed something). What I do
> > > not see however, is how to retrieve those values upon start-up. Is this
> > > the role of the SystemAdmin? Is there any documentation about the use of
> > > SystemAdmin?
> > >
> > > ________________________________
> > > From: Jagadish Venkatraman <jagadish1989@gmail.com>
> > > Sent: Tuesday, February 7, 2017 1:48:00 AM
> > > To: dev@samza.apache.org
> > > Subject: Re: Questions about checkpointing and Kinesis
> > >
> > > Great to hear this development on the kinesis consumer!
> > >
> > > Let me answer some of your questions here.
> > >
> > > *1. "Kinesis does not have a listener/push framework (unless I missed
> > > something)"*
> > >
> > >  Let me point out that Kinesis has both a push-based and a pull-based
> > > model. You can choose to implement either for your use case.
> > >
> > >    - *Pull:* The pull-based model supports obtaining a *ShardIterator* for
> > >    a shard and iterating on it. Please refer to the docs for
> > >    *GetShardIteratorRequest* here
> > >    <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>.
> > >    It's important that you are aware of re-sharding: getNextShardIterator()
> > >    (on the GetRecords result) can return null if there was a merge or a
> > >    split for the shard. (You can trivially handle re-shards by restarting.)
> > >    - *Push:* The push-based model directly uses the KCL (the Kinesis Client
> > >    Library) to subscribe to events. The KCL will handle sharding,
> > >    re-balancing, and checkpointing internally.
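The pull model's control flow (follow each returned iterator until a null one signals that the shard was closed by a re-shard) can be sketched without the AWS SDK. Everything named here is a hypothetical stand-in: `Batch` plays the role of one GetRecords response, and the `fetch` function plays the role of the Kinesis client call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ShardPollSketch {
    // Hypothetical stand-in for one GetRecords response.
    static final class Batch {
        final List<String> records;
        final String nextIterator; // null once the shard has been closed

        Batch(List<String> records, String nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    // Follows iterators from firstIterator until the next iterator is null,
    // collecting every record seen along the way.
    static List<String> drain(String firstIterator, Function<String, Batch> fetch) {
        List<String> collected = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = fetch.apply(iterator);
            collected.addAll(batch.records);
            iterator = batch.nextIterator; // null ends the loop (shard closed)
        }
        return collected;
    }
}
```

An open shard keeps returning a non-null iterator, so a real consumer would pause between empty batches and hand records downstream as they arrive rather than collecting them into one list.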
> > >
> > > Your consumer can implement the *CheckpointListener
> > > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>*
> > > interface to get notified when Samza performs a checkpoint. You can save
> > > the offsets of the shards to Kinesis by invoking the
> > > IRecordProcessorCheckpointer.checkpoint
> > > <https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java>
> > > APIs.
> > >
> > >
> > > *2. "My plan was to use the default KafkaCheckpointManagerFactory on a
> > > timed interval basis"*
> > >
> > > The checkpoint manager merely provides persistence for the checkpoints.
> > > (In that sense, it's actually a checkpoint writer.) You probably don't
> > > want to implement a custom checkpoint manager.
> > >
> > >
> > > *3. "What exactly is being checkpointed? What value can I retrieve to use
> > > as an offset for my Kinesis stream? Or is this something I need to keep
> > > track of in a store? If so, what is the point of checkpointing? Can I use
> > > RocksDb to save the Kinesis offset at every document (efficiently that
> > > is)?"*
> > >
> > > - Samza checkpoints [ssp, offset] pairs for your tasks.
> > > - Kinesis has an implicit notion of sequence numbers for every shard in a
> > > stream. You can use those as offsets.
> > > - You don't want to record offsets in a separate store. If you want Samza
> > > to manage offsets, Samza will use Kafka internally. If you want Kinesis to
> > > manage offsets (via the KCL), Kinesis will use DynamoDB to store its offsets.
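Using sequence numbers as offsets amounts to keeping one [partition, offset] pair per shard. The sketch below is a hypothetical illustration, not Samza or Kinesis code. It assumes sequence numbers are decimal strings that can exceed the range of a long, so it compares them numerically with `BigInteger` rather than as text.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks the highest sequence number seen per shard.
class SequenceOffsetSketch {
    private final Map<String, String> offsets = new HashMap<>();

    // Keeps the numerically larger of the stored and the new sequence number.
    void record(String shardId, String sequenceNumber) {
        offsets.merge(shardId, sequenceNumber, (stored, candidate) ->
                new BigInteger(candidate).compareTo(new BigInteger(stored)) > 0
                        ? candidate
                        : stored);
    }

    // The offset that would be checkpointed for this shard (null if none seen).
    String offsetFor(String shardId) {
        return offsets.get(shardId);
    }
}
```

Numeric comparison matters because a plain string comparison would rank "9" above "10".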
> > >
> > >
> > >
> > >
> > > On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodness@hotmail.com>
> > > wrote:
> > >
> > > > I am starting on an integration project between a Kinesis stream and
> > > > Samza, despite having no background in either, though I am familiar with
> > > > most other messaging/queuing systems.
> > > >
> > > >
> > > > I decided to keep all state management within Samza instead of using
> > > > Kinesis' client library. My plan was to use the default
> > > > KafkaCheckpointManagerFactory on a timed interval basis, but I have a
> > > > few questions.
> > > >
> > > >
> > > > What exactly is being checkpointed? What value can I retrieve to use as
> > > > an offset for my Kinesis stream? Or is this something I need to keep
> > > > track of in a store? If so, what is the point of checkpointing? Can I use
> > > > RocksDb to save the Kinesis offset at every document (efficiently that
> > > > is)?
> > > >
> > > >
> > > > Related to Kinesis rather than Samza: it does not have a listener/push
> > > > framework, so it requires constant polling (unless I missed something).
> > > > First of all, I was going to have one partition for each Kinesis shard.
> > > > But the next question is, should I simply have a while(true) polling
> > > > method inside my consumer (BlockingEnvelopeMap)? That seems inefficient,
> > > > even with a timeout. How can I get new data to instantiate a new
> > > > consumer? My consumer will put each new document to my task.
> > > >
> > > >
> > > > Cheers.
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >
>
