kafka-users mailing list archives

From Steve Morin <st...@stevemorin.com>
Subject Re: New Producer - ONLY sync mode?
Date Wed, 04 Feb 2015 17:58:08 GMT
Looking at this thread, I would ideally want something, or at least the right
recipe, to mimic sync behavior like Otis is talking about.

In the second case, I would like to be able to know individually whether
messages have failed, even if they are in separate batches, sort of like
what Kinesis does, as Pradeep mentioned.
-Steve

On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <jay.kreps@gmail.com> wrote:

> Yeah, totally. Using a callback is, of course, the Right Thing for this kind
> of stuff. But I have found that kind of asynchronous thinking can be hard
> for people. Even if you get past the pre-Java 8 syntactic pain that
> anonymous inner classes inflict, just dealing with multiple threads of
> control without creating async spaghetti can be a challenge for complex
> stuff. That is really the only reason for the futures in the API: they are
> strictly less powerful than the callbacks, but at least with them you can
> just call .get() and pretend it is blocking.
>
> -Jay
>
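A minimal sketch of the two styles Jay contrasts, runnable with only the JDK. `SketchProducer` is a hypothetical in-memory stand-in, not `KafkaProducer`. It completes each send on one background thread, hands out sequential offsets, and simulates a failure for an empty record so the error path is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class FutureVsCallback {
    // Hypothetical stand-in for a producer (NOT the Kafka API). Each send completes on a
    // single background thread and reports the offset or error through the callback
    // (if one was given) and through the returned Future.
    static class SketchProducer {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private long nextOffset = 0; // only touched on the io thread

        Future<Long> send(String value, BiConsumer<Long, Exception> callback) {
            CompletableFuture<Long> result = new CompletableFuture<>();
            io.execute(() -> {
                if (value.isEmpty()) { // simulated failure, purely for illustration
                    Exception e = new IllegalArgumentException("empty record");
                    if (callback != null) callback.accept(null, e);
                    result.completeExceptionally(e);
                } else {
                    long offset = nextOffset++;
                    if (callback != null) callback.accept(offset, null);
                    result.complete(offset);
                }
            });
            return result;
        }

        // Stop accepting work and wait for queued sends (and their callbacks) to finish.
        void close() {
            io.shutdown();
            try {
                io.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        SketchProducer producer = new SketchProducer();
        try {
            // Blocking style: call get() and "pretend it is blocking".
            log.add("blocking ok@" + producer.send("a", null).get());
        } catch (InterruptedException | ExecutionException e) {
            log.add("blocking failed: " + e);
        }
        // Callback style: send() returns at once; results arrive on the io thread.
        BiConsumer<Long, Exception> callback = (off, err) ->
            log.add(err == null ? "async ok@" + off : "async failed: " + err.getMessage());
        producer.send("b", callback);
        producer.send("", callback);
        producer.close();
        return new ArrayList<>(log);
    }
}
```

The blocking call reads like the old sync producer. The callback version never makes the caller wait, at the cost of handling results on another thread.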
> On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>
> > Now that 0.8.2.0 is in the wild, I look forward to working with it more
> > and seeing what folks start to do with this function while keeping it
> > fully non-blocking:
> >
> > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> >
> > One sprint I know of coming up is going to use the new producer as a
> > component in their reactive calls, handling bookkeeping and retries
> > through that kind of callback approach. It should work well (I haven't
> > tried it, but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc.
> > in functional languages and frameworks.
> >
> > I think as JDK 8 gets out in the wild more too (maybe after JDK 7 EOL),
> > the use of .get will be reduced (IMHO), and folks will be thinking more
> > about non-blocking vs. blocking and not so much sync vs. async, but my
> > crystal ball is just back from the shop, so we'll see =8^)
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
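Joe's point about JDK 8 can be made concrete. A callback-style send can be adapted into a `java.util.concurrent.CompletableFuture`, and several such futures can then be composed without any thread blocking in `.get()`. The `CallbackSender` interface is a hypothetical stand-in shaped like `send(record, callback)`, not a Kafka type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackToCompletableFuture {
    // Hypothetical callback-style sender shaped like send(record, callback); not a Kafka type.
    interface CallbackSender {
        void send(String value, BiConsumer<Long, Exception> callback);
    }

    // Adapt one callback into a CompletableFuture that completes when the callback fires.
    static CompletableFuture<Long> sendAsync(CallbackSender sender, String value) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        sender.send(value, (offset, error) -> {
            if (error != null) future.completeExceptionally(error);
            else future.complete(offset);
        });
        return future;
    }

    // Compose N sends into one non-blocking result: all offsets in input order,
    // or an exceptional completion if any single send failed.
    static CompletableFuture<List<Long>> sendAll(CallbackSender sender, List<String> values) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (String value : values) futures.add(sendAsync(sender, value));
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<Long> offsets = new ArrayList<>();
                for (CompletableFuture<Long> f : futures) offsets.add(f.join()); // already done
                return offsets;
            });
    }
}
```

Because `sendAll` returns a future, a caller can chain `thenAccept(...)` or hand it to a reactive library instead of blocking. The only `join()` in the sketch runs inside `thenApply`, after every future has already completed.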
> > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> > > Hey guys,
> > >
> > > I guess the question is whether it really matters how many underlying
> > > network requests occur? It is very hard for an application to depend on
> > > this even in the old producer, since it depends on partition placement
> > > (a send to two partitions may go to either one machine or two, and so it
> > > will send either one or two requests). So when you send a batch in one
> > > call you may feel that it is "all at once", but that is only actually
> > > guaranteed if all messages have the same partition.
> > >
> > > The challenge is allowing even this in the presence of the bounded
> > > request sizes we have in the new producer. The user sends a list of
> > > objects, and the serialized size that will result is not very apparent
> > > to them. If you break it up into multiple requests, that further ruins
> > > the illusion of a single send. If you don't, then you have to just error
> > > out, which is equally annoying to have to handle.
> > >
> > > But I'm not sure from your description whether you actually care how
> > > many physical requests are issued. I think it is more that it is just
> > > syntactically annoying to send a batch of data now because it needs a
> > > for loop.
> > >
> > > Currently to do this you would do:
> > >
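> > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > for (ProducerRecord<K, V> input : recordBatch)
> > >     responses.add(producer.send(input));   // returns immediately
> > > for (Future<RecordMetadata> response : responses)
> > >     response.get();   // blocks until acked; throws ExecutionException on error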
> > >
> > > If you don't depend on the offset/error info, we could add a flush call
> > > so you could instead do:
> > >
> > > for (ProducerRecord<K, V> input : recordBatch)
> > >     producer.send(input);
> > > producer.flush();
> > >
> > > But if you do want the error/offset, then you are going to be back to
> > > the original case.
> > >
> > > Thoughts?
> > >
> > > -Jay
> > >
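Jay's point about bounded request sizes can be illustrated with a small sketch: greedily pack already-serialized records into chunks that each stay under a byte limit, and error out on a single record that can never fit. `maxRequestBytes` is a hypothetical parameter, and per-record and per-request overhead are ignored, so real request sizes would be somewhat larger.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SizeBoundedChunks {
    // Greedily split serialized records into chunks whose summed payload size is at most
    // maxRequestBytes. Overhead per record and per request is ignored (an assumption).
    static List<List<byte[]>> chunk(List<byte[]> records, int maxRequestBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] record : records) {
            if (record.length > maxRequestBytes) {
                // No split can help: this is the "error out" case.
                throw new IllegalArgumentException(
                    "record of " + record.length + " bytes exceeds limit " + maxRequestBytes);
            }
            if (currentBytes + record.length > maxRequestBytes) {
                // Start a new request: the "break it up" case.
                chunks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += record.length;
        }
        if (!current.isEmpty()) chunks.add(current);
        return chunks;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

The two outcomes correspond to Jay's two options: one logical `send(List)` either becomes several requests or fails outright.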
> > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gshapira@cloudera.com>
> > > wrote:
> > >
> > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > the send(List) API of the old producer.
> > > >
> > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > / save / whatever just the messages that had issues.
> > > >
> > > > If messages had identifiers from the producer side, we could have the
> > > > API call the callback with a list of message IDs and their status.
> > > > But they don't :)
> > > >
> > > > Any thoughts on how you'd like it to work?
> > > >
> > > > Gwen
> > > >
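A sketch of the per-record handling Gwen describes (and Steve asks for at the top of the thread), built on the send-all-then-wait loop from Jay's message: each record's future is checked individually, and only the failures are handed back for retry. `send` is a hypothetical `Function` standing in for `producer.send(...)`, and the result type is simplified to a `Long` offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

public class PerRecordStatus {
    // Outcome of one record: the offset on success, or the error that should drive a retry.
    static final class Outcome {
        final String record;
        final Long offset;
        final Throwable error;

        Outcome(String record, Long offset, Throwable error) {
            this.record = record;
            this.offset = offset;
            this.error = error;
        }

        boolean failed() { return error != null; }
    }

    // Send every record first (so the client can still batch), then wait on each future
    // and keep its individual status. `send` is a hypothetical stand-in for producer.send.
    static List<Outcome> sendAndCollect(List<String> records, Function<String, Future<Long>> send) {
        List<Future<Long>> futures = new ArrayList<>();
        for (String r : records) futures.add(send.apply(r));
        List<Outcome> outcomes = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            try {
                outcomes.add(new Outcome(records.get(i), futures.get(i).get(), null));
            } catch (ExecutionException e) {
                outcomes.add(new Outcome(records.get(i), null, e.getCause()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for acks", e);
            }
        }
        return outcomes;
    }

    // Only the records that failed are handed back to "retry / save / whatever".
    static List<String> toRetry(List<Outcome> outcomes) {
        List<String> retry = new ArrayList<>();
        for (Outcome o : outcomes) if (o.failed()) retry.add(o.record);
        return retry;
    }
}
```

Because all sends are issued before the first `get()`, the client can still batch them; only the waiting and the bookkeeping are per record.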
> > > >
> > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pradeepg26@gmail.com>
> > > > wrote:
> > > > > This is a great question, Otis. Like Gwen said, you can accomplish
> > > > > sync mode by setting the batch size to 1. But this does highlight a
> > > > > shortcoming of the new producer API.
> > > > >
> > > > > I really like the design of the new API; it has really great
> > > > > properties and I'm enjoying working with it. However, one API that I
> > > > > think we're lacking is a "batch" API. Currently, I have to iterate over
> > > > > a batch and call .send() on each record, which gives me n callbacks
> > > > > instead of 1 callback for the whole batch. This significantly
> > > > > complicates recovery logic where we need to commit a batch as opposed
> > > > > to 1 record at a time.
> > > > >
> > > > > Do you guys have any plans to add better semantics around batches?
> > > > >
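Pradeep's wish for one callback per batch can be approximated on top of per-record callbacks. A sketch, assuming a hypothetical `CallbackSender` shaped like `send(record, callback)`: an `AtomicInteger` counts outstanding records, and whichever record completes last fires the batch-level callback exactly once, passing the indices that failed (which also covers Gwen's need to retry only the failures).

```java
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BatchCallback {
    // Hypothetical per-record callback-style send; not a Kafka type.
    interface CallbackSender {
        void send(String value, BiConsumer<Long, Exception> callback);
    }

    // Send a whole batch and invoke onBatchDone exactly once, after every record's callback
    // has fired, with the indices of the failed records (an empty set means all good).
    static void sendBatch(CallbackSender sender, List<String> batch,
                          Consumer<SortedSet<Integer>> onBatchDone) {
        if (batch.isEmpty()) {
            onBatchDone.accept(new TreeSet<>());
            return;
        }
        AtomicInteger remaining = new AtomicInteger(batch.size());
        SortedSet<Integer> failed = Collections.synchronizedSortedSet(new TreeSet<>());
        for (int i = 0; i < batch.size(); i++) {
            final int index = i;
            sender.send(batch.get(i), (offset, error) -> {
                if (error != null) failed.add(index);
                // The last callback to arrive fires the batch-level callback.
                if (remaining.decrementAndGet() == 0) onBatchDone.accept(failed);
            });
        }
    }
}
```

Recording the failure before decrementing the counter matters: the thread that sees the count reach zero is then guaranteed to observe every earlier failure.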
> > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > > wrote:
> > > > >
> > > > >> If I understood the code and Jay correctly, if you wait for the
> > > > >> future, it will be a similar delay to that of the old sync producer.
> > > > >>
> > > > >> Put another way, if you test it out and see longer delays than the
> > > > >> sync producer had, we need to find out why and fix it.
> > > > >>
> > > > >> Gwen
> > > > >>
> > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > >> <otis.gospodnetic@gmail.com> wrote:
> > > > >> > Hi,
> > > > >> >
> > > > >> > Nope, unfortunately it can't do that. X is a remote app, doesn't
> > > > >> > listen to anything external, and calls Y via HTTPS. So X has to
> > > > >> > decide what to do with its data based on Y's synchronous response.
> > > > >> > It has to block until Y responds. And it wouldn't be pretty, I think,
> > > > >> > because nobody wants to run apps that talk to remote servers and
> > > > >> > hang on to connections longer than they have to. But perhaps that is
> > > > >> > the only way? Or maybe the answer to "I'm guessing the delay would be
> > > > >> > more or less the same as if the Producer was using SYNC mode?" is
> > > > >> > YES, in which case the connection from X to Y would be open for just
> > > > >> > as long as with a SYNC producer running in Y?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Otis
> > > > >> > --
> > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > >> >
> > > > >> >
> > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Can Y have a callback that will handle the notification to X?
> > > > >> >> In this case, perhaps Y can be async and X can buffer the data
> > > > >> >> until the callback triggers and says "all good" (or resend if the
> > > > >> >> callback indicates an error).
> > > > >> >>
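Gwen's suggestion puts the buffering on X's side. A minimal sketch, assuming a hypothetical batch id that Y echoes back in its notification (the thread does not define any protocol between X and Y): X keeps a copy of each batch until the outcome arrives, drops it on "all good", and gets it back for a resend on error.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AckBuffer {
    // X's copy of every batch shipped to Y, keyed by a hypothetical batch id.
    private final Map<Long, List<String>> pending = new LinkedHashMap<>();
    private long nextId = 0;

    // Remember the batch before shipping it; X would send (id, batch) to Y here.
    synchronized long ship(List<String> batch) {
        long id = nextId++;
        pending.put(id, new ArrayList<>(batch));
        return id;
    }

    // Called when Y's notification arrives. On success the buffered copy is dropped;
    // on failure the batch is returned so X can resend it (or spill it to local disk).
    synchronized List<String> onAck(long id, boolean allGood) {
        List<String> batch = pending.remove(id);
        if (batch == null || allGood) {
            return Collections.emptyList(); // all good, or an unknown/duplicate ack
        }
        return batch;
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

If a batch failed only partially, resending the whole batch duplicates the records that did succeed; avoiding that needs per-record status from Y.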
> > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > >> >> <otis.gospodnetic@gmail.com> wrote:
> > > > >> >> > Hi,
> > > > >> >> >
> > > > >> >> > Thanks for the info. Here's the use case. We have something
> > > > >> >> > upstream sending data, say a log shipper called X. It sends it to
> > > > >> >> > some remote component Y. Y is the Kafka Producer and it puts data
> > > > >> >> > into Kafka. But Y needs to send a reply to X and tell it whether it
> > > > >> >> > successfully put all its data into Kafka. If it did not, Y wants to
> > > > >> >> > tell X to buffer data locally and resend it later.
> > > > >> >> >
> > > > >> >> > If the producer is ONLY async, Y can't easily do that. Or maybe Y
> > > > >> >> > would just need to wait for the Future to come back and only then
> > > > >> >> > send the response back to X? If so, I'm guessing the delay would
> > > > >> >> > be more or less the same as if the Producer was using SYNC mode?
> > > > >> >> >
> > > > >> >> > Thanks,
> > > > >> >> > Otis
> > > > >> >> >
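What Otis describes for Y can be sketched as follows: send every record first so the client can still batch them, then block until all acks arrive or a deadline passes, and only then reply to X. The `Reply` values, the `send` function, and `timeoutMs` are hypothetical; with a real producer, `send` would wrap `producer.send(record)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ReplyAfterAcks {
    // Hypothetical replies from Y to X.
    enum Reply { OK, BUFFER_AND_RETRY }

    // Y's request handler: send everything, then wait for every ack under one shared
    // deadline, so X's connection is never held longer than timeoutMs.
    static Reply handle(List<String> records, Function<String, Future<?>> send, long timeoutMs) {
        List<Future<?>> futures = new ArrayList<>();
        for (String r : records) futures.add(send.apply(r));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (Future<?> f : futures) {
                long left = deadline - System.nanoTime();
                f.get(Math.max(left, 0), TimeUnit.NANOSECONDS); // throws on error or timeout
            }
            return Reply.OK;
        } catch (ExecutionException | TimeoutException e) {
            return Reply.BUFFER_AND_RETRY; // tell X to keep the data and resend later
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Reply.BUFFER_AND_RETRY;
        }
    }
}
```

The total wait is about as long as the slowest ack, which matches Gwen's point that waiting on the future costs roughly what the old sync producer did. Replying "retry" after a partial failure makes X resend records that did succeed, the duplicate problem Gwen raises about the old batch API.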
> > > > >> >> >
> > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kreps@gmail.com>
> > > > >> >> > wrote:
> > > > >> >> >
> > > > >> >> >> Yeah, as Gwen says, there is no sync/async mode anymore. There is
> > > > >> >> >> a new configuration which does a lot of what async did in terms of
> > > > >> >> >> allowing batching:
> > > > >> >> >>
> > > > >> >> >> batch.size - This is the target amount of data per partition the
> > > > >> >> >> producer will attempt to batch together.
> > > > >> >> >> linger.ms - This is the time the producer will wait for more data
> > > > >> >> >> to be sent to better batch up writes. The default is 0 (send
> > > > >> >> >> immediately). So if you set this to 50 ms, the client will send
> > > > >> >> >> immediately if it has already filled up its batch; otherwise it
> > > > >> >> >> will wait up to 50 ms to accumulate the number of bytes given by
> > > > >> >> >> batch.size.
> > > > >> >> >>
> > > > >> >> >> To send asynchronously you do
> > > > >> >> >>    producer.send(record);
> > > > >> >> >> whereas to block on a response you do
> > > > >> >> >>    producer.send(record).get();
> > > > >> >> >> which will wait for acknowledgement from the server.
> > > > >> >> >>
> > > > >> >> >> One advantage of this model is that the client will do its best
> > > > >> >> >> to batch under the covers even if linger.ms=0. It will do this by
> > > > >> >> >> batching any data that arrives while another send is in progress
> > > > >> >> >> into a single request, giving a kind of "group commit" effect.
> > > > >> >> >>
> > > > >> >> >> The hope is that this will be both simpler to understand (a
> > > > >> >> >> single API that always works the same) and more powerful (you
> > > > >> >> >> always get a response with error and offset information whether
> > > > >> >> >> or not you choose to use it).
> > > > >> >> >>
> > > > >> >> >> -Jay
> > > > >> >> >>
> > > > >> >> >>
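The two settings Jay describes are ordinary producer properties. A sketch of building them with `java.util.Properties`; the values are illustrative, `localhost:9092` is a placeholder, and the serializer class names assume the `org.apache.kafka.common.serialization` string serializers that ship with the new producer.

```java
import java.util.Properties;

public class ProducerBatchingConfig {
    // Batching knobs from the new producer; values are illustrative, not recommendations.
    static Properties batchingProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384"); // target bytes per partition batch
        props.put("linger.ms", "50");     // wait up to 50 ms for a batch to fill (default 0)
        return props;
    }
}
```

The resulting object would be passed to `new KafkaProducer<String, String>(props)`. Whether a given `send(record)` then behaves as async or sync depends only on whether the caller waits on the returned future.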
> > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshapira@cloudera.com>
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >> > If you want to emulate the old sync producer behavior, you need
> > > > >> >> >> > to set the batch size to 1 (in the producer config) and wait on
> > > > >> >> >> > the future you get from send (i.e. future.get).
> > > > >> >> >> >
> > > > >> >> >> > I can't think of good reasons to do so, though.
> > > > >> >> >> >
> > > > >> >> >> > Gwen
> > > > >> >> >> >
> > > > >> >> >> >
> > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> > > > >> >> >> > <otis.gospodnetic@gmail.com> wrote:
> > > > >> >> >> > > Hi,
> > > > >> >> >> > >
> > > > >> >> >> > > Is the plan for the New Producer to have ONLY async mode? I'm
> > > > >> >> >> > > asking because of this info from the Wiki:
> > > > >> >> >> > >
> > > > >> >> >> > >    - The producer will always attempt to batch data and will
> > > > >> >> >> > >      always immediately return a SendResponse which acts as a
> > > > >> >> >> > >      Future to allow the client to await the completion of the
> > > > >> >> >> > >      request.
> > > > >> >> >> > >
> > > > >> >> >> > > The word "always" makes me think there will be no sync mode.
> > > > >> >> >> > >
> > > > >> >> >> > > Thanks,
> > > > >> >> >> > > Otis
> > > > >> >> >> >
> > > > >> >> >>
> > > > >> >>
> > > > >>
> > > >
> > >
> >
>
