kafka-users mailing list archives

From Chengwei Yang <chengwei.yang...@gmail.com>
Subject Re: producer & consumer fail if the leader failed-over
Date Fri, 30 May 2014 01:06:46 GMT
Hi Jun,

Thanks for your reply.

On Thu, May 29, 2014 at 07:52:11AM -0700, Jun Rao wrote:
> You need to pass in multiple brokers in --broker-list in the producer. To
> obtain the new leaders, the producer needs to talk to a live broker to get
> the new metadata.

Then I tried with 3 brokers and put them all into
--broker-list as shown below, but it still fails with a broken pipe.

$ ./bin/kafka-console-producer.sh --broker-list kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092 --topic test-1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
ok
will kill leader broker.id=2

fail?
[2014-05-29 18:05:31,973] WARN Failed to send producer request with correlation id 8 to broker 2 with data for partitions [test-1,0] (kafka.producer.async.DefaultEventHandler)
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.write(IOUtil.java:148)
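
For reference, a minimal sketch of the behavior Jun describes: after a
failover, the client asks each broker in --broker-list in turn until a live
one returns fresh metadata. This is a toy simulation, not Kafka's client
code. FakeCluster, refresh_metadata, the mapping of kafka-broker2 to the
killed broker.id 2, and the new leader id are all hypothetical.

```python
from typing import Dict, Iterable, Optional, Set


class FakeCluster:
    """Hypothetical stand-in for a cluster; not a real Kafka API."""

    def __init__(self, leaders: Dict[int, int], live: Set[str]) -> None:
        self.leaders = leaders  # partition -> current leader broker id
        self.live = live        # host:port of brokers that are still up

    def fetch_metadata(self, broker: str) -> Dict[int, int]:
        # A dead broker refuses the connection, as in the logs in this thread.
        if broker not in self.live:
            raise ConnectionError(f"connection refused: {broker}")
        return dict(self.leaders)


def refresh_metadata(cluster: FakeCluster,
                     broker_list: Iterable[str]) -> Optional[Dict[int, int]]:
    """Try each bootstrap broker in order; return metadata from the first live one."""
    for broker in broker_list:
        try:
            return cluster.fetch_metadata(broker)
        except ConnectionError:
            continue  # this broker is down, try the next entry
    return None  # no listed broker was reachable


# Assumed state after the kill: kafka-broker2 is down, and partition 0 of
# test-1 is now led by broker 1 (both are assumptions for illustration).
cluster = FakeCluster(
    leaders={0: 1},
    live={"kafka-broker1:9092", "kafka-broker3:9092"},
)

only_dead = refresh_metadata(cluster, ["kafka-broker2:9092"])
all_three = refresh_metadata(
    cluster,
    ["kafka-broker2:9092", "kafka-broker1:9092", "kafka-broker3:9092"],
)
print(only_dead, all_three)
```

With a single-entry list that points at the killed broker, the lookup finds
nothing; with all three brokers listed, it reaches a live broker and learns
the new leader.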

Another question: why did the consumer fail?

--
Thanks,
Chengwei

> 
> Thanks,
> 
> Jun
> 
> 
> On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang
> <chengwei.yang.cn@gmail.com> wrote:
> 
> > Hi List,
> >
> > I'm new to Kafka, and sorry if this has been asked before. I couldn't find
> > an answer by googling, so I'm asking here. Thanks in advance!
> >
> > I'm following the kafka quick start
> > http://kafka.apache.org/documentation.html#quickstart
> >
> > and managed to set up a Kafka cluster with two brokers, connected to a
> > ZooKeeper service consisting of 3 zk hosts.
> >
> > The working state looks like below
> > ----------------------8<--------------- topic describe ----8<-------------
> > Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
> >         Topic: test     Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
> >         Topic: test     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
> >         Topic: test     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
> > --------------------8<---------------- end ------------8<--------------
> >
> > -------------------8<------------- console-producer.sh -------8<-------
> > $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > this is a test message
> > another test message
> > haha
> > --------------------8<---------------- end ------------8<--------------
> >
> > -------------------8<------------- console-consumer.sh -------8<-------
> > $ ./bin/kafka-console-consumer.sh --zookeeper <my zk service here> --topic test --from-beginning
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > this is a test message
> > another test message
> > haha
> > -----------------8<---------------- end ----------------8<-----------
> >
> > So far, everything works. However, when I follow the quick start and
> > kill the leader broker for topic *test*, say the one with broker.id 0,
> > errors occur, like below
> >
> > -------------------8<------------- console-consumer.sh -------8<-------
> > [2014-05-29 17:05:26,446] WARN Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed. (kafka.consumer.SimpleConsumer)
> > [2014-05-29 17:05:26,460] ERROR [ConsumerFetcherThread-console-consumer-40784_<hostname>-1401354179291-21cd16e1-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1401; ClientId: console-consumer-40784-ConsumerFetcherThread-console-consumer-40784_<hostname>-1401354179291-21cd16e1-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test,0] -> PartitionFetchInfo(0,1048576),[test,2] -> PartitionFetchInfo(0,1048576) (kafka.consumer.ConsumerFetcherThread)
> > java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect0(Native Method)
> >         at sun.nio.ch.Net.connect(Net.java:465)
> >         at sun.nio.ch.Net.connect(Net.java:457)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > -----------------8<---------------- end ----------------8<-----------
> >
> > And when I send another message using the console producer, it fails with
> > the same error, connection refused.
> >
> > According to kafka-topics.sh, the leader has already failed over, as shown
> > below.
> >
> > ----------------------8<--------------- topic describe ----8<-------------
> > Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
> >         Topic: test     Partition: 0    Leader: 1       Replicas: 0,1   Isr: 1
> >         Topic: test     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1
> >         Topic: test     Partition: 2    Leader: 1       Replicas: 0,1   Isr: 1
> > -----------------8<---------------- end ----------------8<-----------
> >
> > However, from the quick start, it seems it shouldn't fail at all.
> >
> > --
> > Thanks,
> > Chengwei
> >
