kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: Consumer keeps looking connection
Date Sun, 02 Nov 2014 18:03:41 GMT
When I check the server log, it also flushes with exception:

[2014-11-02 10:00:32,284] ERROR Closing socket for /10.93.80.119 because of
error (kafka.network.Processor)
java.io.IOException: Broken pipe
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at
sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
        at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
        at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
        at kafka.network.MultiSend.writeTo(Transmission.scala:102)
        at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
        at kafka.network.MultiSend.writeTo(Transmission.scala:102)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
        at kafka.network.Processor.write(SocketServer.scala:405)
        at kafka.network.Processor.run(SocketServer.scala:265)
        at java.lang.Thread.run(Thread.java:744)

On Sat, Nov 1, 2014 at 9:46 PM, Chen Wang <chen.apache.solr@gmail.com>
wrote:

> Hello Folks,
> I am using Highlevel consumer, and it seems to drop connections
> intermittently:
>
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> Received -1 when reading from channel, socket has likely been closed.
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220;
> ClientId:
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21]
> -> PartitionFetchInfo(141266339,4194304)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown
> Source)
>         at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
>         at kafka.utils.ShutdownableThread.run(Unknown Source)
> 2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties
>
> or sometimes:
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> null
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222;
> ClientId:
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown
> Source)
>         at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
>         at kafka.utils.ShutdownableThread.run(Unknown Source)
>
> The config I am using is:
> kafka.config.fetch.message.max.bytes4194304kafka.config.group.idmygroupid
> kafka.config.rebalance.backoff.ms6000kafka.config.rebalance.max.retries6
> kafka.config.zookeeper.connectbrokerlist
> kafka.config.zookeeper.session.timeout.ms60000
> There should not be any network connectivity issue as all the zookeepers,
> brokers,consumers are in the same cluster.
>
> What would be the cause for the connection reset error? Is it because the
> zookeeper cannot talk to the broker to get the partitionInfo?
> Thanks,
> Chen
>
>
>
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message