kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gwen Shapira <gshap...@cloudera.com>
Subject Re: New Producer - ONLY sync mode?
Date Wed, 04 Feb 2015 21:14:58 GMT
I thought Jay Kreps had the right recipes:

To mimic the old Sync producer:
 producer.send(record).get();

To mimic old batches:
List responses = new ArrayList();
for(input: recordBatch)
    responses.add(producer.send(input));
for(response: responses)
    response.get

Perhaps we need to add this to the FAQ?

Gwen


On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <steve@stevemorin.com> wrote:

> Looking at this thread I would ideally want something at least the right
> recipe to mimic sync behavior like Otis is talking about.
>
> In the second case, would like to be able to individually know if messages
> have failed even regardless if they are in separate batches, sort of like
> what Kinesis does as Pradeep mentioned.
> -Steve
>
> On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Yeah totally. Using a callback is, of course, the Right Thing for this
> kind
> > of stuff. But I have found that kind of asynchronous thinking can be hard
> > for people. Even if you get out of the pre-java 8 syntactic pain that
> > anonymous inner classes inflict just dealing with multiple threads of
> > control without creating async spaghetti can be a challenge for complex
> > stuff. That is really the only reason for the futures in the api, they
> are
> > strictly less powerful than the callbacks, but at least using them you
> can
> > just call .get() and pretend it is blocking.
> >
> > -Jay
> >
> > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <joe.stein@stealth.ly> wrote:
> >
> > > Now that 0.8.2.0 is in the wild I look forward to working with more and
> > > seeing what folks start to-do with this function
> > >
> > >
> >
> https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord
> > > ,
> > > org.apache.kafka.clients.producer.Callback) and keeping it fully non
> > > blocking.
> > >
> > > One sprint I know of coming up is going to have the new producer as a
> > > component in their reactive calls and handling bookkeeping and retries
> > > through that type of call back approach. Should work well (haven't
> tried
> > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc,
> etc
> > in
> > > functional languages and frameworks.
> > >
> > > I think as JDK 8 starts to get out in the wild too more (may after jdk7
> > > eol) the use of .get will be reduced (imho) and folks will be thinking
> > more
> > > about non-blocking vs blocking and not as so much sync vs async but my
> > > crystal ball just back from the shop so well see =8^)
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <jay.kreps@gmail.com>
> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I guess the question is whether it really matters how many underlying
> > > > network requests occur? It is very hard for an application to depend
> on
> > > > this even in the old producer since it depends on the partitions
> > > placement
> > > > (a send to two partitions may go to either one machine or two and so
> it
> > > > will send either one or two requests). So when you send a batch in
> one
> > > call
> > > > you may feel that is "all at once", but that is only actually
> > guaranteed
> > > if
> > > > all messages have the same partition.
> > > >
> > > > The challenge is allowing even this in the presence of bounded
> request
> > > > sizes which we have in the new producer. The user sends a list of
> > objects
> > > > and the serialized size that will result is not very apparent to
> them.
> > If
> > > > you break it up into multiple requests then that is kind of further
> > > ruining
> > > > the illusion of a single send. If you don't then you have to just
> error
> > > out
> > > > which is equally annoying to have to handle.
> > > >
> > > > But I'm not sure if from your description you are saying you actually
> > > care
> > > > how many physical requests are issued. I think it is more like it is
> > just
> > > > syntactically annoying to send a batch of data now because it needs a
> > for
> > > > loop.
> > > >
> > > > Currently to do this you would do:
> > > >
> > > > List responses = new ArrayList();
> > > > for(input: recordBatch)
> > > >     responses.add(producer.send(input));
> > > > for(response: responses)
> > > >     response.get
> > > >
> > > > If you don't depend on the offset/error info we could add a flush
> call
> > so
> > > > you could instead do
> > > > for(input: recordBatch)
> > > >     producer.send(input);
> > > > producer.flush();
> > > >
> > > > But if you do want the error/offset then you are going to be back to
> > the
> > > > original case.
> > > >
> > > > Thoughts?
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > wrote:
> > > >
> > > > > I've been thinking about that too, since both Flume and Sqoop rely
> on
> > > > > send(List) API of the old API.
> > > > >
> > > > > I'd like to see this API come back, but I'm debating how we'd
> handle
> > > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > > error, which can lead to duplicates. Having N callbacks lets me
> retry
> > > > > / save / whatever just the messages that had issues.
> > > > >
> > > > > If messages had identifiers from the producer side, we could have
> the
> > > > > API call the callback with a list of message-ids and their status.
> > But
> > > > > they don't :)
> > > > >
> > > > > Any thoughts on how you'd like it to work?
> > > > >
> > > > > Gwen
> > > > >
> > > > >
> > > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <
> > > pradeepg26@gmail.com>
> > > > > wrote:
> > > > > > This is a great question Otis. Like Gwen said, you can accomplish
> > > Sync
> > > > > mode
> > > > > > by setting the batch size to 1. But this does highlight a
> > shortcoming
> > > > of
> > > > > > the new producer API.
> > > > > >
> > > > > > I really like the design of the new API and it has really great
> > > > > properties
> > > > > > and I'm enjoying working with it. However, once API that I think
> > > we're
> > > > > > lacking is a "batch" API. Currently, I have to iterate over
a
> batch
> > > and
> > > > > > call .send() on each record, which returns n callbacks instead
> of 1
> > > > > > callback for the whole batch. This significantly complicates
> > recovery
> > > > > logic
> > > > > > where we need to commit a batch as opposed 1 record at a time.
> > > > > >
> > > > > > Do you guys have any plans to add better semantics around
> batches?
> > > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <
> > gshapira@cloudera.com>
> > > > > wrote:
> > > > > >
> > > > > >> If I understood the code and Jay correctly - if you wait
for the
> > > > > >> future it will be a similar delay to that of the old sync
> > producer.
> > > > > >>
> > > > > >> Put another way, if you test it out and see longer delays
than
> the
> > > > > >> sync producer had, we need to find out why and fix it.
> > > > > >>
> > > > > >> Gwen
> > > > > >>
> > > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > > >> <otis.gospodnetic@gmail.com> wrote:
> > > > > >> > Hi,
> > > > > >> >
> > > > > >> > Nope, unfortunately it can't do that.  X is a remote
app,
> > doesn't
> > > > > listen
> > > > > >> to
> > > > > >> > anything external, calls Y via HTTPS.  So X has to
decide what
> > to
> > > do
> > > > > with
> > > > > >> > its data based on Y's synchronous response.  It has
to block
> > > until Y
> > > > > >> > responds.  And it wouldn't be pretty, I think, because
nobody
> > > wants
> > > > to
> > > > > >> run
> > > > > >> > apps that talk to remove servers and hang on to connections
> more
> > > > than
> > > > > >> they
> > > > > >> > have to.  But perhaps that is the only way?  Or maybe
the
> answer
> > > to
> > > > > "I'm
> > > > > >> > guessing the delay would be more or less the same as
if the
> > > Producer
> > > > > was
> > > > > >> > using SYNC mode?" is YES, in which case the connection
from X
> > to Y
> > > > > would
> > > > > >> be
> > > > > >> > open for just as long as with a SYNC producer running
in Y?
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Otis
> > > > > >> > --
> > > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized
Log
> > > > Management
> > > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <
> > > gshapira@cloudera.com
> > > > >
> > > > > >> wrote:
> > > > > >> >
> > > > > >> >> Can Y have a callback that will handle the notification
to X?
> > > > > >> >> In this case, perhaps Y can be async and X can
buffer the
> data
> > > > until
> > > > > >> >> the callback triggers and says "all good" (or resend
if the
> > > > callback
> > > > > >> >> indicates an error)
> > > > > >> >>
> > > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > > >> >> <otis.gospodnetic@gmail.com> wrote:
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Thanks for the info.  Here's the use case.
 We have
