kafka-users mailing list archives

From Oleg Zhurakousky <ozhurakou...@hortonworks.com>
Subject Re: KafkaProducer block on send
Date Wed, 04 May 2016 14:47:25 GMT
Sure

Here are both:
https://issues.apache.org/jira/browse/KAFKA-3539
https://issues.apache.org/jira/browse/KAFKA-3540

On May 4, 2016, at 3:24 AM, Paolo Patierno <ppatierno@live.com> wrote:

Hi Oleg,

Can you share the JIRA link here? I totally agree with you.
For me, send() should be fully asynchronous and should not block for up to the max.block.ms timeout.

Currently I'm using the overload with a callback which, of course, isn't called if send()
fails due to the timeout.
To catch this scenario I need to do the following:

Future<RecordMetadata> future = this.producer.send(....);

if (future.isDone()) {
    try {
        future.get();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

I don't like it so much ...
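
The check above could be folded into a small helper. This is only a sketch: SendResults and immediateFailure are hypothetical names, not part of the Kafka client, and the code relies solely on java.util.concurrent.Future, so it makes no claim about when the producer actually completes its futures.

```java
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the Kafka client.
final class SendResults {
    private SendResults() {}

    /** Returns the failure cause if the future has already failed, otherwise empty. */
    static Optional<Throwable> immediateFailure(Future<?> future) {
        if (!future.isDone()) {
            return Optional.empty(); // still in flight, nothing to report yet
        }
        try {
            future.get(); // does not block: the future is already done
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.of(e);
        } catch (ExecutionException e) {
            return Optional.of(e.getCause() != null ? e.getCause() : e);
        } catch (CancellationException e) {
            return Optional.of(e);
        }
    }
}
```

A caller would then write something like SendResults.immediateFailure(future).ifPresent(...) right after send(), instead of repeating the try/catch at every call site.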

Thanks,
Paolo.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience

Subject: Re: KafkaProducer block on send
From: ozhurakousky@hortonworks.com
To: users@kafka.apache.org
Date: Mon, 11 Apr 2016 19:42:17 +0000

Dana

Thanks for the explanation, but it sounds more like a workaround since everything you describe
could be encapsulated within the Future itself. After all, it "represents the result of an
asynchronous computation":

executor.submit(new Callable<RecordMetadata>() {
    @Override
    public RecordMetadata call() throws Exception {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        . . .
    }
});


The above would eliminate the confusion and keep the user in control, so that even a legitimate
blockage could be interrupted, canceled, etc., based on various business/infrastructure requirements.
Anyway, I’ll raise the issue in JIRA and reference this thread.
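
A minimal, self-contained sketch of that idea, using only java.util.concurrent. The blocking metadata wait is simulated with Thread.sleep, and AsyncSendSketch, cancelAfter and blockingSend are hypothetical names rather than Kafka APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncSendSketch {
    private AsyncSendSketch() {}

    /** Returns true if the caller gave up waiting and cancelled the blocked task. */
    static boolean cancelAfter(long callerTimeoutMs, long simulatedBlockMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical stand-in for the blocking metadata wait inside send().
            Callable<String> blockingSend = () -> {
                Thread.sleep(simulatedBlockMs);
                return "sent";
            };
            // submit() returns immediately, so the caller always holds a Future.
            Future<String> future = executor.submit(blockingSend);
            try {
                future.get(callerTimeoutMs, TimeUnit.MILLISECONDS);
                return false; // finished within the caller's own budget
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the sleeping worker thread
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                future.cancel(true);
                return true;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Here the timeout belongs to the caller: cancelAfter(50, 5000) abandons a five-second blockage after 50 ms, whereas the blocked work itself knows nothing about the caller's deadline.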

Cheers
Oleg

On Apr 8, 2016, at 10:31 AM, Dana Powers <dana.powers@gmail.com> wrote:

The prior discussion explained:

(1) The code you point to blocks for a maximum of max.block.ms, which is
user configurable. It does not block indefinitely with no user control as
you suggest. You are free to configure this to 0 if you like, and it will not
block at all. Have you tried this like I suggested before?

(2) Even if you convinced people to remove waitOnMetadata, the send method
*still* blocks on memory back pressure (also configured by max.block.ms).
This is for good reason:

while True:
    producer.send(msg)

can quickly devour all of your local memory and crash your process if the
outflow rate decreases, say if brokers go down or a network partition occurs.
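
To illustrate point (2), here is a rough sketch of a bounded buffer that blocks a producer for at most a configured time. It is not Kafka's actual buffer implementation: BoundedSendBuffer, tryReserve and release are hypothetical names, and the maxBlockMs parameter merely plays the role that max.block.ms plays in this discussion.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of memory back pressure, not Kafka code.
final class BoundedSendBuffer {
    private final Semaphore freeBytes;
    private final long maxBlockMs;

    BoundedSendBuffer(int capacityBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(capacityBytes);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for a record; false if no space frees up within maxBlockMs. */
    boolean tryReserve(int bytes) {
        try {
            // With maxBlockMs == 0 this returns at once instead of waiting.
            return freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    /** Called once a record has left the buffer, e.g. after it was sent. */
    void release(int bytes) {
        freeBytes.release(bytes);
    }
}
```

With a capacity of 10 and maxBlockMs of 0, a first tryReserve(6) succeeds and a second fails immediately rather than growing memory without bound; once release(6) is called, the next reservation succeeds again.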

-Dana

I totally agree with Oleg.

As the documentation says, the producer sends data asynchronously, and this is
reflected in the send method signature, which returns a Future.
It can't block indefinitely without returning to the caller.
I mean, you can decide that the code inside the send method blocks
indefinitely, but to do so in an "asynchronous way" it should first return a Future
to the caller, who can then handle it.

Paolo.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience

Subject: KafkaProducer block on send
From: ozhurakousky@hortonworks.com
To: users@kafka.apache.org
Date: Thu, 7 Apr 2016 13:04:49 +0000

I know it’s been discussed before, but that conversation never really
concluded with any reasonable explanation, so I am bringing it up again as
I believe this is a bug that would need to be fixed in some future release.
Can someone please explain the rationale for the following code in
KafkaProducer:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        . . .
}

By definition, a method that returns a Future implies that the caller decides
how long to wait for completion via Future.get(TIMETOWAIT). In this
case there is an explicit blocking call (waitOnMetadata) that can hang
indefinitely (regardless of the reasons), which essentially deadlocks the user’s
code since the Future may never be returned in the first place.
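
The concern can be reproduced in miniature without Kafka. In the sketch below, BlockingBeforeFuture and its methods are hypothetical, and Thread.sleep stands in for the blocking pre-step; the point is only that the time spent before the Future is returned is outside the reach of any timeout the caller later passes to Future.get.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical illustration, not the KafkaProducer implementation.
final class BlockingBeforeFuture {
    private BlockingBeforeFuture() {}

    /** Blocks for blockMs, then hands back an already-completed Future. */
    static Future<String> send(long blockMs) {
        try {
            Thread.sleep(blockMs); // stand-in for a blocking pre-step
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CompletableFuture.completedFuture("metadata");
    }

    /** Milliseconds the caller spends inside send() before it holds a Future at all. */
    static long millisUntilFutureReturned(long blockMs) {
        long start = System.nanoTime();
        send(blockMs);
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

However short a timeout the caller intends to use on the returned Future, millisUntilFutureReturned(200) still reports roughly the full 200 ms, because the wait happens before there is anything to call get on.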

Thoughts?

Oleg




