kafka-users mailing list archives

From Josh Rader <jrader...@gmail.com>
Subject Re: Database Replication Question
Date Wed, 04 Mar 2015 13:18:02 GMT
Thanks everyone for your responses!  These are great.  It seems our case
matches closest to Jay's recommendations.

The one part that sounds a little tricky is point #5, 'Include in each
message the database's transaction id, scn, or other identifier'.  This is
pretty straightforward in the RDBMS case that I mentioned, but I could
see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo),
which might not always have a readily available monotonic id, particularly
in failover scenarios.  I guess in that case we can think about creating
this id ourselves from the single producer.

Xiao,

I think in the Kafka failover cases you mention, if we also store the offset
with the replicated data, we should be able to pick up where we left off,
since we are using the low-level consumer.  Maybe I am missing your point
though...
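For illustration, the store-the-offset-with-the-data idea might be sketched like this (a minimal sketch using SQLite as a stand-in target; the table and column names are made up, not from any real schema):

```python
import sqlite3

# Sketch: commit the last-applied Kafka offset in the same transaction as the
# replicated row, so a restarted consumer can resume exactly where it left off.
# Table and column names here are hypothetical.

def init_target(conn):
    conn.execute("CREATE TABLE IF NOT EXISTS accounts "
                 "(id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.execute("CREATE TABLE IF NOT EXISTS replication_state "
                 "(partition INTEGER PRIMARY KEY, last_offset INTEGER)")
    conn.commit()

def apply_message(conn, partition, offset, row_id, balance):
    # Both writes commit atomically: either the row and the offset land
    # together, or neither does.
    conn.execute("INSERT OR REPLACE INTO accounts VALUES (?, ?)",
                 (row_id, balance))
    conn.execute("INSERT OR REPLACE INTO replication_state VALUES (?, ?)",
                 (partition, offset))
    conn.commit()

def resume_offset(conn, partition):
    # Next offset to request from the simple consumer after a restart.
    cur = conn.execute("SELECT last_offset FROM replication_state "
                       "WHERE partition = ?", (partition,))
    row = cur.fetchone()
    return row[0] + 1 if row else 0
```

Because the offset rides in the same target-side transaction as the data, a crash between "apply" and "commit offset" cannot leave the two out of sync.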

Guozhang,

Very good point that we didn't think of.  We will need to think this
through to, as you say, avoid resending the other messages in a batch when
one fails.  I wonder if we might also manage this on the consumer side
with idempotency.  Thanks for raising this!

Josh



On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1983@gmail.com> wrote:

> Hey Josh,
>
> Sorry, after reading the code: Kafka does fsync the data using a separate
> thread. The recovery point (oldest transaction timestamp) can be obtained
> from the file recovery-point-offset-checkpoint.
>
> You can adjust the value config.logFlushOffsetCheckpointIntervalMs if you
> think the checkpointing is not frequent enough. When the workload is huge,
> the bottleneck could be on your target side or your source side. That is,
> your apply process may already have plenty of work to do.
>
> Basically, you need to keep reading this file to determine the oldest
> timestamps of all relevant partitions, and then apply the transactions up
> to that timestamp.
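The "apply up to the oldest timestamp" rule Xiao describes can be sketched as follows (an illustrative sketch; the per-channel flushed timestamps would come from the checkpoint file in practice):

```python
# Sketch: with multiple channels (partitions) delivering changes, the target
# is only consistent up to the minimum flushed timestamp across all channels.
# Apply transactions at or below that point; hold everything newer back.

def safe_point(flushed):
    """flushed: dict of channel -> newest timestamp known durable there."""
    return min(flushed.values())

def apply_up_to(pending, point):
    """pending: list of (timestamp, change). Returns (ready, held)."""
    ready = sorted(t for t in pending if t[0] <= point)
    held = [t for t in pending if t[0] > point]
    return ready, held
```

The key point is that one slow channel holds back the global apply point: a transaction is only safe to apply once every channel has durably delivered everything older.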
>
> Note, this does not protect transaction consistency. It just ensures that
> the data at the target side is consistent as of one timestamp when you have
> multiple channels sending data changes. The implementation should be simple
> if you understand the concepts. I am unable to find the filed patent
> application about it, but there is one related paper that covers the main
> concepts behind the issues you are facing: "Inter-Data-Center Large-Scale
> Database Replication Optimization – A Workload Driven Partitioning Approach"
>
> Hopefully, you understood what I explained above.
>
> Best wishes,
>
> Xiao Li
>
> On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1983@gmail.com> wrote:
>
> > Hey Josh,
> >
> > If you put different tables into different partitions or topics, it
> might break transaction ACID at the target side. This is risky for some use
> cases. Besides the unit-of-work issues, you also need to think about load
> balancing.
> >
> > For failover, you have to find the timestamp for point-in-time
> consistency. This part is tricky. You have to ensure that all the changes
> before a specific timestamp have been flushed to disk. Normally, you can
> maintain a bookmark for each partition at the target side to track the
> oldest transactions that have been flushed to disk. Unfortunately, based on
> my understanding, Kafka is unable to do this because it does not fsync
> regularly, in order to achieve better throughput.
> >
> > Best wishes,
> >
> > Xiao Li
> >
> >
> > On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1983@gmail.com> wrote:
> >
> >> Hey Josh,
> >>
> >> Transactions can be applied in parallel on the consumer side based on
> transaction dependency checking.
> >>
> >> http://www.google.com.ar/patents/US20080163222
> >>
> >> This patent documents how it works. It is easy to understand; however,
> you also need to consider the hash collision issues. This has been
> implemented in IBM Q Replication since 2001.
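The dependency-checking idea can be sketched roughly as follows (a simplified sketch of the general technique, not of the patented implementation: hash the keys each transaction touches, and let transactions apply in parallel only when their hash sets are disjoint; a collision conservatively serializes them):

```python
# Sketch: schedule transactions into "waves" that may apply concurrently.
# Two transactions conflict if any of their key hashes collide; a false
# collision is safe (it only loses parallelism), a missed one would not be.

NUM_BUCKETS = 1024  # smaller hash tables mean more (safe) false conflicts

def key_hashes(txn_keys):
    return {hash(k) % NUM_BUCKETS for k in txn_keys}

def schedule(transactions):
    """transactions: list of (txn_id, keys) in source commit order.
    Returns waves of txn ids; each wave can run in parallel."""
    waves = []  # list of (ids, combined hash set)
    for txn_id, keys in transactions:
        hs = key_hashes(keys)
        # place the txn after the last wave it conflicts with
        target = 0
        for i, (_, wave_hashes) in enumerate(waves):
            if hs & wave_hashes:
                target = i + 1
        while len(waves) <= target:
            waves.append(([], set()))
        waves[target][0].append(txn_id)
        waves[target][1].update(hs)
    return [ids for ids, _ in waves]
```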
> >>
> >> Thanks,
> >>
> >> Xiao Li
> >>
> >>
> >> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >>
> >>> Hey Josh,
> >>>
> >>> As you say, ordering is per partition. Technically it is generally
> possible
> >>> to publish all changes to a database to a single partition--generally
> the
> >>> Kafka partition should have high enough throughput to keep up. However
> there
> >>> are a couple of downsides to this:
> >>> 1. Consumer parallelism is limited to one. If you want a total order
> over the
> >>> consumption of messages you need to have just one process, but often you
> >>> would want to parallelize.
> >>> 2. Often what people want is not a full stream of all changes in all
> tables
> >>> in a database but rather the changes to a particular table.
> >>>
> >>> To some extent the best way to do this depends on what you will do
> with the
> >>> data. However if you intend to have lots
> >>>
> >>> I have seen pretty much every variation on this in the wild, and here
> is
> >>> what I would recommend:
> >>> 1. Have a single publisher process that publishes events into Kafka
> >>> 2. If possible use the database log to get these changes (e.g. mysql
> >>> binlog, Oracle xstreams, golden gate, etc). This will be more complete
> and
> >>> more efficient than polling for changes, though that can work too.
> >>> 3. Publish each table to its own topic.
> >>> 4. Partition each topic by the primary key of the table.
> >>> 5. Include in each message the database's transaction id, scn, or other
> >>> identifier that gives the total order within the record stream. Since
> there
> >>> is a single publisher this id will be monotonic within each partition.
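Steps 3-5 above might look roughly like this on the publisher side (a sketch without a live broker; the topic naming and message fields are made up for illustration):

```python
import json
import zlib

# Sketch of steps 3-5: one topic per table, partition chosen from the
# primary key, and the source transaction id carried in every message.

NUM_PARTITIONS = 8

def choose_partition(primary_key, num_partitions=NUM_PARTITIONS):
    # Stable hash, so every change to the same row lands on the same
    # partition and stays ordered relative to that row's other changes.
    return zlib.crc32(str(primary_key).encode()) % num_partitions

def make_message(table, primary_key, txn_id, row):
    topic = "db.changes.%s" % table            # step 3: topic per table
    partition = choose_partition(primary_key)  # step 4: partition by PK
    payload = json.dumps({"txn_id": txn_id,    # step 5: total-order id
                          "key": primary_key,
                          "row": row})
    return topic, partition, payload
```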
> >>>
> >>> This seems to be the best set of tradeoffs for most use cases:
> >>> - You can have parallel consumers up to the number of partitions you
> chose
> >>> that still get messages in order per ID'd entity.
> >>> - You can subscribe to just one table if you like, or to multiple
> tables.
> >>> - Consumers who need a total order over all updates can do a "merge"
> across
> >>> the partitions to reassemble the fully ordered set of changes across
> all
> >>> tables/partitions.
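The "merge" across partitions can be sketched with a heap merge on the transaction id (assuming, per point #5, that each partition already delivers its messages in id order):

```python
import heapq

# Sketch: each partition yields (txn_id, change) in ascending id order;
# a heap-based merge reassembles the total order across all partitions.

def merge_partitions(partitions):
    """partitions: list of sorted lists of (txn_id, change)."""
    for txn_id, change in heapq.merge(*partitions):
        yield change
```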
> >>>
> >>> One thing to note is that the requirement of having a single consumer
> >>> process/thread to get the total order isn't really so much a Kafka
> >>> restriction as a restriction about the world: even if you had multiple
> >>> threads and delivered messages to them in order, their processing might
> >>> still happen out of order (just due to the random timing of the
> >>> processing).
> >>>
> >>> -Jay
> >>>
> >>>
> >>>
> >>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader940@gmail.com>
> wrote:
> >>>
> >>>> Hi Kafka Experts,
> >>>>
> >>>>
> >>>>
> >>>> We have a use case around RDBMS replication where we are investigating
> >>>> Kafka.  In this case ordering is very important.  Our understanding is
> >>>> that ordering is only preserved within a single partition.  This makes
> >>>> sense, as a single thread will consume these messages, but our question
> >>>> is: can we somehow parallelize this for better performance?  Is there
> >>>> maybe some partition-key strategy trick to have your cake and eat it
> >>>> too, in terms of keeping ordering but also being able to parallelize
> >>>> the processing?
> >>>>
> >>>>
> >>>>
> >>>> I am sorry if this has already been asked, but we tried to search
> >>>> through the archives and couldn't find an answer.
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Josh
> >>>>
> >>
> >
>
>
