kafka-users mailing list archives

From Robert Hodges <berkeleybob2...@gmail.com>
Subject Re: Hadoop Summit Meetups
Date Tue, 10 Jun 2014 06:38:55 GMT
Thanks Neha.  I am looking at the API call you recommended.

Cheers, Robert


On Mon, Jun 9, 2014 at 12:42 PM, Neha Narkhede <neha.narkhede@gmail.com>
wrote:

> Is there a convenient way to fetch the last message posted on a particular
> topic across all partitions?
>
> Not really, unless the message itself has some sort of a timestamp. Even
> then, the order that the broker applies to the log is only guaranteed per
> partition per client. So it is tricky to know the last written message to a
> topic. You can try to find the last message per partition (using the
> getOffsetsBefore API).
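A toy sketch of the per-partition approach Neha describes, in plain Python. The in-memory `logs` dict stands in for broker partitions; a real client would issue a getOffsetsBefore-style request per partition to learn the log-end offset, which is modeled here by the list length:

```python
def last_message_per_partition(logs):
    """logs: dict partition_id -> list of messages (offset == list index).
    Returns {partition: (last_offset, last_message)} for non-empty partitions."""
    result = {}
    for partition, log in logs.items():
        if log:  # skip empty partitions
            last_offset = len(log) - 1  # what the broker offset API would report
            result[partition] = (last_offset, log[last_offset])
    return result

logs = {0: ["a", "b"], 1: ["x"], 2: []}
print(last_message_per_partition(logs))  # {0: (1, 'b'), 1: (0, 'x')}
```

Note there is still no global "last message for the topic" here, only a last message per partition, which is exactly the per-partition ordering guarantee described above.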
>
> Thanks,
> Neha
>
>
> On Mon, Jun 9, 2014 at 8:55 AM, Robert Hodges <berkeleybob2105@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for the response.  Answers interpolated below.
> >
> > Cheers, Robert
> >
> > On Mon, Jun 9, 2014 at 8:15 AM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> > > Robert,
> > >
> > > Thanks for the description. Just want to clarify on some of the points
> > > (assuming one transaction may include multiple messages below):
> > >
> > > 2) For the "one-to-one mapping" to work, must the consumer read only at
> > > transaction boundaries, i.e., either all or none of a single
> > > transaction's messages are returned to the consumer at once; or is it
> > > sufficient to let consumers just read committed messages? For the use
> > > case you described it seems the second option is good enough.
> > >
> >
> > Consumers just read committed messages from Kafka itself.  Application
> > transactions could be layered on top using the message key, since such
> > transactions might consist of multiple Kafka messages.  It's up to the
> > consumer to avoid committing a partial transaction.
> >
> > >
> > > 4) If an upstream data source / producer has failed and lost some
> > > committed transactions, and then on restart regenerates them: since the
> > > transaction has been previously committed, the downstream consumer may
> > > have already consumed its messages, and regenerating the transaction
> > > will inevitably result in duplicates. Is that OK for your case?
> > >
> >
> > Assuming it is possible to regenerate upstream transactions, downstream
> > consumers should do one of two things:
> >
> > a.) For non-idempotent consumers:  Remember the last committed application
> > transaction and ignore anything before that point.
> > b.) For idempotent consumers:  Just repeat them.
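A minimal sketch of option (a) in plain Python (the names are illustrative, not a Kafka API; `messages` is whatever the consumer pulls from Kafka, each carrying the application transaction ID recovered from the message key):

```python
def apply_new_transactions(messages, last_committed_txn, apply_fn):
    """Replay-safe consumption for a non-idempotent consumer: skip anything
    at or before the last committed application transaction ID, apply the
    rest in order, and return the new high-water mark."""
    for txn_id, payload in messages:
        if txn_id <= last_committed_txn:
            continue  # already applied before the producer regenerated its log
        apply_fn(payload)
        last_committed_txn = txn_id  # a real consumer would persist this durably
    return last_committed_txn

applied = []
last = apply_new_transactions([(1, "t1"), (2, "t2"), (3, "t3")], 2, applied.append)
print(last, applied)  # 3 ['t3']
```

This relies on the transaction IDs being monotonically increasing per partition, which the key design discussed below is meant to guarantee.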
> >
> > The uglier problem is what to do when the logs diverge because the
> > upstream server cannot regenerate data.  In this case you start by hoping
> > the consumer is something like Hadoop that easily tolerates
> > inconsistencies in data. Things may go downhill quickly if the consumer
> > is an RDBMS. :(
> >
> > Is there a convenient way to fetch the last message posted on a
> > particular topic across all partitions?  (My laptop currently is about
> > 120 miles away so it's hard to look.) If so, it looks to me as if there
> > is enough in the Kafka producer and consumer APIs to implement what I am
> > describing without too many holes. I believe the trick is to design a
> > message key that contains a monotonically increasing transaction ID with
> > a fragment index to allow transactions to span Kafka messages but keep
> > all of them (for example) in a single partition.
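A sketch of that key design in plain Python (the encoding is hypothetical; the point is just that the key carries a monotonically increasing transaction ID plus a fragment index, and that partitioning on the transaction ID alone keeps every fragment of one transaction in the same partition):

```python
def make_key(txn_id, frag_index):
    """Encode (transaction ID, fragment index) as a sortable string key."""
    return f"{txn_id:016d}-{frag_index:06d}"

def parse_key(key):
    """Recover (txn_id, frag_index) from a key built by make_key."""
    txn, frag = key.split("-")
    return int(txn), int(frag)

def partition_for(key, num_partitions):
    """Partition on the transaction ID only, so all fragments of one
    transaction land in the same partition (and thus stay ordered)."""
    txn_id, _ = parse_key(key)
    return txn_id % num_partitions

key = make_key(42, 3)
print(key, parse_key(key), partition_for(key, 8))
```

Fixed-width zero-padded fields keep the keys lexicographically sortable, which is convenient when scanning a partition for transaction boundaries.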
> >
> > If I have time next weekend I might try to create an example of this to
> > see what problems pop up.
> >
> > Cheers, Robert
> >
> >
> > > Thanks,
> > > Guozhang
> > >
> > >
> > > On Sat, Jun 7, 2014 at 11:30 PM, Robert Hodges <
> > berkeleybob2105@gmail.com>
> > > wrote:
> > >
> > > > Hi Jonathan and Jun,
> > > >
> > > > Transactional replication using Kafka between stores at either end
> > > > is an interesting topic. I have some experience with this problem in
> > > > database replication products.
> > > >
> > > > To understand how to implement it properly in Kafka it would help to
> > > > define Jonathan's use case more formally.  As I read the description
> > > > there are three parts: a source DBMS, Kafka, and an analytics store.
> > > > These can be arranged as follows:
> > > >
> > > > Producer Store -> Kafka -> Consumer Store
> > > >
> > > > e.g.:
> > > >
> > > > MySQL -> Kafka -> Spark over HDFS
> > > >
> > > > This is like the usual producer/consumer model except that the
> > > > semantics are as follows.  I added some details to the description to
> > > > accommodate a number of practical problems that occur in replication
> > > > topologies of this kind.
> > > >
> > > > 1.) The producer and consumer in the topology are stores with state
> > > > and some notion of a transaction that changes the state of the store
> > > > to which it is applied.  Kafka is in the middle and also has
> > > > transactions, namely to produce and consume messages.
> > > >
> > > > 2.) If a transaction executes on the producer store, you would like
> > > > to execute a corresponding transaction on the consumer store.  The
> > > > transaction might not have the same effect downstream but the point
> > > > is that transactions are linked one-to-one between producer and
> > > > consumer.
> > > >
> > > > 3.) All of the stores or Kafka can fail independently and at any
> > > > time.  It must be possible to recover and continue once a failed
> > > > component restarts.
> > > >
> > > > 4.) It is possible to have failures where a store or Kafka itself
> > > > loses committed state and reverts to an earlier state.  This happens
> > > > in MySQL, for example, when a host crashes before data are properly
> > > > committed to InnoDB and/or the MySQL binlog. It can also happen if
> > > > the upstream DBMS is restored from a backup or as a result of cluster
> > > > failover with data loss.  In this case you either want to regenerate
> > > > lost transactions or (if it is hopeless) fail cleanly.
> > > >
> > > > 5.) Producer transactions might be larger than a single Kafka
> > > > message (e.g. a KeyedMessage). They may not even fit into a single
> > > > JVM's memory.  This can occur, for example, if you do a bulk load or
> > > > an administrative operation on a large table in the producer store.
> > > > You might not have this problem now, but given your requirement to
> > > > work with a range of stores it seems likely to occur sooner rather
> > > > than later. Such transactions must be broken into a stream of smaller
> > > > messages with a protocol to identify that they belong to a single
> > > > transaction. If there are failures, such fragmented transactions must
> > > > not result in partial transactions being applied to the consumer.
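One way to sketch this fragmentation protocol in plain Python (the record layout is made up; a real producer would emit each fragment as a KeyedMessage, and the consumer buffers fragments until it sees the one flagged as last, so a partial transaction is never applied):

```python
def fragment(txn_id, payload, max_size):
    """Producer side: split one large transaction payload into ordered
    fragments, flagging the final one so the consumer knows it is complete."""
    chunks = [payload[i:i + max_size] for i in range(0, len(payload), max_size)] or [""]
    return [
        {"txn": txn_id, "frag": i, "last": i == len(chunks) - 1, "data": c}
        for i, c in enumerate(chunks)
    ]

def reassemble(fragments):
    """Consumer side: buffer fragments per transaction and only yield a
    transaction once its last fragment has arrived."""
    buffered = {}
    for f in fragments:
        buffered.setdefault(f["txn"], []).append(f)
        if f["last"]:
            parts = sorted(buffered.pop(f["txn"]), key=lambda x: x["frag"])
            yield f["txn"], "".join(p["data"] for p in parts)

frags = fragment(7, "abcdefgh", 3)
print(list(reassemble(frags)))  # [(7, 'abcdefgh')]
```

If the consumer crashes mid-transaction, the buffered (uncommitted) fragments are simply re-read on restart, which satisfies the no-partial-transaction requirement.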
> > > >
> > > > 6.) All of the preceding requirements should be met with minimal
> > > > impact on message throughput or transaction rates within stores at
> > > > either end.
> > > >
> > > > Let me know if this is more than what you (Jonathan) intended.
> > > > Usually if you really want #2, requirements #3-6 follow
> > > > automatically.  #5 is potentially a source of much pain if not
> > > > addressed early on.
> > > >
> > > > Pending a response, I would just say solutions that require a
> > > > transactional commit across two stores are difficult to write, often
> > > > have performance downsides, and handle failures poorly because they
> > > > cannot cover all the corner cases.  The last point means they tend to
> > > > drop data, generate unmatched transactions (orphans), or send things
> > > > multiple times depending on the failure.
> > > >
> > > > It's generally better to design systems that use a sliding-window
> > > > protocol, where a commit in the producer triggers a commit to Kafka,
> > > > which in turn triggers a commit in the consumer. Assuming your
> > > > requirements are as stated above, the question is how to design a
> > > > transactional sliding-window protocol that works on Kafka.
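The chained-commit idea can be sketched as a pure-Python simulation (class and method names are invented; the real stages would be the producer store, Kafka, and the consumer store). Each stage keeps committed-but-unacknowledged items in a window and only discards them once the next stage has committed, so a crash at any point only causes re-sends, never loss:

```python
class SlidingWindowStage:
    """Keeps sent-but-unacknowledged items in a window; on restart it would
    re-send the window, so nothing is lost between commits."""
    def __init__(self, downstream):
        self.downstream = downstream
        self.window = []          # committed here, not yet acked downstream

    def commit(self, item):
        self.window.append(item)  # durable locally first
        self.downstream.receive(item)

    def ack(self, item):
        self.window.remove(item)  # downstream committed it; slide forward

class Sink:
    """Stands in for the consumer store at the end of the chain."""
    def __init__(self):
        self.committed = []
        self.upstream = None
    def receive(self, item):
        self.committed.append(item)   # commit in the consumer store
        self.upstream.ack(item)       # then acknowledge upstream

sink = Sink()
stage = SlidingWindowStage(sink)
sink.upstream = stage
for txn in ["t1", "t2"]:
    stage.commit(txn)
print(sink.committed, stage.window)  # ['t1', 't2'] []
```

Because acknowledgement lags commit, duplicates are possible after a crash, which is why the per-transaction IDs discussed earlier in the thread matter for deduplication.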
> > > >
> > > > Cheers, Robert Hodges
> > > >
> > > >
> > > > On Thu, Jun 5, 2014 at 7:48 AM, Jun Rao <junrao@gmail.com> wrote:
> > > >
> > > > > It sounds like you want to write to a data store and a data pipe
> > > > > atomically. Since both the data store and the data pipe that you
> > > > > want to use are highly available, the only case that you want to
> > > > > protect against is the client failing between the two writes. One
> > > > > way to do that is to let the client publish to Kafka first with the
> > > > > strongest ack. Then, run a few consumers to read data from Kafka
> > > > > and write the data to the data store. Any one of those consumers
> > > > > can die and the work will be automatically picked up by the
> > > > > remaining ones. You can use the partition id and the offset of each
> > > > > message as its UUID if needed.
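Jun's last point can be sketched in plain Python (an in-memory dict stands in for the data store; a real consumer would upsert a row keyed on the (partition, offset) pair):

```python
def write_idempotently(store, partition, offset, value):
    """(partition, offset) is unique per Kafka message, so a message that is
    re-delivered after a consumer crash overwrites the same row instead of
    creating a duplicate."""
    store[(partition, offset)] = value

store = {}
# the message at offset 1 is delivered twice, e.g. after a consumer restart
for p, o, v in [(0, 0, "a"), (0, 1, "b"), (0, 1, "b")]:
    write_idempotently(store, p, o, v)
print(store)  # {(0, 0): 'a', (0, 1): 'b'} -- no duplicate row
```

This is what makes the consume-then-write side of the pipeline effectively idempotent without any distributed transaction.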
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Jun 4, 2014 at 10:56 AM, Jonathan Hodges <
> hodgesz@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Sorry didn't realize the mailing list wasn't copied...
> > > > > >
> > > > > >
> > > > > > ---------- Forwarded message ----------
> > > > > > From: Jonathan Hodges <hodgesz@gmail.com>
> > > > > > Date: Wed, Jun 4, 2014 at 10:56 AM
> > > > > > Subject: Re: Hadoop Summit Meetups
> > > > > > To: Neha Narkhede <neha.narkhede@gmail.com>
> > > > > >
> > > > > >
> > > > > > We have a number of customer-facing online learning
> > > > > > applications.  These applications use heterogeneous technologies
> > > > > > with different data models in underlying data stores such as
> > > > > > RDBMS, Cassandra, MongoDB, etc.  We would like to run offline
> > > > > > analysis on the data contained in these learning applications
> > > > > > with tools like Hadoop and Spark.
> > > > > >
> > > > > > One thought is to use Kafka as a way for these learning
> > > > > > applications to emit data in near real-time for analytics.  We
> > > > > > developed a common model, represented as Avro records in HDFS,
> > > > > > that spans these learning applications so that we can accept the
> > > > > > same structured message from them.  This allows for comparing
> > > > > > apples to apples across these apps as opposed to messy
> > > > > > transformations.
> > > > > >
> > > > > > So this all sounds good until you dig into the details.  One
> > > > > > pattern is for these applications to update state locally in
> > > > > > their data stores first and then publish to Kafka.  The problem
> > > > > > with this is that these two operations aren't atomic, so the
> > > > > > local persist can succeed and the publish to Kafka fail, leaving
> > > > > > the application and HDFS out of sync.  You can try to add some
> > > > > > retry logic to the clients, but this quickly becomes very
> > > > > > complicated and still doesn't solve the underlying problem.
> > > > > >
> > > > > > Another pattern is to publish to Kafka first with -1 and wait
> > > > > > for the ack from the leader and replicas before persisting
> > > > > > locally.  This is probably better than the other pattern but does
> > > > > > add some complexity to the client.  The clients must now generate
> > > > > > unique entity IDs/UUIDs for persistence when they typically rely
> > > > > > on the data store for creating these.  Also, the publish to Kafka
> > > > > > can succeed and the local persist can fail, leaving the stores
> > > > > > out of sync.  In this case the learning application needs to
> > > > > > determine how to get itself in sync.  It can rely on getting this
> > > > > > back from Kafka, but it is possible the local store failure can't
> > > > > > be fixed in a timely manner, e.g. hardware failure, constraint
> > > > > > violation, etc.  In this case the application needs to show an
> > > > > > error to the user and likely needs to do something like send a
> > > > > > delete message to Kafka to remove the earlier published message.
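The publish-first pattern and its compensation path might be simulated like this in plain Python (`publish_then_persist` and the tombstone convention are made-up names for illustration, not a real client API; the list and dict stand in for the Kafka topic and the local store):

```python
def publish_then_persist(kafka_log, local_store, msg_id, value, persist_ok=True):
    """Publish to Kafka first (assumed acked by leader + replicas), then
    persist locally.  If the local persist fails, append a compensating
    delete (a None-valued tombstone) so downstream consumers can drop the
    orphaned message."""
    kafka_log.append((msg_id, value))
    if persist_ok:
        local_store[msg_id] = value
        return True
    kafka_log.append((msg_id, None))  # tombstone: undo the earlier publish
    return False

log, store = [], {}
publish_then_persist(log, store, "e1", "v1")
publish_then_persist(log, store, "e2", "v2", persist_ok=False)
print(log, store)  # [('e1', 'v1'), ('e2', 'v2'), ('e2', None)] {'e1': 'v1'}
```

Note the client must still supply its own `msg_id` up front, which is exactly the unique-ID burden described above, and the window between the publish and the tombstone is still visible to consumers.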
> > > > > >
> > > > > > A third, last-resort pattern might be to go the CDC route with
> > > > > > something like Databus.  This would require implementing
> > > > > > additional fetchers and relays to support Cassandra and MongoDB.
> > > > > > Also, the data will need to be transformed on the Hadoop/Spark
> > > > > > side for virtually every learning application since they have
> > > > > > different data models.
> > > > > >
> > > > > > I hope this gives enough detail to start discussing
> > > > > > transactional messaging in Kafka.  We are willing to help in this
> > > > > > effort if it makes sense for our use cases.
> > > > > >
> > > > > > Thanks
> > > > > > Jonathan
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 4, 2014 at 9:44 AM, Neha Narkhede <
> > > neha.narkhede@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > If you are comfortable, share it on the mailing list. If not,
> > > > > > > I'm happy to have this discussion privately.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > > On Jun 4, 2014 9:42 AM, "Neha Narkhede" <
> neha.narkhede@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Glad it was useful. It will be great if you can share your
> > > > > > >> requirements on atomicity. A couple of us are very interested
> > > > > > >> in thinking about transactional messaging in Kafka.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Neha
> > > > > > >> On Jun 4, 2014 6:57 AM, "Jonathan Hodges" <hodgesz@gmail.com>
> > > > wrote:
> > > > > > >>
> > > > > > >>> Hi Neha,
> > > > > > >>>
> > > > > > >>> Thanks so much to you and the Kafka team for putting
> > > > > > >>> together the meetup.  It was very nice and gave people from
> > > > > > >>> out of town like us the ability to join in person.
> > > > > > >>>
> > > > > > >>> We are the guys from Pearson Education, and we talked a
> > > > > > >>> little about supplying some details on some of our use cases
> > > > > > >>> with respect to atomicity of source systems eventing data and
> > > > > > >>> persisting locally.  Should we just post to the list or is
> > > > > > >>> there somewhere else we should send these details?
> > > > > > >>>
> > > > > > >>> Thanks again!
> > > > > > >>> Jonathan
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Fri, Apr 11, 2014 at 9:31 AM, Neha Narkhede
<
> > > > > > neha.narkhede@gmail.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>> > Yes, that's a great idea. I can help organize the meetup
> > > > > > >>> > at LinkedIn.
> > > > > > >>> >
> > > > > > >>> > Thanks,
> > > > > > >>> > Neha
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> > On Fri, Apr 11, 2014 at 8:44 AM, Saurabh Agarwal
> (BLOOMBERG/
> > > 731
> > > > > > >>> LEXIN) <
> > > > > > >>> > sagarwal144@bloomberg.net> wrote:
> > > > > > >>> >
> > > > > > >>> > > great idea. I am interested in attending
as well....
> > > > > > >>> > >
> > > > > > >>> > > ----- Original Message -----
> > > > > > >>> > > From: users@kafka.apache.org
> > > > > > >>> > > To: users@kafka.apache.org
> > > > > > >>> > > At: Apr 11 2014 11:40:56
> > > > > > >>> > >
> > > > > > >>> > > With the Hadoop Summit in San Jose 6/3 - 6/5, I wondered
> > > > > > >>> > > if any of the LinkedIn geniuses were thinking of putting
> > > > > > >>> > > together a meet-up on any of the associated technologies
> > > > > > >>> > > like Kafka, Samza, Databus, etc.  For us poor souls that
> > > > > > >>> > > don't live on the West Coast it was a great experience
> > > > > > >>> > > attending the Kafka meetup last year.
> > > > > > >>> > >
> > > > > > >>> > > Jonathan
> > > > > > >>> > >
> > > > > > >>> > >
> > > > > > >>> > >
> > > > > > >>> > >
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
