kafka-users mailing list archives

From Scott Reynolds <sreyno...@twilio.com>
Subject Consumers closing sockets abruptly?
Date Tue, 27 Jan 2015 17:52:23 GMT
On my brokers I am seeing this error log message:

Closing socket for /X because of error (X is the IP address of a consumer)
> 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> 2015-01-27_17:32:58.29890       at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 2015-01-27_17:32:58.29891       at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> 2015-01-27_17:32:58.29892       at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> 2015-01-27_17:32:58.29892       at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> 2015-01-27_17:32:58.29893       at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> 2015-01-27_17:32:58.29893       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29894       at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> 2015-01-27_17:32:58.29895       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29895       at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> 2015-01-27_17:32:58.29896       at kafka.network.Processor.write(SocketServer.scala:375)
> 2015-01-27_17:32:58.29896       at kafka.network.Processor.run(SocketServer.scala:247)
> 2015-01-27_17:32:58.29897       at java.lang.Thread.run(Thread.java:745)
>

The stack trace is logged because the Processor has no dedicated handler for
java.io.IOException, so the exception falls through to the catch-all.
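
To illustrate the mechanism, here is a minimal, self-contained sketch of how
a peer that closes its socket abruptly shows up on the writing side as a
java.io.IOException. It uses plain java.net sockets on the loopback
interface, not Kafka's SocketServer; the names ResetDemo and provokeReset are
hypothetical, and the exact exception message varies by OS and JDK.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class ResetDemo {

    // Hypothetical helper: returns the IOException the writer sees after the
    // peer resets the connection, or null if no error showed up in time.
    static IOException provokeReset() throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // "Consumer" side: connect now, close abruptly later. SO_LINGER with
            // a zero timeout makes close() send a TCP RST instead of a FIN.
            Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
            client.setSoLinger(true, 0);

            try (Socket accepted = server.accept()) {
                client.close();
                OutputStream out = accepted.getOutputStream();
                byte[] chunk = new byte[1024];
                // "Broker" side: keep writing until the reset is noticed.
                for (int i = 0; i < 200; i++) {
                    try {
                        out.write(chunk);
                        out.flush();
                    } catch (IOException e) {
                        return e; // e.g. "Connection reset by peer" or "Broken pipe"
                    }
                    Thread.sleep(10);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        try {
            IOException seen = provokeReset();
            if (seen != null) {
                throw seen;
            }
            System.out.println("no error observed");
        } catch (IOException e) {
            // Dedicated handler: the peer went away, which is often harmless.
            System.out.println("peer went away: " + e.getMessage());
        } catch (Throwable t) {
            // Catch-all: without the IOException clause above, resets land here.
            System.out.println("unexpected error: " + t);
        }
    }
}
```

The two catch clauses in main mirror the distinction described above: with a
dedicated IOException handler a reset could be logged quietly, whereas a
catch-all tends to log it as a full error with a stack trace.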

My consumers actually seem to be healthy, so I don't think there is a real
issue here, but I could use some help confirming that.

We are using the Java consumer like so:

> final ConsumerConnector consumer =
>     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1));
> final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> final KafkaStream<byte[], byte[]> stream =
>     consumerMap.get(topicName).get(0);
>

and we just iterate over the stream.

Questions:
1. Which class makes the network connection to the consumer?
2. Are there legitimate cases where the consumer would close its socket
connection? ZooKeeper issues? A consumer that is too far behind?