> something
> > > up
> > > > > >> stream
> > > > > >> >> > sending data, say a log shipper called X.
 It sends it to
> > some
> > > > > remote
> > > > > >> >> > component Y.  Y is the Kafka Producer and
it puts data into
> > > > Kafka.
> > > > > >> But Y
> > > > > >> >> > needs to send a reply to X and tell it whether
it
> > successfully
> > > > put
> > > > > all
> > > > > >> >> its
> > > > > >> >> > data into Kafka.  If it did not, Y wants to
tell X to
> buffer
> > > data
> > > > > >> locally
> > > > > >> >> > and resend it later.
> > > > > >> >> >
> > > > > >> >> > If producer is ONLY async, Y can't easily
do that.  Or
> maybe
> > Y
> > > > > would
> > > > > >> just
> > > > > >> >> > need to wait for the Future to come back and
only then send
> > the
> > > > > >> response
> > > > > >> >> > back to X?  If so, I'm guessing the delay
would be more or
> > less
> > > > the
> > > > > >> same
> > > > > >> >> as
> > > > > >> >> > if the Producer was using SYNC mode?
> > > > > >> >> >
> > > > > >> >> > Thanks,
> > > > > >> >> > Otis
> > > > > >> >> > --
> > > > > >> >> > Monitoring * Alerting * Anomaly Detection
* Centralized Log
> > > > > Management
> > > > > >> >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps
<
> > jay.kreps@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >> >> >
> > > > > >> >> >> Yeah as Gwen says there is no sync/async
mode anymore.
> There
> > > is
> > > > a
> > > > > new
> > > > > >> >> >> configuration which does a lot of what
async did in terms
> of
> > > > > allowing
> > > > > >> >> >> batching:
> > > > > >> >> >>
> > > > > >> >> >> batch.size - This is the target amount
of data per
> partition
> > > the
> > > > > >> server
> > > > > >> >> >> will attempt to batch together.
> > > > > >> >> >> linger.ms - This is the time the producer
will wait for
> > more
> > > > data
> > > > > >> to be
> > > > > >> >> >> sent to better batch up writes. The default
is 0 (send
> > > > > immediately).
> > > > > >> So
> > > > > >> >> if
> > > > > >> >> >> you set this to 50 ms the client will
send immediately if
> it
> > > has
> > > > > >> already
> > > > > >> >> >> filled up its batch, otherwise it will
wait to accumulate
> > the
> > > > > number
> > > > > >> of
> > > > > >> >> >> bytes given by batch.size.
> > > > > >> >> >>
> > > > > >> >> >> To send asynchronously you do
> > > > > >> >> >>    producer.send(record)
> > > > > >> >> >> whereas to block on a response you do
> > > > > >> >> >>    producer.send(record).get();
> > > > > >> >> >> which will wait for acknowledgement from
the server.
> > > > > >> >> >>
> > > > > >> >> >> One advantage of this model is that the
client will do
> it's
> > > best
> > > > > to
> > > > > >> >> batch
> > > > > >> >> >> under the covers even if linger.ms=0.
It will do this by
> > > > batching
> > > > > >> any
> > > > > >> >> data
> > > > > >> >> >> that arrives while another send is in
progress into a
> single
> > > > > >> >> >> request--giving a kind of "group commit"
effect.
> > > > > >> >> >>
> > > > > >> >> >> The hope is that this will be both simpler
to understand
> (a
> > > > single
> > > > > >> api
> > > > > >> >> that
> > > > > >> >> >> always works the same) and more powerful
(you always get a
> > > > > response
> > > > > >> with
> > > > > >> >> >> error and offset information whether or
not you choose to
> > use
> > > > it).
> > > > > >> >> >>
> > > > > >> >> >> -Jay
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen
Shapira <
> > > > > gshapira@cloudera.com
> > > > > >> >
> > > > > >> >> >> wrote:
> > > > > >> >> >>
> > > > > >> >> >> > If you want to emulate the old sync
producer behavior,
> you
> > > > need
> > > > > to
> > > > > >> set
> > > > > >> >> >> > the batch size to 1  (in producer
config) and wait on
> the
> > > > future
> > > > > >> you
> > > > > >> >> >> > get from Send (i.e. future.get)
> > > > > >> >> >> >
> > > > > >> >> >> > I can't think of good reasons to
do so, though.
> > > > > >> >> >> >
> > > > > >> >> >> > Gwen
> > > > > >> >> >> >
> > > > > >> >> >> >
> > > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM,
Otis Gospodnetic
> > > > > >> >> >> > <otis.gospodnetic@gmail.com>
wrote:
> > > > > >> >> >> > > Hi,
> > > > > >> >> >> > >
> > > > > >> >> >> > > Is the plan for New Producer
to have ONLY async mode?
> > I'm
> > > > > asking
> > > > > >> >> >> because
> > > > > >> >> >> > > of this info from the Wiki:
> > > > > >> >> >> > >
> > > > > >> >> >> > >
> > > > > >> >> >> > >    - The producer will always
attempt to batch data
> and
> > > will
> > > > > >> always
> > > > > >> >> >> > >    immediately return a SendResponse
which acts as a
> > > Future
> > > > to
> > > > > >> allow
> > > > > >> >> >> the
> > > > > >> >> >> > >    client to await the completion
of the request.
> > > > > >> >> >> > >
> > > > > >> >> >> > >
> > > > > >> >> >> > > The word "always" makes me think
there will be no sync
> > > mode.
> > > > > >> >> >> > >
> > > > > >> >> >> > > Thanks,
> > > > > >> >> >> > > Otis
> > > > > >> >> >> > > --
> > > > > >> >> >> > > Monitoring * Alerting * Anomaly
Detection *
> Centralized
> > > Log
> > > > > >> >> Management
> > > > > >> >> >> > > Solr & Elasticsearch Support
* http://sematext.com/
> > > > > >> >> >> >
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message