kafka-users mailing list archives

From Arjun <ar...@socialtwist.com>
Subject Re: consumer not consuming messages
Date Fri, 11 Apr 2014 04:26:29 GMT
I see this in the consumer logs:
[kafka.consumer.ConsumerFetcherManager] [ConsumerFetcherManager-1397188062631] Adding fetcher for partition [taf.referral.emails.service,11], initOffset 250 to broker 1 with fetcherId 0

But no data arrives, and I get this warning:

[ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1] [kafka.consumer.ConsumerFetcherThread] [ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: group1-ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1; ReplicaId: -1; MaxWait: 180000 ms; MinBytes: 1 bytes; RequestInfo: [taf.referral.emails.service,0] -> PartitionFetchInfo(253,10485760)
java.net.SocketTimeoutException
     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
     at kafka.utils.Utils$.read(Utils.scala:395)
     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
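
One thing that may be worth checking, offered as a possible lead rather than a confirmed diagnosis: the failing request shows MaxWait: 180000 ms, which matches the fetch.wait.max.ms value listed in this thread. If the consumer's socket.timeout.ms is smaller than that, the socket read could time out while the broker is still holding the fetch request open. The sketch below only compares the two values. The 30000 ms figure is an assumption (believed to be the 0.8 default for socket.timeout.ms, which the thread never states), and the function name is made up for illustration.

```python
# Consumer settings as listed in this thread.
consumer_props = {
    "fetch.wait.max.ms": "180000",
    "fetch.min.bytes": "1",
    "auto.offset.reset": "smallest",
    "auto.commit.enable": "false",
    "fetch.message.max.bytes": "1048576",
}

# ASSUMPTION: 30000 ms is believed to be the Kafka 0.8 default for
# socket.timeout.ms; the thread does not say which value is in effect.
ASSUMED_SOCKET_TIMEOUT_MS = 30000


def fetch_wait_exceeds_socket_timeout(props, default_socket_timeout_ms=ASSUMED_SOCKET_TIMEOUT_MS):
    """Return True if a fetch long-poll could outlast the socket read timeout."""
    fetch_wait_ms = int(props["fetch.wait.max.ms"])
    # Fall back to the assumed default when socket.timeout.ms is not set explicitly.
    socket_timeout_ms = int(props.get("socket.timeout.ms", default_socket_timeout_ms))
    return fetch_wait_ms > socket_timeout_ms


if __name__ == "__main__":
    if fetch_wait_exceeds_socket_timeout(consumer_props):
        print("fetch.wait.max.ms is larger than socket.timeout.ms")
```

If the check reports a mismatch, lowering fetch.wait.max.ms below the socket timeout (or raising socket.timeout.ms) is a cheap experiment to try before looking further.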


Thanks
Arjun Narasimha Kota



On Friday 11 April 2014 09:51 AM, Arjun wrote:
> I hope this gives you a better idea.
>
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect zkhost:port --topic testtopic
> Group   Topic      Pid  Offset  logSize  Lag  Owner
> group1  testtopic  0    253     253      0    group1_ip-xx-1397188061429-b5ff1205-0
> group1  testtopic  1    267     267      0    group1_ip-xx-1397188061429-b5ff1205-0
> group1  testtopic  2    254     254      0    group1_ip-xx-1397188061429-b5ff1205-0
> group1  testtopic  3    265     265      0    group1_ip-xx-1397188061429-b5ff1205-0
> group1  testtopic  4    261     261      0    group1_ip-xx-1397188061429-b5ff1205-1
> group1  testtopic  5    294     294      0    group1_ip-xx-1397188061429-b5ff1205-1
> group1  testtopic  6    248     248      0    group1_ip-xx-1397188061429-b5ff1205-1
> group1  testtopic  7    271     271      0    group1_ip-xx-1397188061429-b5ff1205-1
> group1  testtopic  8    240     240      0    group1_ip-xx-1397188061429-b5ff1205-2
> group1  testtopic  9    261     261      0    group1_ip-xx-1397188061429-b5ff1205-2
> group1  testtopic  10   290     290      0    group1_ip-xx-1397188061429-b5ff1205-2
> group1  testtopic  11   250     251      1    group1_ip-xx-1397188061429-b5ff1205-2
>
> As you can see in the last line of the output, the lag is 1 for that
> partition. I sent just one message. This topic is not new; as you can
> see, a lot of messages have accumulated since yesterday. This one
> message is never consumed by the consumer. But if I send some 10
> messages, then all the messages are consumed.
>
> Please let me know if I have to change any consumer properties.
>
> My consumer properties are:
> "fetch.wait.max.ms" = "180000"
> "fetch.min.bytes" = "1"
> "auto.offset.reset" = "smallest"
> "auto.commit.enable" = "false"
> "fetch.message.max.bytes" = "1048576"
>
> Thanks
> Arjun Narasimha Kota
> On Friday 11 April 2014 06:23 AM, Arjun Kota wrote:
>>
>> The consumer uses specific topics.
>>
>> On Apr 11, 2014 6:23 AM, "Arjun Kota" <arjun@socialtwist.com> wrote:
>>
>>     Yes the message shows up on the server.
>>
>>     On Apr 11, 2014 12:07 AM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
>>
>>         Hi Arjun,
>>
>>         If you only send one message, does that message show up on the
>>         server? Does your consumer use wildcard topics or specific topics?
>>
>>         Guozhang
>>
>>
>>         On Thu, Apr 10, 2014 at 9:20 AM, Arjun <arjun@socialtwist.com> wrote:
>>
>>         > But we have auto.offset.reset set to smallest, not largest. Does
>>         > this issue arise even then? If so, is there any workaround?
>>         >
>>         > Thanks
>>         > Arjun Narasimha Kota
>>         >
>>         >
>>         > On Thursday 10 April 2014 09:39 PM, Guozhang Wang wrote:
>>         >
>>         >> It could be https://issues.apache.org/jira/browse/KAFKA-1006.
>>         >>
>>         >> Guozhang
>>         >>
>>         >>
>>         >> On Thu, Apr 10, 2014 at 8:50 AM, Arjun <arjun@socialtwist.com> wrote:
>>         >>
>>         >>> It's auto-created,
>>         >>> but even after topic creation this is the scenario.
>>         >>>
>>         >>> Arjun
>>         >>>
>>         >>> On Thursday 10 April 2014 08:41 PM, Guozhang Wang wrote:
>>         >>>
>>         >>>> Hi Arjun,
>>         >>>>
>>         >>>> Did you manually create the topic or use auto.topic.creation?
>>         >>>>
>>         >>>> Guozhang
>>         >>>>
>>         >>>>
>>         >>>> On Thu, Apr 10, 2014 at 7:39 AM, Arjun <arjun@socialtwist.com> wrote:
>>         >>>>
>>         >>>>> Hi,
>>         >>>>>
>>         >>>>> We have a 3-node Kafka 0.8 setup with a ZooKeeper ensemble. We use
>>         >>>>> the high-level consumer with auto commit offset set to false. I am
>>         >>>>> facing a peculiar problem with Kafka. When I send some 10-20
>>         >>>>> messages or so, the consumer starts to consume the messages. But if
>>         >>>>> I send only one message to Kafka, then even though the consumer is
>>         >>>>> active it is not trying to fetch the message. There is nothing in
>>         >>>>> the logs; the messages are just not being fetched by the Kafka
>>         >>>>> consumer. The messages are there in the Kafka server. Can someone
>>         >>>>> let me know where I am going wrong?
>>         >>>>>
>>         >>>>>
>>         >>>>> Thanks
>>         >>>>> Arjun Narasimha Kota
>>         >>>>>
>>         >>>>>
>>         >>>>>
>>         >>>>
>>         >>
>>         >
>>
>>
>>         --
>>         -- Guozhang
>>
>
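
The Lag column in the ConsumerOffsetChecker output quoted earlier in this thread is simply logSize minus Offset. As a rough sketch for spotting stuck partitions in a longer report, the snippet below recomputes the lag per row and keeps only rows where it is non-zero. It assumes each data row has been joined onto a single line with whitespace-separated columns; the function name `rows_with_lag` and the two-row sample are illustrative, not part of any Kafka tool.

```python
def rows_with_lag(report):
    """Return (partition, offset, log_size, lag) for every row whose lag is > 0."""
    lagging = []
    for line in report.splitlines():
        fields = line.split()
        # Data rows look like: group topic pid offset logSize lag owner.
        # The header row is skipped because its third field is not a number.
        if len(fields) < 6 or not fields[2].isdigit():
            continue
        pid, offset, log_size = (int(f) for f in fields[2:5])
        lag = log_size - offset  # recomputed rather than trusted from the Lag column
        if lag > 0:
            lagging.append((pid, offset, log_size, lag))
    return lagging


# Two rows taken from the report in this thread, one per line.
sample = """\
Group   Topic      Pid  Offset  logSize  Lag  Owner
group1  testtopic  10   290     290      0    group1_ip-xx-1397188061429-b5ff1205-2
group1  testtopic  11   250     251      1    group1_ip-xx-1397188061429-b5ff1205-2
"""

print(rows_with_lag(sample))  # [(11, 250, 251, 1)]
```

Running the checker repeatedly and comparing the results would show whether partition 11 stays at lag 1 or eventually catches up.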

