kafka-users mailing list archives

From Andy Davidson <A...@SantaCruzIntegration.com>
Subject Re: newbie: kafka 0.9.0.0 producer does not terminate after producer.close()
Date Mon, 23 May 2016 18:41:12 GMT
Thanks

Andy

From:  Kamal C <kamaltarget@gmail.com>
Reply-To:  <users@kafka.apache.org>
Date:  Sunday, May 22, 2016 at 8:36 AM
To:  <users@kafka.apache.org>
Subject:  Re: newbie: kafka 0.9.0.0 producer does not terminate after
producer.close()

> Andy,
> 
> Kafka 0.9.0 server supports the previous versions of the clients (0.8.2,
> 0.8.1, ...).
> But new clients won't work properly with an older version of the Kafka server.
> 
> You should upgrade your server / broker first.
> 
> --Kamal
> 
> On Fri, May 20, 2016 at 10:58 PM, Andy Davidson <
> Andy@santacruzintegration.com> wrote:
> 
>>  Hi Jaikiran
>> 
>>  Below is the stack trace. For completeness I see in my log file that my
>>  code has called
>> 
>>  producer.flush();
>> 
>>  producer.close();
>> 
>> 
>> 
>>  I get the following error; however, I do not think this is the problem. I
>>  found a "bug report" that said this was because I was connecting to a
>>  0.8.x server. I am able to consume my test messages using
>>  kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh
>> 
>>  Kind regards
>> 
>>  Andy
>> 
>>  ERROR 17:12:14 kafka-producer-network-thread | producer-1
>>  o.a.k.c.p.i.Sender
>>  run line:130 Uncaught error in kafka producer I/O thread:
>> 
>>  org.apache.kafka.common.protocol.types.SchemaException: Error reading field
>>  'throttle_time_ms': java.nio.BufferUnderflowException
>> 
>>  at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>> 
>>  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
>> 
>>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>> 
>>  at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> 
>>  $ jstack 908
>> 
>>  2016-05-20 10:16:25
>> 
>>  Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
>> 
>> 
>> 
>>  "Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800
>>  nid=0x130b waiting on condition [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31
>>  tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>>  at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>> 
>>  at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>> 
>>  at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
>> 
>>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>> 
>>  - locked <0x000000076b67ea88> (a sun.nio.ch.Util$2)
>> 
>>  - locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet)
>> 
>>  - locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl)
>> 
>>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>> 
>>  at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>> 
>>  at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>> 
>>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>> 
>>  at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> 
>>  "Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800
>>  nid=0x5203 runnable [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000
>>  nid=0x5003 waiting on condition [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000
>>  nid=0x4e03 waiting on condition [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800
>>  nid=0x4c03 waiting on condition [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800
>>  nid=0x4a03 waiting on condition [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000
>>  nid=0x3e0f runnable [0x0000000000000000]
>> 
>>     java.lang.Thread.State: RUNNABLE
>> 
>> 
>> 
>>  "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803
>>  in Object.wait() [0x0000700000d3a000]
>> 
>>     java.lang.Thread.State: WAITING (on object monitor)
>> 
>>  at java.lang.Object.wait(Native Method)
>> 
>>  - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>> 
>>  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>> 
>>  - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>> 
>>  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
>> 
>>  at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>> 
>> 
>> 
>>  "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000
>>  nid=0x3603 in Object.wait() [0x0000700000c37000]
>> 
>>     java.lang.Thread.State: WAITING (on object monitor)
>> 
>>  at java.lang.Object.wait(Native Method)
>> 
>>  - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
>> 
>>  at java.lang.Object.wait(Object.java:502)
>> 
>>  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
>> 
>>  - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
>> 
>> 
>> 
>>  "main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on
>>  condition [0x0000700000219000]
>> 
>>     java.lang.Thread.State: WAITING (parking)
>> 
>>  at sun.misc.Unsafe.park(Native Method)
>> 
>>  - parking to wait for  <0x000000076b89c8e0> (a java.util.concurrent.CountDownLatch$Sync)
>> 
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> 
>>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>> 
>>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>> 
>>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>> 
>>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>> 
>>  at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>> 
>>  at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
>> 
>>  at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
>> 
>>  at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56)
>> 
>> 
>> 
>>  "VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable
>> 
>> 
>> 
>>  "GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 nid=0x2403 runnable
>> 
>> 
>> 
>>  "GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 nid=0x2603 runnable
>> 
>> 
>> 
>>  "GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 nid=0x2803 runnable
>> 
>> 
>> 
>>  "GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 nid=0x2a03 runnable
>> 
>> 
>> 
>>  "GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 nid=0x2c03 runnable
>> 
>> 
>> 
>>  "GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 nid=0x2e03 runnable
>> 
>> 
>> 
>>  "GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 nid=0x3003 runnable
>> 
>> 
>> 
>>  "GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 nid=0x3203 runnable
>> 
>> 
>> 
>>  "VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403
>>  waiting on condition
>> 
>> 
>> 
>>  JNI global references: 341
>> 
>> 
>> 
>>  From:  Jaikiran Pai <jai.forums2013@gmail.com>
>>  Reply-To:  <users@kafka.apache.org>
>>  Date:  Friday, May 20, 2016 at 7:55 AM
>>  To:  <users@kafka.apache.org>
>>  Subject:  Re: newbie: kafka 0.9.0.0 producer does not terminate after
>>  producer.close()
>> 
>>>  > You can take a thread dump (using "jstack <pid-of-your-program>") when
>>>  > the program doesn't terminate and post that output here. That will tell
>>>  > us which threads are causing the program to not terminate.
>>>  >
>>>  > -Jaikiran
>>>  >
>>>  > On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote:
>>>>  >>  I wrote a little test client that reads from a file and publishes using the
>>>>  >>  0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to send
>>>>  >>  messages; however, I noticed that once I am done reading the input file my
>>>>  >>  test program hangs.
>>>>  >>
>>>>  >>  Any idea what I am doing wrong?
>>>>  >>
>>>>  >>  Kind regards
>>>>  >>
>>>>  >>  Andy
>>>>  >>
>>>>  >>
>>>>  >>  public static void main(String[] args) throws IOException {
>>>>  >>  logger.warn("BEGIN");
>>>>  >>
>>>>  >>  readFromFile(cmdLine, producer, topic);
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  producer.flush();
>>>>  >>
>>>>  >>  producer.close();
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  logger.warn("END");
>>>>  >>
>>>>  >>  }
>>>>  >>
>>>>  >>
>>>>  >>  private static void readFromFile(CmdLine cmdLine, KafkaProducer<String, String> producer,
>>>>  >>  String topic) throws IOException {
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  logger.info("BEGIN");
>>>>  >>
>>>>  >>  BufferedReader reader = cmdLine.getReader();
>>>>  >>
>>>>  >>  String value = null;
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  while ((value = reader.readLine()) != null) {
>>>>  >>
>>>>  >>  logger.info("sending value: " + value);
>>>>  >>
>>>>  >>  publish(producer, topic, value);
>>>>  >>
>>>>  >>  }
>>>>  >>
>>>>  >>  logger.info("END");
>>>>  >>
>>>>  >>  }
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  private static void publish(KafkaProducer<String, String> producer, String
>>>>  >>  topic, String value) {
>>>>  >>
>>>>  >>    Future<RecordMetadata> response = producer.send(new ProducerRecord<String,
>>>>  >>  String>(topic, value));
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  /* TODO
>>>>  >>
>>>>  >>    send() will raise the following error.
>>>>  >>
>>>>  >>    It is because we are using a 0.9.0.0 client with an 0.8 server. The 0.8
>>>>  >>  consumer seems
>>>>  >>
>>>>  >>    to work without problems
>>>>  >>
>>>>  >>  */
>>>>  >>
>>>>  >>  }
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>  ...
>>>>  >>  INFO  17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN
>>>>  >>
>>>>  >>  ...
>>>>  >>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value:
>>>>  >>  dependencies {
>>>>  >>
>>>>  >>  ...
>>>>  >>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:89 END
>>>>  >>
>>>>  >>  ...
>>>>  >>
>>>>  >>  The following error appears to be because we are using the 0.9.0.0 API with an
>>>>  >>  0.8.x server. If I read from stdin instead of a file I would be able to
>>>>  >>  continue sending messages. I do not think this is the reason my test code
>>>>  >>  hangs.
>>>>  >>
>>>>  >>  ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender
>>>>  >>  run line:130 Uncaught error in kafka producer I/O thread:
>>>>  >>
>>>>  >>  org.apache.kafka.common.protocol.types.SchemaException: Error reading field
>>>>  >>  'throttle_time_ms': java.nio.BufferUnderflowException
>>>>  >>
>>>>  >>  at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>>>>  >>
>>>>  >>  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
>>>>  >>
>>>>  >>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>>>>  >>
>>>>  >>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>>>>  >>
>>>>  >>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>>>>  >>
>>>>  >>  at java.lang.Thread.run(Thread.java:745)
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>
>>>>  >>
>>>  >
>>>  >
>> 
>> 
>> 
> 
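A note on the hang, offered as a simplified model and not as Kafka's actual code: in the thread dump quoted above, "main" is parked in CountDownLatch.await underneath KafkaProducer.flush(). That is consistent with the completion signal for at least one batch never being delivered after the response from the 0.8.x broker failed to parse. The stdlib-only sketch below uses hypothetical names (FlushHangSketch, awaitWithTimeout) to show the same shape: a worker that fails before counting down, and a caller that waits with a timeout so it cannot block forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushHangSketch {

    /**
     * Starts a worker that either signals completion or fails before doing so,
     * then waits on the latch with a bound instead of indefinitely.
     * Returns true if completion was signalled, false if the wait timed out.
     */
    static boolean awaitWithTimeout(boolean workerFails, long timeoutMs)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            if (workerFails) {
                // Stand-in for the response that could not be parsed:
                // the worker gives up on it and never counts down.
                return;
            }
            done.countDown();
        }, "io-thread-sketch");
        worker.setDaemon(true);
        worker.start();
        worker.join();
        // An unbounded done.await() here would park forever in the failing case.
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("healthy worker: " + awaitWithTimeout(false, 200)); // true
        System.out.println("failed worker:  " + awaitWithTimeout(true, 200));  // false
    }
}
```

In a real client the same idea could be applied by calling get(timeout, unit) on the Future returned by send() instead of relying only on an unbounded flush(). Upgrading the broker first, as advised in the reply, removes the underlying cause.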


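On reading the jstack output: a JVM exits only when no live non-daemon threads remain, and in the dump quoted above every Java thread except "main" is marked daemon, so "main" is the one to look at. As a rough in-process complement to jstack, the sketch below lists the live non-daemon threads. The class and method names are hypothetical and it uses only the standard library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NonDaemonThreads {

    /** Returns the sorted names of live threads that are not daemon threads. */
    static List<String> liveNonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        // getAllStackTraces() returns a snapshot of the live threads in this JVM.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon()) {
                names.add(t.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // Threads listed here are the ones that can keep the process from exiting.
        System.out.println(liveNonDaemonThreadNames());
    }
}
```

Calling this just before the end of main() shows whether anything other than "main" itself would keep the process running.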
