kafka-users mailing list archives

From Ben Kirwin <...@kirw.in>
Subject Re: Kafka as an event store for Event Sourcing
Date Tue, 14 Jul 2015 03:25:55 GMT
Ah, just saw this. I actually just submitted a patch this evening --
just for the partition-wide version at the moment, since it turns out
to be pretty simple to implement. Still very interested in moving
forward with this stuff, though I don't always have as much time for it
as I would like...

On Thu, Jul 9, 2015 at 9:39 AM, Daniel Schierbeck
<daniel.schierbeck@gmail.com> wrote:
> Ben, are you still interested in working on this?
>
> On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <
> daniel.schierbeck@gmail.com> wrote:
>
>> I like to refer to it as "conditional write" or "conditional request",
>> semantically similar to HTTP's If-Match header.
>>
>> Ben: I'm adding a comment about per-key checking to your JIRA.
>>
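
For what it's worth, a minimal sketch of what that might look like from a
client's point of view, with If-Match-style retry-on-conflict semantics. The
ConditionalProducer interface, OffsetMismatchException, and StateStore below
are hypothetical stand-ins -- nothing like this exists in the Kafka API today:

    // Hypothetical client-side pattern for a conditional ("If-Match"-style) write.
    public final class ConditionalWriteExample {

        interface ConditionalProducer {
            // Appends the record only if the latest offset (per partition or per key)
            // still equals expectedOffset; otherwise throws OffsetMismatchException.
            long send(String topic, String key, String value, long expectedOffset)
                    throws OffsetMismatchException;
        }

        static class OffsetMismatchException extends Exception {}

        interface StateStore {
            long latestOffset(String key);
            boolean isTaken(String key);
            void refresh();
        }

        // Retry loop analogous to HTTP's If-Match / 412 Precondition Failed handling.
        static void reserveSeat(ConditionalProducer producer, StateStore state, String seat)
                throws Exception {
            while (true) {
                long expected = state.latestOffset(seat);   // version we validated against
                if (state.isTaken(seat)) {
                    throw new IllegalStateException("seat already taken: " + seat);
                }
                try {
                    producer.send("reservations", seat, "reserved", expected);
                    return;                                  // precondition held; write accepted
                } catch (OffsetMismatchException e) {
                    state.refresh();                         // someone else won; re-read and re-validate
                }
            }
        }
    }
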
>> On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <ben@kirw.in> wrote:
>>
>>> Yeah, it's definitely not a standard CAS, but it feels like the right
>>> fit for the commit log abstraction -- CAS on a 'current value' does
>>> seem a bit too key-value-store-ish for Kafka to support natively.
>>>
>>> I tried to avoid referring to the check-offset-before-publish
>>> functionality as a CAS in the ticket because, while they're both types
>>> of 'optimistic concurrency control', they are a bit different -- and
>>> the offset check is both easier to implement and handier for the stuff
>>> I tend to work on. (Though that ticket's about checking the latest
>>> offset on a whole partition, not the key -- there's a different set of
>>> tradeoffs for the latter, and I haven't thought it through properly
>>> yet.)
>>>
>>> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
>>> <ewen@confluent.io> wrote:
>>> > If you do CAS where you compare the offset of the current record for
>>> > the key, then yes. This might work fine for applications that track
>>> > key, value, and offset. It is not quite the same as doing a normal CAS.
>>> >
>>> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
>>> > daniel.schierbeck@gmail.com> wrote:
>>> >
>>> >> But wouldn't the key->offset table be enough to accept or reject a
>>> >> write? I'm not familiar with the exact implementation of Kafka, so I
>>> >> may be wrong.
>>> >>
>>> >> On Sat, Jun 13, 2015 at 21.05 Ewen Cheslack-Postava <ewen@confluent.io>
>>> >> wrote:
>>> >>
>>> >> > Daniel: By random read, I meant not reading the data sequentially as
>>> >> > is the norm in Kafka, not necessarily a random disk seek. That
>>> >> > in-memory data structure is what enables the random read. You're
>>> >> > either going to need the disk seek if the data isn't in the fs cache
>>> >> > or you're trading memory to avoid it. If it's a full index containing
>>> >> > keys and values then you're potentially committing to a much larger
>>> >> > JVM memory footprint (and all the GC issues that come with it) since
>>> >> > you'd be storing that data in the JVM heap. If you're only storing
>>> >> > the keys + offset info, then you potentially introduce random disk
>>> >> > seeks on any CAS operation (and making page caching harder for the
>>> >> > OS, etc.).
>>> >> >
>>> >> >
>>> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
>>> >> > daniel.schierbeck@gmail.com> wrote:
>>> >> >
>>> >> > > Ewen: would single-key CAS necessitate random reads? My idea was to
>>> >> > > have the broker maintain an in-memory table that could be rebuilt
>>> >> > > from the log or a snapshot.
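
Something along those lines -- here sketched outside the broker with the plain
Java consumer, just to show how a key -> last-offset table can be rebuilt by
replaying a partition from the beginning. The topic layout and String
serialization are assumptions for the example:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public final class KeyOffsetTable {

        // Replays one partition from the beginning and records, for each key,
        // the offset of the last message seen with that key.
        static Map<String, Long> rebuild(String bootstrap, String topic, int partition) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            Map<String, Long> keyToOffset = new HashMap<>();
            TopicPartition tp = new TopicPartition(topic, partition);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(List.of(tp));
                consumer.seekToBeginning(List.of(tp));
                long end = consumer.endOffsets(List.of(tp)).get(tp);
                while (consumer.position(tp) < end) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> r : records) {
                        keyToOffset.put(r.key(), r.offset());   // last write wins
                    }
                }
            }
            return keyToOffset;
        }
    }
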
>>> >> > > On Sat, Jun 13, 2015 at 20.26 Ewen Cheslack-Postava <ewen@confluent.io>
>>> >> > > wrote:
>>> >> > >
>>> >> > > > Jay - I think you need broker support if you want CAS to work
>>> >> > > > with compacted topics. With the approach you described you can't
>>> >> > > > turn on compaction since that would make it last-writer-wins, and
>>> >> > > > using any non-infinite retention policy would require some
>>> >> > > > external process to monitor keys that might expire and refresh
>>> >> > > > them by rewriting the data.
>>> >> > > >
>>> >> > > > That said, I think any addition like this warrants a lot of
>>> >> > > > discussion about potential use cases, since there are a lot of
>>> >> > > > ways you could go about adding support for something like this. I
>>> >> > > > think this is an obvious next incremental step, but someone is
>>> >> > > > bound to have a use case that would require multi-key CAS and
>>> >> > > > would be costly to build atop single-key CAS. Or, since the
>>> >> > > > compare requires a random read anyway, why not throw in
>>> >> > > > read-by-key rather than sequential log reads, which would allow
>>> >> > > > for minitransactions a la Sinfonia?
>>> >> > > >
>>> >> > > > I'm not convinced trying to make Kafka support traditional
>>> >> > > > key-value store functionality is a good idea. Compacted topics
>>> >> > > > made it possible to use it a bit more in that way, but didn't
>>> >> > > > change the public interface, only the way storage was
>>> >> > > > implemented, and importantly all the potential additional
>>> >> > > > performance costs & data structures are isolated to background
>>> >> > > > threads.
>>> >> > > >
>>> >> > > > -Ewen
>>> >> > > >
>>> >> > > > On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <
>>> >> > > > daniel.schierbeck@gmail.com> wrote:
>>> >> > > >
>>> >> > > > > @Jay:
>>> >> > > > >
>>> >> > > > > Regarding your first proposal: wouldn't that mean that a
>>> >> > > > > producer wouldn't know whether a write succeeded? In the case
>>> >> > > > > of event sourcing, a failed CAS may require re-validating the
>>> >> > > > > input with the new state. Simply discarding the write would be
>>> >> > > > > wrong.
>>> >> > > > >
>>> >> > > > > As for the second idea: how would a client of the writer
>>> >> > > > > service know which writer is the leader? For example, how would
>>> >> > > > > a load balancer know which web app process to route requests
>>> >> > > > > to? Ideally, all processes would be able to handle requests.
>>> >> > > > >
>>> >> > > > > Using conditional writes would allow any producer to write and
>>> >> > > > > provide synchronous feedback to the producers.
>>> >> > > > > On Fri, Jun 12, 2015 at 18.41 Jay Kreps <jay@confluent.io> wrote:
>>> >> > > > >
>>> >> > > > > > I have been thinking a little about this. I don't think CAS
>>> >> > > > > > actually requires any particular broker support. Rather, the
>>> >> > > > > > two writers just write messages with some deterministic
>>> >> > > > > > check-and-set criteria, and all the replicas read from the log
>>> >> > > > > > and check this criteria before applying the write. This
>>> >> > > > > > mechanism has the downside that it creates additional writes
>>> >> > > > > > when there is a conflict and requires waiting on the full
>>> >> > > > > > roundtrip (write and then read), but it has the advantage that
>>> >> > > > > > it is very flexible as to the criteria you use.
>>> >> > > > > >
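
A rough sketch of how that first variant could look: every write carries the
check it was issued against, and each reader applies the same deterministic
check before folding the write into its state, so all consumers reach the same
accept/reject decision. The Command shape and the per-key expected offset are
assumptions for illustration, not an existing Kafka format:

    import java.util.HashMap;
    import java.util.Map;

    public final class LogSideCas {

        // A write plus the criterion it was issued against: the offset of the
        // last event the writer had seen for that key.
        record Command(String key, String newValue, long expectedLastOffsetForKey) {}

        static final class Replica {
            private final Map<String, Long> lastAppliedOffset = new HashMap<>();
            private final Map<String, String> state = new HashMap<>();

            // Returns true if the command passed its check and was applied.
            boolean apply(Command cmd, long offsetOfThisRecord) {
                long lastSeen = lastAppliedOffset.getOrDefault(cmd.key(), -1L);
                if (lastSeen != cmd.expectedLastOffsetForKey()) {
                    return false;               // conflict: every replica rejects it identically
                }
                state.put(cmd.key(), cmd.newValue());
                lastAppliedOffset.put(cmd.key(), offsetOfThisRecord);
                return true;
            }
        }
    }

Because the check is a pure function of the log, every reader converges on the
same outcome, but a writer only learns whether its write "took" after reading
its own message back -- the extra roundtrip mentioned above.
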
>>> >> > > > > > An alternative strategy for accomplishing the same thing a
>>> >> > > > > > bit more efficiently is to elect leaders amongst the writers
>>> >> > > > > > themselves. This would require broker support for a single
>>> >> > > > > > writer to avoid the possibility of split brain. I like this
>>> >> > > > > > approach better because the leader for a partition can then
>>> >> > > > > > do anything it wants on its local data to make the decision
>>> >> > > > > > of what is committed; however, the downside is that the
>>> >> > > > > > mechanism is more involved.
>>> >> > > > > >
>>> >> > > > > > -Jay
>>> >> > > > > >
>>> >> > > > > > On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <ben@kirw.in> wrote:
>>> >> > > > > >
>>> >> > > > > > > Gwen: Right now I'm just looking for feedback -- but yes,
>>> >> > > > > > > if folks are interested, I do plan to do that
>>> >> > > > > > > implementation work.
>>> >> > > > > > >
>>> >> > > > > > > Daniel: Yes, that's exactly right. I haven't thought much
>>> >> > > > > > > about per-key... it does sound useful, but the
>>> >> > > > > > > implementation seems a bit more involved. Want to add it to
>>> >> > > > > > > the ticket?
>>> >> > > > > > >
>>> >> > > > > > > On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
>>> >> > > > > > > <daniel.schierbeck@gmail.com> wrote:
>>> >> > > > > > > > Ben: your solution seems to focus on partition-wide CAS.
>>> >> > > > > > > > Have you considered per-key CAS? That would make the
>>> >> > > > > > > > feature more useful in my opinion, as you'd greatly
>>> >> > > > > > > > reduce the contention.
>>> >> > > > > > > >
>>> >> > > > > > > > On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira
>>> >> > > > > > > > <gshapira@cloudera.com> wrote:
>>> >> > > > > > > >
>>> >> > > > > > > >> Hi Ben,
>>> >> > > > > > > >>
>>> >> > > > > > > >> Thanks for creating the ticket. Having check-and-set
>>> >> > > > > > > >> capability will be sweet :)
>>> >> > > > > > > >> Are you planning to implement this yourself? Or is it
>>> >> > > > > > > >> just an idea for the community?
>>> >> > > > > > > >>
>>> >> > > > > > > >> Gwen
>>> >> > > > > > > >>
>>> >> > > > > > > >> On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <ben@kirw.in> wrote:
>>> >> > > > > > > >> > As it happens, I submitted a ticket for this feature
>>> >> > > > > > > >> > a couple days ago:
>>> >> > > > > > > >> >
>>> >> > > > > > > >> > https://issues.apache.org/jira/browse/KAFKA-2260
>>> >> > > > > > > >> >
>>> >> > > > > > > >> > Couldn't find any existing proposals for similar
>>> >> > > > > > > >> > things, but it's certainly possible they're out there...
>>> >> > > > > > > >> >
>>> >> > > > > > > >> > On the other hand, I think you can solve your
>>> >> > > > > > > >> > particular issue by reframing the problem: treating
>>> >> > > > > > > >> > the messages as 'requests' or 'commands' instead of
>>> >> > > > > > > >> > statements of fact. In your flight-booking example,
>>> >> > > > > > > >> > the log would correctly reflect that two different
>>> >> > > > > > > >> > people tried to book the same flight; the stream
>>> >> > > > > > > >> > consumer would be responsible for finalizing one
>>> >> > > > > > > >> > booking, and notifying the other client that their
>>> >> > > > > > > >> > request had failed. (In-browser or by email.)
>>> >> > > > > > > >> >
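
Concretely, the consumer side of that reframing might look something like this
sketch: a single arbiter per partition reads booking requests in log order and
lets the first request for a seat win. The topic name, using the seat id as the
key, and the user id as the value are assumptions for illustration:

    import java.time.Duration;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public final class BookingArbiter {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "booking-arbiter");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            Set<String> takenSeats = new HashSet<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("booking-requests"));
                while (true) {
                    for (ConsumerRecord<String, String> request : consumer.poll(Duration.ofSeconds(1))) {
                        String seat = request.key();
                        if (takenSeats.add(seat)) {
                            // first request in log order wins; confirm to request.value()
                            System.out.println("confirmed " + seat + " for " + request.value());
                        } else {
                            // later request loses; notify request.value() that booking failed
                            System.out.println("rejected " + seat + " for " + request.value());
                        }
                    }
                }
            }
        }
    }
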
>>> >> > > > > > > >> > On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
>>> >> > > > > > > >> > <daniel.schierbeck@gmail.com> wrote:
>>> >> > > > > > > >> >> I've been working on an application which uses Event
>>> >> > > > > > > >> >> Sourcing, and I'd like to use Kafka as opposed to,
>>> >> > > > > > > >> >> say, a SQL database to store events. This would allow
>>> >> > > > > > > >> >> me to easily integrate other systems by having them
>>> >> > > > > > > >> >> read off the Kafka topics.
>>> >> > > > > > > >> >>
>>> >> > > > > > > >> >> I do have one concern, though: the consistency of
>>> >> > > > > > > >> >> the data can only be guaranteed if a command handler
>>> >> > > > > > > >> >> has a complete picture of all past events pertaining
>>> >> > > > > > > >> >> to some entity.
>>> >> > > > > > > >> >>
>>> >> > > > > > > >> >> As an example, consider an airline seat reservation
>>> >> > > > > > > >> >> system. Each reservation command issued by a user is
>>> >> > > > > > > >> >> rejected if the seat has already been taken. If the
>>> >> > > > > > > >> >> seat is available, a record describing the event is
>>> >> > > > > > > >> >> appended to the log. This works great when there's
>>> >> > > > > > > >> >> only one producer, but in order to scale I may need
>>> >> > > > > > > >> >> multiple producer processes. This introduces a race
>>> >> > > > > > > >> >> condition: two command handlers may simultaneously
>>> >> > > > > > > >> >> receive a command to reserve the same seat. The event
>>> >> > > > > > > >> >> log indicates that the seat is available, so each
>>> >> > > > > > > >> >> handler will append a reservation event -- thus
>>> >> > > > > > > >> >> double-booking that seat!
>>> >> > > > > > > >> >>
>>> >> > > > > > > >> >> I see three ways around that issue:
>>> >> > > > > > > >> >> 1. Don't use Kafka for this.
>>> >> > > > > > > >> >> 2. Force a single producer for a given flight. This
>>> >> > > > > > > >> >> will impact availability and make routing more complex.
>>> >> > > > > > > >> >> 3. Have a way to do optimistic locking in Kafka.
>>> >> > > > > > > >> >>
>>> >> > > > > > > >> >> The latter idea would work either on a per-key basis
>>> >> > > > > > > >> >> or globally for a partition: when appending to a
>>> >> > > > > > > >> >> partition, the producer would indicate in its request
>>> >> > > > > > > >> >> that the request should be rejected unless the
>>> >> > > > > > > >> >> current offset of the partition is equal to x. For
>>> >> > > > > > > >> >> the per-key setup, Kafka brokers would track the
>>> >> > > > > > > >> >> offset of the latest message for each unique key, if
>>> >> > > > > > > >> >> so configured. This would allow the request to
>>> >> > > > > > > >> >> specify that it should be rejected if the offset for
>>> >> > > > > > > >> >> key k is not equal to x.
>>> >> > > > > > > >> >>
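
To make the intended semantics concrete, here is a small sketch of the check a
broker might perform for both variants. The ConditionalAppend request shape,
the result enum, and the use of the log end offset as "the current offset" are
all assumptions, just to pin down the accept/reject logic:

    import java.util.HashMap;
    import java.util.Map;

    public final class ConditionalAppendCheck {

        enum Result { ACCEPTED, REJECTED }

        // Either (or both) expected offsets may be null, meaning "no condition".
        record ConditionalAppend(String key, Long expectedPartitionOffset, Long expectedKeyOffset) {}

        static final class PartitionState {
            long logEndOffset = 0L;                          // next offset to be assigned
            final Map<String, Long> latestOffsetByKey = new HashMap<>();

            Result tryAppend(ConditionalAppend req) {
                // Partition-wide condition: reject unless the log end offset still matches.
                if (req.expectedPartitionOffset() != null
                        && req.expectedPartitionOffset() != logEndOffset) {
                    return Result.REJECTED;
                }
                // Per-key condition: reject unless the last offset seen for this key matches.
                if (req.expectedKeyOffset() != null) {
                    long lastForKey = latestOffsetByKey.getOrDefault(req.key(), -1L);
                    if (lastForKey != req.expectedKeyOffset()) {
                        return Result.REJECTED;
                    }
                }
                latestOffsetByKey.put(req.key(), logEndOffset);
                logEndOffset++;                              // record is appended at this offset
                return Result.ACCEPTED;
            }
        }
    }
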
>>> >> > > > > > > >> >> This way, only one of the command handlers would
>>> >> > > > > > > >> >> succeed in writing to Kafka, thus ensuring consistency.
>>> >> > > > > > > >> >>
>>> >> > > > > > > >> >> There are different levels of complexity associated
>>> >> > > > > > > >> >> with implementing this in Kafka depending on whether
>>> >> > > > > > > >> >> the feature would work per-partition or per-key:
>>> >> > > > > > > >> >> * For the per-partition optimistic locking, the
>>> >> > > > > > > >> >> broker would just need to keep track of the high
>>> >> > > > > > > >> >> water mark for each partition and reject conditional
>>> >> > > > > > > >> >> requests when the offset doesn't match.
>>> >> > > > > > > >> >> * For per-key locking, the broker would need to
>>> >> > > > > > > >> >> maintain an in-memory table mapping keys to the
>>> >> > > > > > > >> >> offset of the last message with that key. This should
>>> >> > > > > > > >> >> be fairly easy to maintain and recreate from the log
>>> >> > > > > > > >> >> if necessary. It could also be saved to disk as a
>>> >> > > > > > > >> >> snapshot from time to time in order to cut down the
>>> >> > > > > > > >> >> time needed to recreate the table on restart. There's
>>> >> > > > > > > >> >> a small performance penalty associated with this, but
>>> >> > > > > > > >> >> it could be opt-in for a topic.
>>> >> > > > > > > >> >>
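
A sketch of that snapshotting idea, assuming a trivial one-line-per-entry text
format: the table is persisted together with the offset it is valid up to, so a
restart only has to replay the log suffix after that offset.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    public final class KeyOffsetSnapshot {

        // Writes the snapshot offset followed by one "key<TAB>offset" line per entry.
        static void save(Path file, long snapshotOffset, Map<String, Long> table) throws IOException {
            StringBuilder sb = new StringBuilder();
            sb.append(snapshotOffset).append('\n');
            for (Map.Entry<String, Long> e : table.entrySet()) {
                sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
            }
            Files.writeString(file, sb.toString(), StandardCharsets.UTF_8);
        }

        // Fills `table` from the snapshot and returns the offset to resume replaying from.
        static long load(Path file, Map<String, Long> table) throws IOException {
            var lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            long snapshotOffset = Long.parseLong(lines.get(0));
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split("\t", 2);
                table.put(parts[0], Long.parseLong(parts[1]));
            }
            return snapshotOffset;
        }

        public static void main(String[] args) throws IOException {
            Map<String, Long> table = new HashMap<>(Map.of("seat-12A", 41L, "seat-12B", 57L));
            Path file = Path.of("key-offsets.snapshot");
            save(file, 58L, table);

            Map<String, Long> restored = new HashMap<>();
            long resumeFrom = load(file, restored);
            System.out.println("replay log from offset " + resumeFrom + ", table = " + restored);
        }
    }
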
>>> >> > > > > > > >> >> Am I the only one thinking about using Kafka like
>>> >> > > > > > > >> >> this? Would this be a nice feature to have?
>>> >> > > > > > > >>
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > > >
>>> >> > > >
>>> >> > > > --
>>> >> > > > Thanks,
>>> >> > > > Ewen
>>> >> > > >
>>> >> > >
>>> >> >
>>> >> >
>>> >> >
>>> >> > --
>>> >> > Thanks,
>>> >> > Ewen
>>> >> >
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > Thanks,
>>> > Ewen
>>>
>>
