kafka-users mailing list archives

From Dana Powers <dana.pow...@gmail.com>
Subject Re: KafkaProducer block on send
Date Wed, 04 May 2016 16:55:33 GMT
I think changes of this sort (design changes as opposed to bugs) typically
go through a KIP process before work is assigned. You might consider
starting a KIP discussion to see whether there is interest in pursuing your
proposed changes.

-Dana
On May 4, 2016 7:58 AM, "Oleg Zhurakousky" <ozhurakousky@hortonworks.com>
wrote:

> Indeed it is.
>
> Oleg
> > On May 4, 2016, at 10:54 AM, Paolo Patierno <ppatierno@live.com> wrote:
> >
> > It's sad that after almost one month it's still "unassigned" :-(
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperience
> >
> >> Subject: Re: KafkaProducer block on send
> >> From: ozhurakousky@hortonworks.com
> >> To: users@kafka.apache.org
> >> Date: Wed, 4 May 2016 14:47:25 +0000
> >>
> >> Sure
> >>
> >> Here are both:
> >> https://issues.apache.org/jira/browse/KAFKA-3539
> >> https://issues.apache.org/jira/browse/KAFKA-3540
> >>
> >> On May 4, 2016, at 3:24 AM, Paolo Patierno <ppatierno@live.com> wrote:
> >>
> >> Hi Oleg,
> >>
> >> Can you share the JIRA link here? I totally agree with you.
> >> For me, send() should be totally asynchronous and should not block
> >> for up to the max.block.ms timeout.
> >>
> >> Currently I'm using the overload with a callback which, of course,
> >> isn't called if send() fails due to the timeout.
> >> To catch this scenario I need to do the following:
> >>
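> >> Future<RecordMetadata> future = this.producer.send(....);
> >>
> >> // If the future is already done at this point, send() most likely
> >> // failed immediately (for example, on the max.block.ms timeout).
> >> if (future.isDone()) {
> >>     try {
> >>         future.get();
> >>     } catch (InterruptedException e) {
> >>         // Restore the interrupt flag rather than swallowing it
> >>         Thread.currentThread().interrupt();
> >>     } catch (ExecutionException e) {
> >>         // The send failure is wrapped in the ExecutionException
> >>         e.printStackTrace();
> >>     }
> >> }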
> >>
> >> I don't like it so much ...
> >>
> >> Thanks,
> >> Paolo.
> >>
> >> Paolo Patierno
> >> Senior Software Engineer (IoT) @ Red Hat
> >> Microsoft MVP on Windows Embedded & IoT
> >> Microsoft Azure Advisor
> >> Twitter : @ppatierno
> >> Linkedin : paolopatierno
> >> Blog : DevExperience
> >>
> >> Subject: Re: KafkaProducer block on send
> >> From: ozhurakousky@hortonworks.com
> >> To: users@kafka.apache.org
> >> Date: Mon, 11 Apr 2016 19:42:17 +0000
> >>
> >> Dana
> >>
> >> Thanks for the explanation, but it sounds more like a workaround, since
> >> everything you describe could be encapsulated within the Future itself.
> >> After all, it "represents the result of an asynchronous computation":
> >>
> >> executor.submit(new Callable<RecordMetadata>() {
> >>     @Override
> >>     public RecordMetadata call() throws Exception {
> >>         // first make sure the metadata for the topic is available
> >>         long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
> >>         . . .
> >>     }
> >> });
> >>
> >>
> >> The above would eliminate the confusion and keep the user in control,
> >> so that even a legitimate blockage could be interrupted, canceled, etc.,
> >> based on various business/infrastructure requirements.
> >> Anyway, I’ll raise the issue in JIRA and reference this thread.
> >>
> >> Cheers
> >> Oleg
> >>
> >> On Apr 8, 2016, at 10:31 AM, Dana Powers <dana.powers@gmail.com> wrote:
> >>
> >> The prior discussion explained:
> >>
> >> (1) The code you point to blocks for a maximum of max.block.ms, which
> >> is user configurable. It does not block indefinitely with no user
> >> control as you suggest. You are free to configure this to 0 if you
> >> like, and it will not block at all. Have you tried this like I
> >> suggested before?
> >>
> >> (2) Even if you convinced people to remove waitOnMetadata, the send
> >> method *still* blocks on memory back pressure (also configured by
> >> max.block.ms). This is for good reason:
> >>
> >> while True:
> >>     producer.send(msg)
> >>
> >> can quickly devour all of your local memory and crash your process if
> >> the outflow rate decreases, say if brokers go down or a network
> >> partition occurs.
> >>
> >> -Dana
> >> I totally agree with Oleg.
> >>
> >> As the documentation says, the producer sends data asynchronously, and
> >> this is reinforced by the send method signature, which returns a Future.
> >> It can't block indefinitely without returning to the caller.
> >> I mean, you can decide that the code inside the send method blocks
> >> indefinitely, but in an "asynchronous way": it should first return a
> >> Future to the caller, who can then handle it.
> >>
> >> Paolo.
> >>
> >> Paolo Patierno
> >> Senior Software Engineer (IoT) @ Red Hat
> >> Microsoft MVP on Windows Embedded & IoT
> >> Microsoft Azure Advisor
> >> Twitter : @ppatierno
> >> Linkedin : paolopatierno
> >> Blog : DevExperience
> >>
> >> Subject: KafkaProducer block on send
> >> From: ozhurakousky@hortonworks.com
> >> To: users@kafka.apache.org
> >> Date: Thu, 7 Apr 2016 13:04:49 +0000
> >>
> >> I know it’s been discussed before, but that conversation never really
> >> concluded with any reasonable explanation, so I am bringing it up again,
> >> as I believe this is a bug that would need to be fixed in some future
> >> release.
> >> Can someone please explain the rationale for the following code in
> >> KafkaProducer:
> >>
> >> @Override
> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
> >>     try {
> >>         // first make sure the metadata for the topic is available
> >>         long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
> >>         . . .
> >> }
> >>
> >> By definition, a method that returns a Future implies that the caller
> >> decides how long to wait for completion via Future.get(TIMETOWAIT). In
> >> this case there is an explicit blocking call (waitOnMetadata) that can
> >> hang indefinitely (regardless of the reasons), which essentially
> >> deadlocks the user’s code, since the Future may never be returned in
> >> the first place.
> >>
> >> Thoughts?
> >>
> >> Oleg
> >>
> >>
> >>
> >>
> >
>
>
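
The idea proposed in the thread above (run the potentially blocking part of send() on an executor, so that the caller gets a Future right away and decides how long to wait) can be illustrated without any Kafka dependency. This is only a minimal sketch under stated assumptions, not KafkaProducer's actual implementation: BlockingSender, sendAsync and awaitOrCancel are hypothetical names introduced here, and the slow sender merely simulates a long metadata wait with Thread.sleep.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncSendSketch {

    /** Hypothetical stand-in for a producer whose send may block (not a Kafka API). */
    public interface BlockingSender {
        String send(String record) throws InterruptedException;
    }

    /** Hands the possibly blocking call to the executor; returns a Future immediately. */
    public static Future<String> sendAsync(ExecutorService executor,
                                           BlockingSender sender,
                                           String record) {
        Callable<String> task = () -> sender.send(record);
        return executor.submit(task);
    }

    /** Waits at most timeoutMs; on timeout, cancels the task by interrupting its thread. */
    public static String awaitOrCancel(Future<String> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            return "timed out";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Simulates a send that is stuck for five seconds (e.g. waiting on metadata).
            BlockingSender slow = record -> {
                Thread.sleep(5_000);
                return "sent " + record;
            };
            Future<String> future = sendAsync(executor, slow, "hello");
            // The caller, not the sender, chooses the 100 ms limit.
            System.out.println(awaitOrCancel(future, 100)); // prints "timed out"
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that this moves the blocking rather than removing it, which is the back-pressure concern raised earlier in the thread: Executors.newSingleThreadExecutor() queues tasks without bound, so a tight loop of sendAsync calls could still exhaust memory unless the executor's queue is bounded.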
