kafka-users mailing list archives

From Steve Morin <st...@stevemorin.com>
Subject Re: New Producer - ONLY sync mode?
Date Mon, 09 Feb 2015 16:22:16 GMT
Jay,
Thanks, I'll look at that more closely.

On Sat, Feb 7, 2015 at 1:23 PM, Jay Kreps <jay.kreps@gmail.com> wrote:

> Steve
>
> In terms of mimicking the sync behavior, I think that is what .get() does,
> no?
>
> We are always returning the offset and error information. The example I
> gave didn't make use of it, but you definitely can make use of it if you
> want to.
>
> -Jay
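A minimal, stdlib-only sketch of the point above that .get() mimics the old sync behavior while still exposing the offset and error information. sendAsync is a hypothetical stand-in for KafkaProducer.send() (which returns a Future<RecordMetadata>), and the Long result stands in for the offset carried by RecordMetadata, so the sketch runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SyncSendSketch {
    // Hypothetical stand-in for producer.send(record): the returned future holds
    // an offset (standing in for RecordMetadata) or the failure.
    static Future<Long> sendAsync(String value, long nextOffset) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (value.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("empty record"));
        } else {
            result.complete(nextOffset);
        }
        return result;
    }

    // "Sync mode": block on get() and report either the offset or the error.
    static String sendSync(String value, long nextOffset) {
        try {
            long offset = sendAsync(value, nextOffset).get();
            return "ok@" + offset;
        } catch (ExecutionException e) {
            // The real failure is the cause of the ExecutionException.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Against the real client the same pattern is producer.send(record).get(), with the failure delivered as the cause of the ExecutionException.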
>
> On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <steve@stevemorin.com> wrote:
>
> > Looking at this thread, I would ideally want something, or at least the
> > right recipe, to mimic sync behavior like Otis is talking about.
> >
> > In the second case, I would like to be able to know individually whether
> > messages have failed, regardless of whether they are in separate batches,
> > sort of like what Kinesis does, as Pradeep mentioned.
> > -Steve
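One way to get the per-message failure reporting described above, sketched with the JDK only: each record gets its own callback, and a CountDownLatch waits until all of them have fired, so failures are known per record however the client happened to batch them. send() and its BiConsumer callback are hypothetical stand-ins shaped like KafkaProducer.send(record, callback) and Callback.onCompletion(RecordMetadata, Exception).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

final class PerRecordFailures {
    // Hypothetical stand-in for send(record, callback): the callback runs later on
    // another thread with either a result (offset) or an exception.
    static void send(String value, long offset, BiConsumer<Long, Exception> callback) {
        CompletableFuture.runAsync(() -> {
            if (value.startsWith("bad")) {
                callback.accept(null, new RuntimeException("rejected: " + value));
            } else {
                callback.accept(offset, null);
            }
        });
    }

    // Gives every record its own callback, waits for all of them, and returns the
    // failures keyed by input position, however the records happened to be batched.
    static Map<Integer, Exception> sendAll(List<String> values, long timeoutMs) {
        Map<Integer, Exception> failures = new ConcurrentHashMap<>();
        CountDownLatch pending = new CountDownLatch(values.size());
        for (int i = 0; i < values.size(); i++) {
            final int index = i;
            send(values.get(i), i, (offset, error) -> {
                if (error != null) {
                    failures.put(index, error);
                }
                pending.countDown();
            });
        }
        try {
            if (!pending.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failures;
    }
}
```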
> >
> > On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> > > Yeah, totally. Using a callback is, of course, the Right Thing for this
> > > kind of stuff. But I have found that kind of asynchronous thinking can be
> > > hard for people. Even if you get out of the pre-Java 8 syntactic pain that
> > > anonymous inner classes inflict, just dealing with multiple threads of
> > > control without creating async spaghetti can be a challenge for complex
> > > stuff. That is really the only reason for the futures in the API: they are
> > > strictly less powerful than the callbacks, but at least with them you can
> > > just call .get() and pretend it is blocking.
> > >
> > > -Jay
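A small sketch of the callback/future relationship described above. The Callback interface and send() here are hypothetical stand-ins shaped like Kafka's producer Callback; the point is that a future can be built from a callback (so the future can do strictly less), and that before Java 8 this takes an anonymous inner class.

```java
import java.util.concurrent.CompletableFuture;

final class CallbackVsFuture {
    // Hypothetical interface shaped like Kafka's producer Callback: exactly one of
    // the two arguments is meaningful (a result, or the exception).
    interface Callback {
        void onCompletion(Long offset, Exception exception);
    }

    // Hypothetical callback-style send; it completes inline to keep the sketch short.
    static void send(String value, Callback callback) {
        if (value.isEmpty()) {
            callback.onCompletion(null, new IllegalArgumentException("empty record"));
        } else {
            callback.onCompletion((long) value.length(), null);
        }
    }

    // A future is easy to derive from a callback (it just stores the outcome for a
    // later get()), which is one way to see why it is "strictly less powerful".
    static CompletableFuture<Long> sendForFuture(String value) {
        final CompletableFuture<Long> result = new CompletableFuture<>();
        // Pre-Java 8 style: an anonymous inner class.
        send(value, new Callback() {
            @Override
            public void onCompletion(Long offset, Exception exception) {
                if (exception != null) {
                    result.completeExceptionally(exception);
                } else {
                    result.complete(offset);
                }
            }
        });
        return result;
    }
}
```

With Java 8 the same callback can be a lambda, e.g. send("abc", (offset, e) -> System.out.println(offset)); the anonymous class above is the pre-Java 8 equivalent.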
> > >
> > > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <joe.stein@stealth.ly> wrote:
> > >
> > > > Now that 0.8.2.0 is in the wild, I look forward to working with it more
> > > > and seeing what folks start to do with this function
> > > >
> > > > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> > > >
> > > > and keeping it fully non-blocking.
> > > >
> > > > One sprint I know of that is coming up will use the new producer as a
> > > > component in their reactive calls, handling bookkeeping and retries
> > > > through that kind of callback approach. It should work well (I haven't
> > > > tried it, but don't see why not) with Akka, Scalaz, RxJava, Finagle, etc.
> > > > in functional languages and frameworks.
> > > >
> > > > I think that as JDK 8 gets out into the wild more (maybe after the JDK 7
> > > > EOL), the use of .get() will be reduced (IMHO), and folks will think more
> > > > about non-blocking vs. blocking and not so much about sync vs. async, but
> > > > my crystal ball just got back from the shop, so we'll see =8^)
> > > >
> > > > /*******************************************
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
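A sketch of the non-blocking, JDK 8 style described above, composing CompletableFuture stages instead of calling .get(). sendAsync is a hypothetical stand-in: the 0.8.2.0 producer's send() returns a plain Future, so to compose like this one would complete a CompletableFuture from inside the send callback. The "bookkeeping" record is illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class NonBlockingPipeline {
    // Hypothetical async send; in practice one would complete a CompletableFuture
    // from inside a producer Callback.
    static CompletableFuture<Long> sendAsync(String value, long offset) {
        return CompletableFuture.supplyAsync(() -> {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            return offset;
        });
    }

    // Send a record; once it is acknowledged, send a bookkeeping record that
    // refers to its offset. Nothing here blocks: failures become a status value.
    static CompletableFuture<String> sendWithBookkeeping(String value) {
        return sendAsync(value, 10L)
            .thenCompose(offset -> sendAsync("ack-for-" + offset, offset + 1))
            .thenApply(bookkeepingOffset -> "done, bookkeeping at " + bookkeepingOffset)
            .exceptionally(error -> {
                // Failures from earlier stages arrive wrapped in CompletionException.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause()
                        : error;
                return "failed: " + cause.getMessage();
            });
    }
}
```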
> > > >
> > > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > > >
> > > > > Hey guys,
> > > > >
> > > > > I guess the question is whether it really matters how many underlying
> > > > > network requests occur. It is very hard for an application to depend on
> > > > > this even in the old producer, since it depends on partition placement
> > > > > (a send to two partitions may go to either one machine or two, so it
> > > > > will send either one or two requests). So when you send a batch in one
> > > > > call you may feel that it goes "all at once", but that is only actually
> > > > > guaranteed if all messages go to the same partition.
> > > > >
> > > > > The challenge is allowing even this in the presence of the bounded
> > > > > request sizes we have in the new producer. The user sends a list of
> > > > > objects, and the serialized size that will result is not very apparent
> > > > > to them. If you break it up into multiple requests, that further ruins
> > > > > the illusion of a single send. If you don't, you have to just error
> > > > > out, which is equally annoying to have to handle.
> > > > >
> > > > > But I'm not sure whether, from your description, you are saying you
> > > > > actually care how many physical requests are issued. I think it is more
> > > > > that it is just syntactically annoying to send a batch of data now,
> > > > > because it needs a for loop.
> > > > >
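> > > > > Currently, to do this you would do:
> > > > >
> > > > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > > > for (ProducerRecord<K, V> input : recordBatch)
> > > > >     responses.add(producer.send(input));
> > > > > for (Future<RecordMetadata> response : responses)
> > > > >     response.get(); // blocks; throws ExecutionException if that send failed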
> > > > >
> > > > > If you don't depend on the offset/error info, we could add a flush call
> > > > > so you could instead do:
> > > > > for (ProducerRecord<K, V> input : recordBatch)
> > > > >     producer.send(input);
> > > > > producer.flush(); // proposed here; not part of the 0.8.2.0 API
> > > > >
> > > > > But if you do want the error/offset then you are going to be back to the
> > > > > original case.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Jay
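A runnable version of the two-loop recipe above, assuming send() is a hypothetical stand-in for producer.send(record) whose Long result plays the role of the offset in RecordMetadata. It keeps each record's offset or error, which is exactly the information a flush-style call would not hand back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class BatchViaFutures {
    // Hypothetical stand-in for producer.send(record): a future of the offset.
    static Future<Long> send(String value, long offset) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (value.isEmpty()) {
            result.completeExceptionally(new IllegalStateException("empty record"));
        } else {
            result.complete(offset);
        }
        return result;
    }

    // The two-loop recipe: issue every send first (so the client is free to batch
    // them), then block on each future, keeping each record's offset or error.
    static List<String> sendBatch(List<String> batch) {
        List<Future<Long>> responses = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            responses.add(send(batch.get(i), 100L + i));
        }
        List<String> outcomes = new ArrayList<>();
        for (Future<Long> response : responses) {
            try {
                outcomes.add("offset " + response.get());
            } catch (ExecutionException e) {
                outcomes.add("error " + e.getCause().getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                outcomes.add("interrupted");
            }
        }
        return outcomes;
    }
}
```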
> > > > >
> > > > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > >
> > > > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > > > the send(List) method of the old API.
> > > > > >
> > > > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > > > / save / whatever just the messages that had issues.
> > > > > >
> > > > > > If messages had identifiers from the producer side, we could have the
> > > > > > API call the callback with a list of message-ids and their status. But
> > > > > > they don't :)
> > > > > >
> > > > > > Any thoughts on how you'd like it to work?
> > > > > >
> > > > > > Gwen
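A sketch of the "N callbacks" advantage above: because each message reports its own outcome, only the failed ones need to be resent, rather than the whole batch. trySend is a hypothetical synchronous stand-in for "send one record and learn its result" (in the real producer that result arrives later, via the record's callback or future).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RetryFailedOnly {
    // Sends each record through trySend (true = acknowledged) and, for up to
    // maxAttempts rounds, re-sends only the records that failed.
    // Returns the records that never succeeded.
    static List<String> sendWithRetry(List<String> records, Predicate<String> trySend, int maxAttempts) {
        List<String> pending = new ArrayList<>(records);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<String> failed = new ArrayList<>();
            for (String record : pending) {
                if (!trySend.test(record)) {
                    failed.add(record); // only this record is retried, not the batch
                }
            }
            pending = failed;
        }
        return pending;
    }
}
```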
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pradeepg26@gmail.com> wrote:
> > > > > > > This is a great question, Otis. Like Gwen said, you can accomplish sync
> > > > > > > mode by setting the batch size to 1. But this does highlight a
> > > > > > > shortcoming of the new producer API.
> > > > > > >
> > > > > > > I really like the design of the new API; it has really great
> > > > > > > properties and I'm enjoying working with it. However, one API that I
> > > > > > > think we're lacking is a "batch" API. Currently, I have to iterate over
> > > > > > > a batch and call .send() on each record, which yields n callbacks
> > > > > > > instead of 1 callback for the whole batch. This significantly
> > > > > > > complicates recovery logic where we need to commit a batch as opposed
> > > > > > > to 1 record at a time.
> > > > > > >
> > > > > > > Do you guys have any plans to add better semantics around batches?
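A sketch of the single batch-level completion asked for above, built from the n per-record results with CompletableFuture.allOf. It assumes one CompletableFuture per record, completed from that record's callback (the 0.8.2.0 send() itself returns a plain Future); the batch commit step is left to the caller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class BatchCompletion {
    // Folds one future per record into a single future for the whole batch. It
    // completes normally only if every record was acknowledged, so a batch-level
    // commit can hang off it once; any failed record fails the batch.
    static CompletableFuture<List<Long>> whenAllAcked(List<CompletableFuture<Long>> perRecord) {
        CompletableFuture<?>[] all = perRecord.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(all)
                .thenApply(ignored -> perRecord.stream()
                        .map(CompletableFuture::join) // safe: allOf succeeded, all are done
                        .collect(Collectors.toList()));
    }
}
```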
> > > > > > >
> > > > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >
> > > > > > >> If I understood the code and Jay correctly: if you wait for the future,
> > > > > > >> it will be a similar delay to that of the old sync producer.
> > > > > > >>
> > > > > > >> Put another way, if you test it out and see longer delays than the sync
> > > > > > >> producer had, we need to find out why and fix it.
> > > > > > >>
> > > > > > >> Gwen
> > > > > > >>
> > > > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > > > >> <otis.gospodnetic@gmail.com> wrote:
> > > > > > >> > Hi,
> > > > > > >> >
> > > > > > >> > Nope, unfortunately it can't do that. X is a remote app; it doesn't
> > > > > > >> > listen to anything external and calls Y via HTTPS. So X has to
> > > > > > >> > decide what to do with its data based on Y's synchronous response.
> > > > > > >> > It has to block until Y responds. And it wouldn't be pretty, I
> > > > > > >> > think, because nobody wants to run apps that talk to remote servers
> > > > > > >> > and hang on to connections longer than they have to. But perhaps
> > > > > > >> > that is the only way? Or maybe the answer to "I'm guessing the delay
> > > > > > >> > would be more or less the same as if the Producer was using SYNC
> > > > > > >> > mode?" is YES, in which case the connection from X to Y would be
> > > > > > >> > open for just as long as with a SYNC producer running in Y?
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> > Otis
> > > > > > >> > --
> > > > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >> >
> > > > > > >> >> Can Y have a callback that will handle the notification to X?
> > > > > > >> >> In this case, perhaps Y can be async and X can buffer the data until
> > > > > > >> >> the callback triggers and says "all good" (or resend if the callback
> > > > > > >> >> indicates an error).
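A sketch of X's side of the scheme suggested above: X keeps every record it shipped until Y's callback-driven notification says it is in Kafka, and resends whatever is still pending. AckBuffer and its methods are hypothetical names for illustration, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AckBuffer {
    // X's side: every record shipped to Y stays buffered until Y reports "all good".
    // The ids and method names are hypothetical.
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextId = 0;

    synchronized long ship(String record) {
        long id = nextId++;
        unacked.put(id, record);
        return id;
    }

    // Y's callback fired without an error: the record is in Kafka, drop X's copy.
    synchronized void onAck(long id) {
        unacked.remove(id);
    }

    // Anything not acknowledged (errors, or no answer yet) is kept for a resend.
    synchronized List<String> pendingForResend() {
        return new ArrayList<>(unacked.values());
    }
}
```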
> > > > > > >> >>
> > > > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > > > >> >> <otis.gospodnetic@gmail.com> wrote:
> > > > > > >> >> > Hi,
> > > > > > >> >> >
> > > > > > >> >> > Thanks for the info. Here's the use case. We have something upstream
> > > > > > >> >> > sending data, say a log shipper called X. It sends it to some remote
> > > > > > >> >> > component Y. Y is the Kafka Producer and it puts data into Kafka. But
> > > > > > >> >> > Y needs to send a reply to X and tell it whether it successfully put
> > > > > > >> >> > all its data into Kafka. If it did not, Y wants to tell X to buffer
> > > > > > >> >> > data locally and resend it later.
> > > > > > >> >> >
> > > > > > >> >> > If the producer is ONLY async, Y can't easily do that. Or maybe Y
> > > > > > >> >> > would just need to wait for the Future to come back and only then
> > > > > > >> >> > send the response back to X? If so, I'm guessing the delay would be
> > > > > > >> >> > more or less the same as if the Producer was using SYNC mode?
> > > > > > >> >> >
> > > > > > >> >> > Thanks,
> > > > > > >> >> > Otis
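A sketch of Y replying to X only after the producer futures complete, as asked above. allAcked is a hypothetical helper; the futures would come from producer.send(). A single shared deadline caps how long X's HTTPS connection stays open, which is the cost Otis is worried about.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyAfterAck {
    // Y's request handler: block until every send is acknowledged or a shared
    // deadline passes, then tell X whether it may drop its local copy.
    static boolean allAcked(List<? extends Future<?>> sends, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (Future<?> send : sends) {
                long remaining = deadline - System.nanoTime();
                send.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
            }
            return true; // reply "OK" to X
        } catch (ExecutionException | TimeoutException e) {
            return false; // reply "buffer locally and resend later" to X
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```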
> > > > > > >> >> >
> > > > > > >> >> >
> > > > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > > > > > >> >> >
> > > > > > >> >> >> Yeah, as Gwen says, there is no sync/async mode anymore. There is a
> > > > > > >> >> >> new configuration which does a lot of what async did in terms of
> > > > > > >> >> >> allowing batching:
> > > > > > >> >> >>
> > > > > > >> >> >> batch.size - This is the target amount of data per partition the
> > > > > > >> >> >> client will attempt to batch together.
> > > > > > >> >> >> linger.ms - This is the time the producer will wait for more data
> > > > > > >> >> >> to be sent to better batch up writes. The default is 0 (send
> > > > > > >> >> >> immediately). So if you set this to 50 ms, the client will send
> > > > > > >> >> >> immediately if it has already filled up its batch; otherwise it
> > > > > > >> >> >> will wait up to 50 ms to accumulate the number of bytes given by
> > > > > > >> >> >> batch.size.
> > > > > > >> >> >>
> > > > > > >> >> >> To send asynchronously you do
> > > > > > >> >> >>    producer.send(record);
> > > > > > >> >> >> whereas to block on a response you do
> > > > > > >> >> >>    producer.send(record).get();
> > > > > > >> >> >> which will wait for acknowledgement from the server.
> > > > > > >> >> >>
> > > > > > >> >> >> One advantage of this model is that the client will do its best to
> > > > > > >> >> >> batch under the covers even if linger.ms=0. It will do this by
> > > > > > >> >> >> batching any data that arrives while another send is in progress
> > > > > > >> >> >> into a single request, giving a kind of "group commit" effect.
> > > > > > >> >> >>
> > > > > > >> >> >> The hope is that this will be both simpler to understand (a single
> > > > > > >> >> >> API that always works the same) and more powerful (you always get a
> > > > > > >> >> >> response with error and offset information, whether or not you
> > > > > > >> >> >> choose to use it).
> > > > > > >> >> >>
> > > > > > >> >> >> -Jay
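The batching settings Jay describes, written as the java.util.Properties one would pass to new KafkaProducer<>(props). The broker address is a placeholder, and the String serializers are an assumption about the key and value types; 16384 is the client's default batch.size, and 50 ms matches the linger.ms example above.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    // Producer settings discussed above; the broker address is a placeholder.
    static Properties batchingConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Target bytes per partition batch (16384 is the client's default).
        props.put("batch.size", "16384");
        // Wait up to 50 ms for a batch to fill before sending it anyway.
        props.put("linger.ms", "50");
        return props;
    }
}
```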
> > > > > > >> >> >>
> > > > > > >> >> >>
> > > > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >> >> >>
> > > > > > >> >> >> > If you want to emulate the old sync producer behavior, you need to
> > > > > > >> >> >> > set the batch size to 1 (in the producer config) and wait on the
> > > > > > >> >> >> > future you get from send() (i.e. future.get()).
> > > > > > >> >> >> >
> > > > > > >> >> >> > I can't think of good reasons to do so, though.
> > > > > > >> >> >> >
> > > > > > >> >> >> > Gwen
> > > > > > >> >> >> >
> > > > > > >> >> >> >
> > > > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic <otis.gospodnetic@gmail.com> wrote:
> > > > > > >> >> >> > > Hi,
> > > > > > >> >> >> > >
> > > > > > >> >> >> > > Is the plan for the new producer to have ONLY async mode? I'm
> > > > > > >> >> >> > > asking because of this info from the Wiki:
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >    - The producer will always attempt to batch data and will always
> > > > > > >> >> >> > >    immediately return a SendResponse which acts as a Future to allow
> > > > > > >> >> >> > >    the client to await the completion of the request.
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >
> > > > > > >> >> >> > > The word "always" makes me think there will be no sync mode.
> > > > > > >> >> >> > >
> > > > > > >> >> >> > > Thanks,
> > > > > > >> >> >> > > Otis
