kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From João Peixoto <joao.harti...@gmail.com>
Subject Re: Can state stores function as a caching layer for persistent storage
Date Tue, 16 May 2017 23:03:24 GMT
Thanks Matthias. My doubt is on a more fundamental level, but I'll ask on a
separate thread as per Eno's recommendation.

On Tue, May 16, 2017 at 3:26 PM Matthias J. Sax <matthias@confluent.io>
wrote:

> Just to add to this:
>
> Streams create re-partitioning topics "lazy", meaning when the key is
> (potentially) changes, we only set a flag but don't add the
> re-partitioning topic. On groupByKey() or join() we check is this
> "re-partitioning required flag" is set in add the topic.
>
> Thus, if you have a
>
> stream.map().map().selectKey().groupBy()
>
> we only re-partition on groupBy, but not after the first two map() steps.
>
>
> -Matthias
>
> On 5/16/17 8:18 AM, Eno Thereska wrote:
> > (it's preferred to create another email thread for a different topic to
> make it easier to look back)
> >
> > Yes, there could be room for optimizations, e.g., see this:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3cCAJikTEUHR=r0ika6vLF_y+QaJXg8f_Q19og_-s+Q-gozPqBgEw@mail.gmail.com%3e
> <
> http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vLF_y+QaJXg8f_Q19og_-s+Q-gozPqBgEw@mail.gmail.com%3E
> >
> >
> > Eno
> >
> >> On 16 May 2017, at 16:01, João Peixoto <joao.hartimer@gmail.com> wrote:
> >>
> >> Follow up doubt (let me know if a new question should be created).
> >>
> >> Do we always need a repartitioning topic?
> >>
> >> If I'm reading things correctly when we change the key of a record we
> need
> >> make sure the new key falls on the same partition that we are
> processing.
> >> This makes a lot of sense if after such change we'd need to join on some
> >> other stream/table or cases where we sink to a topic.
> >> However, in cases where none of these things, the repartition topic does
> >> nothing? If this is true can we somehow not create it?
> >>
> >> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.hartimer@gmail.com>
> >> wrote:
> >>
> >>> Very useful links, thank you.
> >>>
> >>> Part of my original misunderstanding was that the at-least-once
> guarantee
> >>> was considered fulfilled if the record reached a sink node.
> >>>
> >>> Thanks for all the feedback, you may consider my question answered.
> >>> Feel free to ask further questions about the use case if found
> interesting.
> >>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matthias@confluent.io
> >
> >>> wrote:
> >>>
> >>>> Yes.
> >>>>
> >>>> It is basically "documented", as Streams guarantees at-least-once
> >>>> semantics. Thus, we make sure, you will not loose any data in case of
> >>>> failure. (ie, the overall guarantee is documented)
> >>>>
> >>>> To achieve this, we always flush before we commit offsets. (This is
> not
> >>>> explicitly documented as it's an implementation detail.)
> >>>>
> >>>> There is some doc's in the wiki:
> >>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
> >>>>
> >>>> This might also help in case you want to dig into the code:
> >>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/14/17 4:07 PM, João Peixoto wrote:
> >>>>> I think I now understand what Matthias meant when he said "If you
> use a
> >>>>> global remote store, you would not need to back your changes in
a
> >>>> changelog
> >>>>> topic, as the store would not be lost if case of failure".
> >>>>>
> >>>>> I had the misconception that if a state store threw an exception
> during
> >>>>> "flush", all messages received between now and the previous flush
> would
> >>>> be
> >>>>> "lost", hence the need for a changelog topic. However, it seems
that
> the
> >>>>> "repartition" topic actually solves this problem.
> >>>>>
> >>>>> There's very little information about the latter, at least that
I
> could
> >>>>> find, but an entry seems to be added whenever a record enters the
> >>>>> "aggregate", but the state store "consumer" of this topic only
> updates
> >>>> its
> >>>>> offset after the flush completes, meaning that the repartition topic
> >>>> will
> >>>>> be replayed! It seems this problem is already solved for me, I'd
> >>>> appreciate
> >>>>> if someone could point me to the documentation or code that backs
up
> the
> >>>>> above.
> >>>>>
> >>>>>
> >>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <
> joao.hartimer@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Replies in line as well
> >>>>>>
> >>>>>>
> >>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <
> eno.thereska@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi João,
> >>>>>>>
> >>>>>>> Some answers inline:
> >>>>>>>
> >>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.hartimer@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the comments, here are some clarifications:
> >>>>>>>>
> >>>>>>>> I did look at interactive queries, if I understood them
correctly
> it
> >>>>>>> means
> >>>>>>>> that my state store must hold all the results in order
for it to
> be
> >>>>>>>> queried, either in memory or through disk (RocksDB).
> >>>>>>>
> >>>>>>> Yes, that's correct.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 1. The retention policy on my aggregate operations,
in my case,
> is 90
> >>>>>>> days,
> >>>>>>>> which is way too much data to hold in memory
> >>>>>>>
> >>>>>>> It will depend on how much data/memory you have, but perhaps
it
> could
> >>>> be
> >>>>>>> too much to hold in memory
> >>>>>>> for that long (especially because some failure is bound
to happen
> in
> >>>> 90
> >>>>>>> days)
> >>>>>>>
> >>>>>>>> 2. My stream instances do no have access to disk, even
if they
> did,
> >>>>>>>> wouldn't it mean I'd need almost twice the disk space
to hold the
> >>>> same
> >>>>>>>> data? I.e. kafka brokers golding the topics + RocksDB
holding the
> >>>> state?
> >>>>>>>
> >>>>>>> That's interesting, is there a reason why the streams instances
> don't
> >>>>>>> have access to a local file system? I'm curious
> >>>>>>> what kind of deployment you have.
> >>>>>>>
> >>>>>>
> >>>>>> My instances are deployed on Kubernetes
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> It is true that twice the disk space is needed to hold the
data on
> >>>>>>> RocksDb as well as in the Kafka changelog topic,
> >>>>>>> however that is no different that the current situation
where the
> >>>> data is
> >>>>>>> stored to a remote database right?
> >>>>>>> I understand your point that you might not have access to
local
> disk
> >>>>>>> though.
> >>>>>>>
> >>>>>>
> >>>>>> Not quite. In my scenario the state store would be backed by
the
> >>>> changelog
> >>>>>> AND MongoDB. On bootstrap we would fetch the state stored in
the
> >>>> changelog
> >>>>>> into a structure like a LoadingCache (from Guava's), which would
> load
> >>>>>> additional fields from MongoDB if needed. Therefore the changelog
> could
> >>>>>> hold say 12 hours of records and 90 days is stored in MongoDB
only.
> >>>> That's
> >>>>>> exactly the whole strategy I'm trying to validate. The implications
> >>>> would
> >>>>>> be that in case of failure we'd need to recover within 12 hours
or
> else
> >>>>>> we'd need to replay from the source topics.
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> 3. Because a crash may happen between an entry is added
to the
> >>>> changelog
> >>>>>>>> and the data store is flushed, I need to get all the
changes
> >>>> everytime
> >>>>>>> if I
> >>>>>>>> want to guarantee that all data is eventually persisted.
This is
> why
> >>>>>>>> checkpoint files may not work for me.
> >>>>>>>
> >>>>>>> The upcoming exactly-once support in 0.11 will help with
these
> kind of
> >>>>>>> guarantees.
> >>>>>>>
> >>>>>>
> >>>>>> Not sure I understand how exactly-once would help in this case.
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Standby tasks looks great, I forgot about those.
> >>>>>>>>
> >>>>>>>> I'm at the design phase so this is all tentative. Answering
> Matthias
> >>>>>>>> questions
> >>>>>>>>
> >>>>>>>> My state stores are local. As mentioned above I do not
have
> access to
> >>>>>>> disk
> >>>>>>>> therefore I need to recover all data from somewhere,
in this case
> I'm
> >>>>>>>> thinking about the changelog.
> >>>>>>>
> >>>>>>> So this is where I get confused a bit, since you mention
that your
> >>>> state
> >>>>>>> stores are "local", i.e., the streams instance
> >>>>>>> does have access to a local file system.
> >>>>>>>
> >>>>>>
> >>>>>> When I said "local" I meant that the state stores are partial
for
> each
> >>>>>> instance, i.e. they only have the partitions the task is responsible
> >>>> for
> >>>>>> (normal behavior) rather than a global store.
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>>> I read about Kafka Connect but have never used it, maybe
that'll
> >>>>>>> simplify
> >>>>>>>> things, but I need to do some studying there.
> >>>>>>>>
> >>>>>>>> The reason why even though my stores are local but still
I want to
> >>>> store
> >>>>>>>> them on a database and not use straight up RocksDB (or
global
> >>>> stores) is
> >>>>>>>> because this would allow me to migrate my current processing
> >>>> pipeline to
> >>>>>>>> Kafka Streams without needing to change the frontend
part of the
> >>>>>>>> application, which fetches data from MongoDB.
> >>>>>>>
> >>>>>>> Makes sense.
> >>>>>>>
> >>>>>>>>
> >>>>>>>> PS When you mention Global State Stores I'm thinking
of
> >>>>>>>>
> >>>>>>>
> >>>>
> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
> >>>>>>> ,
> >>>>>>>> is this correct?
> >>>>>>>
> >>>>>>> No I think Matthias is saying that if you have a remote
server
> >>>> somewhere
> >>>>>>> where you store all your data (like a shared file system).
> >>>>>>> This is not something Kafka would provide.
> >>>>>>>
> >>>>>>> Eno
> >>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <
> >>>> matthias@confluent.io
> >>>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I am not sure about your overall setup. Are your
stores local
> >>>> (similar
> >>>>>>>>> to RocksDB) or are you using one global remote store?
If you use
> a
> >>>>>>>>> global remote store, you would not need to back
your changes in a
> >>>>>>>>> changelog topic, as the store would not be lost
if case of
> failure.
> >>>>>>>>>
> >>>>>>>>> Also (in case that your stores are remote), did
you consider
> using
> >>>>>>> Kafka
> >>>>>>>>> Connect to export your data into an external store
like MySQL or
> >>>>>>> MongoDB
> >>>>>>>>> instead of writing your own custom stores for Streams?
> >>>>>>>>>
> >>>>>>>>> If your stores are local, why do you write custom
stores? I am
> >>>> curious
> >>>>>>>>> to understand why RocksDB does not serve your needs.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About your two comment:
> >>>>>>>>>
> >>>>>>>>> (1) Streams uses RocksDB by default and the default
> implementation
> >>>> is
> >>>>>>>>> using "checkpoint files" in next release. Those
checkpoint files
> >>>> track
> >>>>>>>>> the changelog offsets of the data that got flushed
to disc. This
> >>>> allows
> >>>>>>>>> to reduce the startup time, as only the tail of
the changelog
> needs
> >>>> to
> >>>>>>>>> be read to bring the store up to date. For this,
you would always
> >>>> (1)
> >>>>>>>>> write to the changelog, (2) write to you store.
Each time you
> need
> >>>> to
> >>>>>>>>> flush, you know that all data is in the changelog
already. After
> >>>> each
> >>>>>>>>> flush, you can update the "local offset checkpoint
file".
> >>>>>>>>>
> >>>>>>>>> I guess, if you use local stores you can apply a
similar pattern
> in
> >>>> you
> >>>>>>>>> custom store implementation. (And as mentioned above,
for global
> >>>> remote
> >>>>>>>>> store you would not need the changelog anyway. --
This also
> applies
> >>>> to
> >>>>>>>>> your recovery question from below.)
> >>>>>>>>>
> >>>>>>>>> (2) You can configure standby task (via StreamConfig
> >>>>>>>>> "num.standby.replicas"). This will set up standby
tasks that
> >>>> passively
> >>>>>>>>> replicate your stores to another instance. In error
case, state
> >>>> will be
> >>>>>>>>> migrated to those "hot standbys" reducing recovery
time
> >>>> significantly.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About your question:
> >>>>>>>>>
> >>>>>>>>> (1) Yes.
> >>>>>>>>> (2) Partly parallel (ie, if you run with multiple
threads -- cf.
> >>>>>>>>> StreamsConfig "num.streams.thread"). Each thread,
flushes all
> it's
> >>>>>>>>> stores sequentially.
> >>>>>>>>> (3) Yes. There will be a store for each partition.
(If store is
> >>>> local.)
> >>>>>>>>> (4) Yes. The overall processing loop is sequential
(cf.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
> >>>>>>>>> )
> >>>>>>>>> Also, the next commit point is computed after a
successful
> commit --
> >>>>>>>>> thus, if one commit is delayed, all consecutive
commit points are
> >>>>>>>>> "shifted" by this delay.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
> >>>>>>>>>> On a stream definition I perform an "aggregate"
which is
> configured
> >>>>>>> with
> >>>>>>>>> a
> >>>>>>>>>> state store.
> >>>>>>>>>>
> >>>>>>>>>> *Goal*: Persist the aggregation results into
a database, e.g.
> >>>> MySQL or
> >>>>>>>>>> MongoDB
> >>>>>>>>>>
> >>>>>>>>>> *Working approach*:
> >>>>>>>>>> I have created a custom StateStore backed by
a changelog topic
> like
> >>>>>>> the
> >>>>>>>>>> builtin state stores. Whenever the store gets
flushed I save to
> the
> >>>>>>>>>> database, mark the record as being persisted
and log the change
> in
> >>>> the
> >>>>>>>>>> changelog.
> >>>>>>>>>>
> >>>>>>>>>> If something goes wrong during processing, the
changelog
> guarantees
> >>>>>>> that
> >>>>>>>>> I
> >>>>>>>>>> do not lose data, restores the state and if
some data point was
> not
> >>>>>>>>>> persisted, the next stream instance will persist
it on its flush
> >>>>>>>>> operation.
> >>>>>>>>>>
> >>>>>>>>>> 1. I cannot store too much data in the changelog,
even with
> >>>>>>> compaction,
> >>>>>>>>> if
> >>>>>>>>>> I have too much data, bootstrapping a stream
instance would
> take a
> >>>>>>> long
> >>>>>>>>> time
> >>>>>>>>>> 2. On the other hand, if I take too long to
recover from a
> >>>> failure, I
> >>>>>>> may
> >>>>>>>>>> lose data. So there seems to be a delicate tradeoff
here
> >>>>>>>>>>
> >>>>>>>>>> *Questions*:
> >>>>>>>>>>
> >>>>>>>>>> 1. Is this a reasonable use case?
> >>>>>>>>>> 2. In a scenario where my stream would have
a fanout (multiple
> >>>>>>>>> sub-streams
> >>>>>>>>>> based on the same stream), each branch would
perform different
> >>>>>>>>> "aggregate"
> >>>>>>>>>> operations, each with its own state store. Are
state stores
> >>>> flushed in
> >>>>>>>>>> parallel or sequentially?
> >>>>>>>>>> 3. The above also applies per-partition. As
a stream definition
> is
> >>>>>>>>>> parallelized by partition, will one instance
hold different
> store
> >>>>>>>>> instances
> >>>>>>>>>> for each one?
> >>>>>>>>>> 4. Through synthetic sleeps I simulated slow
flushes, slower
> than
> >>>> the
> >>>>>>>>>> commit interval. The stream seems to be ok with
it and didn't
> >>>> throw, I
> >>>>>>>>>> assume the Kafka consumer does not poll more
records until all
> of
> >>>> the
> >>>>>>>>>> previous poll's are committed, but I couldn't
find
> documentation to
> >>>>>>> back
> >>>>>>>>>> this statement. Is there a timeout for "commit"
operations?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Sample code
> >>>>>>>>>>
> >>>>>>>>>> public class AggregateHolder {
> >>>>>>>>>>
> >>>>>>>>>>   private Long commonKey;
> >>>>>>>>>>   private List<Double> rawValues = new
ArrayList<>();
> >>>>>>>>>>   private boolean persisted;
> >>>>>>>>>>
> >>>>>>>>>> // ...
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> And stream definition
> >>>>>>>>>>
> >>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
> >>>>>>>>>>             .aggregate(
> >>>>>>>>>>                     AggregateHolder::new,
> >>>>>>>>>>                     (aggKey, value, aggregate)
->
> >>>>>>>>>> aggregate.addValue(value.getValue()),
> >>>>>>>>>>                     new DemoStoreSupplier<>(/*
... */)
> >>>>>>>>>>             )
> >>>>>>>>>>             .foreach((key, agg) -> log.debug("Aggregate:
{}={}
> >>>> ({})",
> >>>>>>>>>> key, agg.getAverage(), agg));
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>>
> >
> >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message