kafka-users mailing list archives

From Greg Zoller <gwzol...@yahoo.com.INVALID>
Subject Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
Date Thu, 14 Apr 2016 02:19:10 GMT
I hate email forums... mess up your code!
Ok, so I finally managed to get it to run past the first one.  Not sure what I did... just
kept trying.  Seems a bit flaky.
When it ran all 6, it did 6 send() calls, then waited out the 5-second sleep I put in there
before the close.  Then I saw 6 of these:
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:487)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:467)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:155)
    at java.lang.Thread.run(Thread.java:745)
The offset of the record we just sent is: null
So something is hanging up and then gets tied in knots when things finally shut down.
      From: Greg Zoller <gwzoller@yahoo.com.INVALID>
 To: "users@kafka.apache.org" <users@kafka.apache.org> 
 Sent: Wednesday, April 13, 2016 9:08 PM
 Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
   
I did and now it hangs on the first send.  Here's my output (hopefully this won't destroy the
formatting).  A line like "::: Writing 1 :::" should be printed for each of the 6 writes, but
only the first one appears; then it hangs until the timeout.
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [192.168.99.100:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    interceptor.classes = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = producer-1
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.1.0-SNAPSHOT
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 319c6e7195143c1f
::: Writing 1 ::::
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {lowercaseStrings=LEADER_NOT_AVAILABLE}
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 59860 ms.




      From: Ismael Juma <ismael@juma.me.uk>
 To: users@kafka.apache.org; Greg Zoller <gwzoller@yahoo.com> 
 Sent: Wednesday, April 13, 2016 5:54 PM
 Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
  
Hi Greg,

It may be worth passing a callback to the `send` to see if there's any
error (or calling `get` on the Future returned).
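
A minimal sketch of both options, in Scala to match the producer code in this thread. The broker address, topic name, and object name are assumptions for illustration only; the `send(record, callback)` overload, `Callback.onCompletion`, and `get` on the returned Future come from the Kafka Java producer client. It needs the kafka-clients jar on the classpath and a reachable broker to run.

```scala
import java.util.Properties
import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SendWithErrorCheck {
  def main(args: Array[String]): Unit = {
    // Assumed values for illustration; substitute your own broker and topic.
    val host  = "localhost:9092"
    val topic = "test-topic"

    val props = new Properties()
    props.put("bootstrap.servers", host)
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val p = new KafkaProducer[Array[Byte], String](props)
    try {
      // Option 1: pass a callback. It is invoked once the send completes,
      // with a non-null exception if the send failed.
      p.send(
        new ProducerRecord[Array[Byte], String](topic, "msg-callback"),
        new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            if (exception != null) println(s"send failed: $exception")
            else println(s"partition ${metadata.partition}, offset ${metadata.offset}")
        }
      )

      // Option 2: block on the returned Future. A failed send surfaces
      // as an ExecutionException wrapping the underlying cause.
      try {
        val metadata = p.send(new ProducerRecord[Array[Byte], String](topic, "msg-future")).get()
        println(s"partition ${metadata.partition}, offset ${metadata.offset}")
      } catch {
        case e: ExecutionException => println(s"send failed: ${e.getCause}")
      }
    } finally {
      // Same timed close as in the original code from this thread.
      p.close(10000, TimeUnit.MILLISECONDS)
    }
  }
}
```

Calling `get` after every send makes the producer effectively synchronous, so it is mainly useful for diagnosing a problem like this one; the callback form keeps sends asynchronous.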

Ismael

On Wed, Apr 13, 2016 at 2:59 PM, Greg Zoller <gwzoller@yahoo.com.invalid>
wrote:

> So I rebuilt last night with the latest from master branch.
> Unfortunately same problem--producer doesn't seem to commit/close().
> After inserting a few records (which *seem* to go ok), close() times out
> and no offsets are updated--still set to 0.
> Any ideas?
> Thanks,
> Greg
>
>      From: Greg Zoller <gwzoller@yahoo.com.INVALID>
>  To: "users@kafka.apache.org" <users@kafka.apache.org>; Greg Zoller <
> gwzoller@yahoo.com>
>  Sent: Tuesday, April 12, 2016 11:49 AM
>  Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Sorry the formatting was all messed up. I re-tested this code with 0.9.0.1
> and it worked fine--KafkaProducer closed and committed the number of
> records expected into the partitions.
> So this seems like a SNAPSHOT issue.  Will continue looking.
>
>      From: Greg Zoller <gwzoller@yahoo.com.INVALID>
>  To: "users@kafka.apache.org" <users@kafka.apache.org>
>  Sent: Tuesday, April 12, 2016 10:40 AM
>  Subject: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Hello,
> I'm trying to run the latest master build in github.  I've got producer
> code like below:
>    val props = Map(
>      "bootstrap.servers" -> host,
>      "key.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
>      "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
>    )
>    val p = new KafkaProducer[Array[Byte], String](props)
>    (1 to num).foreach { i =>
>      p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i"))
>    }
>    p.close(10000, java.util.concurrent.TimeUnit.MILLISECONDS)
>
>
> This code will wait 10 seconds then close the KafkaProducer.  At that
> point if I check the offsets in my server (same build) I see all offsets
> set to 0...no data committed.  If I put some kind of println in my loop I
> see the p.send() call seeming to work happily.
> Any ideas?
> Thanks,
> Greg
>
>
>
>
>




  