kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Kafka issue with "Reconnect due to socket error"
Date Wed, 10 Jul 2013 04:07:35 GMT
The reconnect is on the socket connection to the Kafka broker. Technically,
the reconnect worked, so the tool still produces its output. The reconnection
is odd, though. Do you see the same reconnection when you run the console
consumer?
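
For illustration, the behaviour described above (a socket error is logged, the client reconnects, and the request still succeeds) resembles a retry-once pattern. The following is a minimal Python sketch of that pattern, not Kafka's actual code; `ClosedChannelError`, `FlakyChannel`, and `send_with_one_retry` are hypothetical names introduced only for this example.

```python
class ClosedChannelError(IOError):
    """Hypothetical stand-in for java.nio.channels.ClosedChannelException."""


class FlakyChannel:
    """Hypothetical channel whose socket the remote side has already closed."""

    def __init__(self):
        self.remote_closed = True  # assumption: the broker dropped the socket
        self.reconnects = 0

    def send(self, request):
        if self.remote_closed:
            raise ClosedChannelError("channel is closed")
        return "response to " + request

    def reconnect(self):
        # Open a fresh connection; the next send can succeed.
        self.remote_closed = False
        self.reconnects += 1


def send_with_one_retry(channel, request, log):
    """Send a request; on a socket error, log it, reconnect, and retry once."""
    try:
        return channel.send(request)
    except IOError as err:
        log.append("Reconnect due to socket error: %r" % err)
        channel.reconnect()
        # A second failure is not caught here and propagates to the caller.
        return channel.send(request)


log = []
channel = FlakyChannel()
result = send_with_one_retry(channel, "offset request", log)
```

Under this assumption the caller still gets its response, and the only visible trace of the closed socket is the logged message, which is consistent with the tool printing its row for each topic right after the exception.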

Thanks,

Jun


On Tue, Jul 9, 2013 at 11:06 AM, Dennis Haller <dhaller@talemetry.com> wrote:

> Hi,
>
> Thanks for your reply.
>
> I can't see anything in the broker logs, except that this keeps coming up in
> the ZooKeeper log (instance 1 of 3):
> WARN  [SyncThread:1:FileTxnLog@321] - fsync-ing the write ahead log in
> SyncThread:1 took 1298ms which will adversely effect operation latency. See
> the ZooKeeper troubleshooting guide
>
>
> I'm not sure whether the connection error comes from ZooKeeper or from a
> Kafka broker. In an edited version of the log file, you can see that a normal
> cycle of ConsumerOffsetChecker requires 5 ZK queries before the stats are
> printed, but when the exception is thrown, 4 queries have completed and the
> tool is somewhere in the 5th query:
>
>
> [2013-07-09 10:45:41,845] INFO Client environment:user.dir=/opt/kafka/kafka-0.8 (org.apache.zookeeper.ZooKeeper)
> [2013-07-09 10:45:41,846] INFO Initiating client connection, connectString=ec2-23-22-34-191.compute-1.amazonaws.com sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@3c0db454 (>
> [2013-07-09 10:45:41,848] DEBUG zookeeper.disableAutoWatchReset is false (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:41,934] DEBUG Awaiting connection to Zookeeper server (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:41,934] DEBUG Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:41,934] INFO Opening socket connection to server ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:42,020] INFO Socket connection established to ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:42,024] DEBUG Session establishment request sent on ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:42,114] INFO Session establishment complete on server ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181, sessionid = 0x13fc464f0420009, negotiated timeout = >
> [2013-07-09 10:45:42,118] DEBUG Received event: WatchedEvent state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:42,118] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:42,118] DEBUG Leaving process event (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:42,118] DEBUG State is SyncConnected (org.I0Itec.zkclient.ZkClient)
> [2013-07-09 10:45:42,265] DEBUG Reading reply sessionid:0x13fc464f0420009, packet:: clientPath:null serverPath:null finished:false header:: 1,8  replyHeader:: 1,12884971368,0  request:>
> Group           Topic                          Pid Offset  logSize  Lag  Owner
>
> [2013-07-09 10:45:42,394] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events,F (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:42,684] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/offsets/qa-M-Candidate-CrmStage-Events/0,F (org.apache.zookeeper>
> [2013-07-09 10:45:42,768] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/owners/qa-M-Candidate-CrmStage-Events/0,F (org.apache.zookeeper.>
> [2013-07-09 10:45:42,849] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events/partitions/0/state,F (org.apache.zookeep>
> [2013-07-09 10:45:42,971] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/ids/2,F (org.apache.zookeeper.ClientCnxn)
> KafkaMirror     qa-M-Candidate-CrmStage-Events 0   15      15       0    KafkaMirror_ip-10-41-3-33.ec2.internal-1373310093967-e20aba69-0
>
> [2013-07-09 10:45:44,061] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Match,F (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:44,150] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/offsets/qa-M-Match/0,F (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:44,233] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/owners/qa-M-Match/0,F (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:44,314] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Match/partitions/0/state,F (org.apache.zookeeper.ClientCnxn)
> [2013-07-09 10:45:44,329] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
> .........
>     at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
>     at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> KafkaMirror     qa-M-Match                     0   76      76       0    KafkaMirror_ip-10-41-3-33.ec2.internal-1373310093967-e20aba69-0
>
>
>
> On Mon, Jul 8, 2013 at 8:56 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > The consumer reconnects because the broker closed the socket. Any
> > error/exception on the broker side around the same time?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jul 8, 2013 at 10:25 AM, Dennis Haller <dhaller@talemetry.com> wrote:
> >
> > > I have a 4-broker Kafka system running in Amazon EC2, and we are using
> > > Kafka 0.8 beta1. Most of the standard default configurations remain
> > > unchanged. Running the Kafka tool ConsumerOffsetChecker causes socket
> > > errors. Some of these socket reset errors also appear in the Kafka
> > > server log.
> > >
> > > Usually the first few message topics are printed, but then every
> > > subsequent one causes a socket error. The entire command completes in
> > > only 3 or 4 seconds, so I don't know where the timeouts are coming from.
> > >
> > > Do you have any suggestions?
> > >
> > >
> > > Here is the log:
> > >
> > >
> > >
> > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> > > ArchivingConsumer --zkconnect
> > > ec2-23-22-34-191.compute-1.amazonaws.com:2181
> > > [2013-07-08 10:14:33,172] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
> > > [2013-07-08 10:14:33,186] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,186] INFO Client environment:host.name=ip-10-41-3-33.ec2.internal (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,186] INFO Client environment:java.version=1.7.0_21 (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,186] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,186] INFO Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:java.class.path=:bin/../core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar:bin/../core/target/scala-2.8.0/kafka-assembly-0.8.0-beta1-deps.jar:bin/../perf/target/scala-2.8.0/kafka-perf_2.8.0-0.8.0-beta1.jar:bin/../libs/*.jar:bin/../kafka*.jar (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,187] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,188] INFO Client environment:os.version=3.2.0-32-virtual (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,188] INFO Client environment:user.name=ubuntu (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,188] INFO Client environment:user.home=/home/ubuntu (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,188] INFO Client environment:user.dir=/opt/kafka/kafka-0.8 (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,189] INFO Initiating client connection, connectString=ec2-23-22-34-191.compute-1.amazonaws.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@737c45ee (org.apache.zookeeper.ZooKeeper)
> > > [2013-07-08 10:14:33,216] INFO Opening socket connection to server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2013-07-08 10:14:33,226] INFO Socket connection established to ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> > > [2013-07-08 10:14:33,300] INFO Session establishment complete on server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, sessionid = 0x13fb0932c3a002a, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
> > > [2013-07-08 10:14:33,302] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> > > Group             Topic                           Pid Offset  logSize  Lag  Owner
> > > ArchivingConsumer qa-M-Candidate-CrmStatus-Events 0   30      30       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > ArchivingConsumer qa-M-Match                      0   36      36       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > ArchivingConsumer qa-M-friday-01                  0   100     100      0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > ArchivingConsumer qa-M-test-01                    0   200     200      0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > [2013-07-08 10:14:35,223] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> > > java.nio.channels.ClosedChannelException
> > >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > >     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> > >     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> > >     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> > >     at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > >
> > > . . . repeated for each topic . . .
> > >
> > > ArchivingConsumer qa-f1-Match                     0   14      14       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > [2013-07-08 10:14:35,906] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> > > java.nio.channels.ClosedChannelException
> > >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > >     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> > >     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> > >     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > >     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> > >     at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > > ArchivingConsumer qa-f3-T-Campaign-Email          0   1       1        0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
> > > [2013-07-08 10:14:35,918] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
> > > [2013-07-08 10:14:35,923] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
> > > [2013-07-08 10:14:35,923] INFO Session: 0x13fb0932c3a002a closed (org.apache.zookeeper.ZooKeeper)
> > >
> >
>
