samza-dev mailing list archives

From "Bae, Jae Hyeon" <metac...@gmail.com>
Subject Re: How to synchronize KeyValueStore and Kafka cleanup
Date Sat, 03 Oct 2015 00:24:33 GMT
Hi Chinmay

> Why wouldn't you want to use a changelog ?

Because log compaction won't help here: I want to cache unique ids that,
like timestamps, are never repeated. But for restoration I still have to
use a changelog. Also, my StreamTask should consume that topic to add new
unique ids to the KV store and delete outdated ones.

My concern is what happens if my StreamTask consumes the topic and calls
the put() method as follows:

if (envelope.getSystemStreamPartition().getStream().equals("changelog_topic")) {
    store.put((String) envelope.getKey(), (String) envelope.getMessage());
} else {
    ...
}

The store will then send each message to Kafka for the changelog, which
just creates a duplicate of a record that is already in that topic.
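
To make the concern concrete, here is a minimal in-memory sketch. It is plain
Java, not Samza's API: LoggedStore, ChangelogFeedbackDemo and the lists that
stand in for Kafka topics are all hypothetical names, and the only behavior
assumed is that a store with a changelog appends every put() to that changelog.
It compares one pass over three ids when the input topic is also the changelog
with one pass when the two are separate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a changelog-backed store (not the Samza class).
final class LoggedStore {
    private final Map<String, String> data = new HashMap<>();
    private final List<String> changelog; // stands in for the changelog topic

    LoggedStore(List<String> changelog) {
        this.changelog = changelog;
    }

    void put(String key, String value) {
        data.put(key, value);
        changelog.add(key); // assumption: every write is mirrored to the changelog
    }
}

final class ChangelogFeedbackDemo {
    // Consumes the records present in `input` at the start of the pass, puts each
    // one into a store backed by `changelog`, and returns the changelog length.
    static int recordsInChangelogAfterOnePass(List<String> input, List<String> changelog) {
        LoggedStore store = new LoggedStore(changelog);
        List<String> snapshot = new ArrayList<>(input); // records visible right now
        for (String id : snapshot) {
            store.put(id, "seen");
        }
        return changelog.size();
    }

    public static void main(String[] args) {
        // Case 1: the task consumes the same topic the store uses as its changelog.
        List<String> shared = new ArrayList<>(Arrays.asList("id-1", "id-2", "id-3"));
        System.out.println(recordsInChangelogAfterOnePass(shared, shared)); // prints 6

        // Case 2: ids arrive on topicA and the store logs to a separate topic.
        List<String> topicA = Arrays.asList("id-1", "id-2", "id-3");
        List<String> changelogTopicA = new ArrayList<>();
        System.out.println(recordsInChangelogAfterOnePass(topicA, changelogTopicA)); // prints 3
    }
}
```

In the shared case every consumed id is written back to the topic it came from,
so the topic doubles after one pass; in a real job those new records would
presumably be consumed again as well. With a separate changelog the source
topic is left untouched.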

So, should I create a separate changelog topic that is different from the
source topic? For example, if the unique ids are coming from topicA, do I
need to create something like changelog_topicA?
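
For the "delete outdated ones" part, the manual pruning suggested further down
the thread (run it from window()) could look roughly like the sketch below.
This is only an illustration under stated assumptions: RecentIdCache is a
hypothetical helper that uses an in-memory LinkedHashMap instead of the Samza
KeyValueStore, timestamps are passed in by the caller, and ids are assumed to
arrive with non-decreasing timestamps so the oldest entries come first.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: remembers ids for a fixed retention period.
final class RecentIdCache {
    private final long retentionMs;
    // Insertion-ordered map: id -> time the id was first seen.
    private final LinkedHashMap<String, Long> firstSeen = new LinkedHashMap<>();

    RecentIdCache(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Records the id and returns true if it was not already cached.
    boolean add(String id, long nowMs) {
        return firstSeen.putIfAbsent(id, nowMs) == null;
    }

    // Removes ids whose age is at least the retention period and returns how many
    // were removed. Intended to be called periodically, e.g. from window().
    int prune(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() < retentionMs) {
                break; // entries are in arrival order, so the rest are newer
            }
            it.remove();
            removed++;
        }
        return removed;
    }

    int size() {
        return firstSeen.size();
    }

    public static void main(String[] args) {
        long sixHoursMs = 6L * 60 * 60 * 1000;
        RecentIdCache cache = new RecentIdCache(sixHoursMs);
        cache.add("id-1", 0L);
        cache.add("id-2", 3_600_000L); // one hour later
        System.out.println(cache.prune(sixHoursMs)); // prints 1 (only id-1 has expired)
        System.out.println(cache.size());            // prints 1
    }
}
```

With a real store the same loop would have to iterate over the store's entries
and call delete on the expired keys, and each delete would also be written to
the changelog if one is configured.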

On Fri, Oct 2, 2015 at 5:11 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
wrote:

> > Does KV-store consume automatically from a Kafka topic?
> Yes - if you've configured changelog stream for your store
>
> >  Does it consume only on restore()?
> It consumes only during container initialization (again, assuming if you
> have changelog configured)
>
> > implement the StreamTask job to consume a Kafka topic and call add() method?
> Why wouldn't you want to use a changelog ?
>
>
> On Fri, Oct 2, 2015 at 3:09 PM, Bae, Jae Hyeon <metacret@gmail.com> wrote:
>
> > Thanks Yi Pan, I have one more question.
> >
> > Does KV-store consume automatically from a Kafka topic? Does it consume
> > only on restore()? If so, do I have to implement the StreamTask job to
> > consume a Kafka topic and call add() method?
> >
> > On Fri, Oct 2, 2015 at 2:01 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >
> > > Hi, Jae Hyeon,
> > >
> > > Good to see you back on the mailing list again! Regarding to your
> > > questions, please see the answers below:
> > >
> > > > > > My KeyValueStore usage is a little bit different from usual cases
> > > > > > because I have to cache all unique ids for the past six hours, which
> > > > > > can be configured for the retention usage. Unique ids won't be
> > > > > > repeated such as timestamp. In this case, log.cleanup.policy=compact
> > > > > > will keep growing the KeyValueStore size, right?
> > > >
> > >
> > > It will grow as big as the accumulative size of your unique ids.
> > >
> > >
> > > > >
> > > > > > Can I use Samza KeyValueStore for the topics
> > > > > > with log.cleanup.policy=delete? If not, what's your recommended way
> > > > > > for state management of non-changelog Kafka topic? If it's possible,
> > > > > > how does Kafka cleanup remove outdated records in KeyValueStore?
> > > >
> > >
> > > I am not quite sure about your definition of "non-changelog" Kafka
> > > topics. If you want to retire some of the old records in a KV-store
> > > periodically, you will have to run the pruning manually in the window()
> > > method in the current release. In the upcoming 0.10 release, we have
> > > incorporated RocksDB TTL features in the KV-store, which would
> > > automatically prune the old entries in the RocksDB store automatically.
> > > That said, the upcoming TTL feature is not fully synchronized w/ the Kafka
> > > cleanup yet and is an on-going work in the future. The recommendation is
> > > to use the TTL feature and set the Kafka changelog to be time-retention
> > > based, w/ a retention time longer than the RocksDB TTL to ensure no data
> > > loss.
> > >
> > > Hope the above answered your questions.
> > >
> > > Cheers!
> > >
> > > -Yi
> > >
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>
