kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka connection loss with high volume of messages
Date Wed, 25 Jun 2014 15:15:37 GMT
Hello Ahmed,

Did you see any exceptions on the broker logs?

Guozhang


On Wed, Jun 25, 2014 at 7:47 AM, Ahmed H. <ahmed.hammad@gmail.com> wrote:

> Hello All,
>
> I am seeing this issue very frequently when running a high volume of
> messages through Kafka. It starts off well and can run for minutes, but
> eventually the connection to Kafka dies; it then reconnects and carries
> on. The failures become more frequent the longer I have been sending:
> the more messages I send, the more often I see this. To give you an idea,
> a separate process writes about 500k messages to the queue, and I see the
> issue on the consumer side after it has received roughly 60% of those
> messages. Here is the stack trace:
>
> 10:36:14,238 WARN  [kafka.consumer.ConsumerFetcherThread]
> (ConsumerFetcherThread-test.queue.default_localhost-1403704698110-71aecd8d-0-0)
> [ConsumerFetcherThread-test.queue.default_localhost-1403704698110-71aecd8d-0-0],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: test.queue.default-ConsumerFetcherThread-test.queue_localhost-1403704698110-71aecd8d-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test.queue.resync,0] -> PartitionFetchInfo(0,1048576),[test.queue,0] -> PartitionFetchInfo(0,1048576): java.nio.channels.ClosedByInterruptException
>   at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) [rt.jar:1.7.0_25]
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650) [rt.jar:1.7.0_25]
>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:43) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:56) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>
>
> Since it starts off working as it should and only runs into this after
> some time, I am inclined to believe this may be a memory/GC issue? Not
> quite sure.
>
> I hope I explained it properly; I am having trouble describing it.
>
> Thanks
>
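For context on the trace above: `ClosedByInterruptException` is not a network error from the broker's side; it is what Java NIO throws when a thread blocked in an interruptible channel operation (here, `BlockingChannel.connect` inside the fetcher) is interrupted by another thread, e.g. during a consumer rebalance or fetcher shutdown. A minimal, hypothetical reproduction (the class name and the use of `ServerSocketChannel.accept()` as the blocking call are illustrative choices, not anything from the Kafka code):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;

public class InterruptedChannelDemo {
    public static void main(String[] args) throws Exception {
        // A channel operation that blocks, standing in for the
        // consumer fetcher thread stuck in connect()/fetch().
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        Thread fetcher = new Thread(() -> {
            try {
                server.accept(); // blocks in an interruptible NIO call
            } catch (ClosedByInterruptException e) {
                // AbstractInterruptibleChannel.end() closes the channel
                // and raises this when the blocked thread is interrupted
                System.out.println("got ClosedByInterruptException");
            } catch (Exception e) {
                System.out.println("unexpected: " + e);
            }
        });
        fetcher.start();

        Thread.sleep(200);   // let the thread block inside accept()
        fetcher.interrupt(); // same mechanism that produces the trace above
        fetcher.join();
    }
}
```

So the exception itself usually points at whoever interrupted the fetcher thread (rebalance, shutdown, or session timeout), rather than at the socket; the broker logs should show whether the broker closed anything on its end.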



-- 
-- Guozhang
