kafka-users mailing list archives

From Arjun <ar...@socialtwist.com>
Subject Re: kafka consumer not consuming messages
Date Thu, 13 Feb 2014 08:01:16 GMT
The setup has 3 Kafka brokers running on 3 different EC2 nodes (I added
host.name to the broker config). I am not committing any messages in my
consumer. The consumer is an exact replica of the ConsumerGroupExample.

The test machine (10.60.15.123) is outside these systems' security group
but has all ports open, both TCP and UDP.

If I run the same code on any system in the same security group, things
work fine. I suspect the problem has something to do with the EC2 node
setup.

Consumer offset :

ubuntu@ip-10-235-39-219:~/kafka/new/kafka_2.8.0-0.8.0$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect 10.235.39.219:2181,10.249.171.5:2181,10.243.42.35:2181 --topic taf.referral.emails.service
Group           Topic                          Pid Offset logSize Lag Owner
group1          taf.referral.emails.service    0   6      6       0   none
group1          taf.referral.emails.service    1   3      3       0   none
group1          taf.referral.emails.service    2   3      3       0   none
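
For anyone reading the table, Lag is just logSize minus Offset for each partition. The small Python sketch below re-derives it from the three rows above. The row strings are the table rows with the mail wrapping removed, and `PartitionStatus` and `parse_row` are illustrative names of my own, not part of any Kafka tool.

```python
from typing import NamedTuple


class PartitionStatus(NamedTuple):
    """One row of ConsumerOffsetChecker output (field names are illustrative)."""
    group: str
    topic: str
    partition: int
    offset: int
    log_size: int
    lag: int
    owner: str

    @property
    def computed_lag(self) -> int:
        # Lag is the number of messages between the checkpointed offset
        # and the end of the partition's log.
        return self.log_size - self.offset


def parse_row(row: str) -> PartitionStatus:
    """Split a whitespace-separated row into typed fields."""
    group, topic, pid, offset, log_size, lag, owner = row.split()
    return PartitionStatus(group, topic, int(pid), int(offset),
                           int(log_size), int(lag), owner)


# Rows copied from the output above, unwrapped.
ROWS = [
    "group1 taf.referral.emails.service 0 6 6 0 none",
    "group1 taf.referral.emails.service 1 3 3 0 none",
    "group1 taf.referral.emails.service 2 3 3 0 none",
]

statuses = [parse_row(row) for row in ROWS]
for status in statuses:
    # The reported lag should agree with logSize - Offset.
    assert status.lag == status.computed_lag
    print(status.partition, status.computed_lag, status.owner)
```

A lag of 0 on every partition means the checkpointed offsets are already at the end of each log. My reading of the Owner column is that "none" means no consumer in group1 was registered as the owner of that partition when the tool ran, but that is an assumption on my part.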


Kafka.log :

[2014-02-13 02:15:09,837] TRACE Processor id 0 selection time = 300 ms (kafka.network.Processor)
[2014-02-13 02:15:09,916] DEBUG Accepted connection from /10.60.15.123 on /10.235.39.219:9092. sendBufferSize [actual|requested]: [131071|1048576] recvBufferSize [actual|requested]: [131071|1048576] (kafka.network.Acceptor)
[2014-02-13 02:15:09,916] TRACE Processor id 0 selection time = 79 ms (kafka.network.Processor)
[2014-02-13 02:15:09,917] DEBUG Processor 0 listening to new connection from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE Processor id 0 selection time = 55 ms (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE 195 bytes read from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,997] DEBUG Got ping response for sessionid: 0x2442a181bfb0000 after 1ms (org.apache.zookeeper.ClientCnxn)
[2014-02-13 02:15:10,036] DEBUG [FetchRequestPurgatory-0] Expiring fetch request Name: FetchRequest; Version: 0; CorrelationId: 54; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576). (kafka.server.KafkaApis$FetchRequestPurgatory)



Kafka-request log :

[2014-02-13 02:15:09,837] TRACE Processor id 0 selection time = 300 ms (kafka.network.Processor)
[2014-02-13 02:15:09,916] TRACE Processor id 0 selection time = 79 ms (kafka.network.Processor)
[2014-02-13 02:15:09,917] DEBUG Processor 0 listening to new connection from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE Processor id 0 selection time = 55 ms (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE 195 bytes read from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,973] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 100 bytes; RequestInfo: [taf.referral.emails.service,2] -> PartitionFetchInfo(3,1048576),[taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576) (kafka.network.RequestChannel$)
[2014-02-13 02:15:09,973] TRACE [KafkaApi-0] Handling request: Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 100 bytes; RequestInfo: [taf.referral.emails.service,2] -> PartitionFetchInfo(3,1048576),[taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576) from client: /10.60.15.123:45056 (kafka.server.KafkaApis)
[2014-02-13 02:15:09,973] TRACE [KafkaApi-0] Fetching log segment for topic, partition, offset, size = (taf.referral.emails.service,2,3,1048576) (kafka.server.KafkaApis)
[2014-02-13 02:15:09,974] TRACE [KafkaApi-0] Fetching log segment for topic, partition, offset, size = (taf.referral.emails.service,1,3,1048576) (kafka.server.KafkaApis)
[2014-02-13 02:15:09,974] DEBUG [KafkaApi-0] Putting fetch request with correlation id 0 from client group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0 into purgatory (kafka.server.KafkaApis)
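
The last entry shows the broker parking the consumer's fetch request in purgatory. As far as I understand the 0.8 fetch path, a fetch is answered once at least MinBytes of data are available for the requested partitions, or once MaxWait has elapsed, whichever comes first. Below is a rough Python sketch of that rule with the values from the request log. `fetch_is_ready` is a hypothetical helper of my own, not a Kafka API, and the "no new data" value is an assumption based on the fetch offset (3) matching the logSize (3) reported for partitions 1 and 2.

```python
def fetch_is_ready(available_bytes: int, min_bytes: int,
                   waited_ms: int, max_wait_ms: int) -> bool:
    """Return True when the broker would answer the fetch instead of holding it.

    Simplified model: respond as soon as enough data has accumulated
    or the maximum wait has been reached.
    """
    return available_bytes >= min_bytes or waited_ms >= max_wait_ms


# Values read from the FetchRequest in the request log.
MIN_BYTES = 100          # MinBytes: 100 bytes
MAX_WAIT_MS = 1_000_000  # MaxWait: 1000000 ms

# Assumption: fetch offset 3 equals logSize 3, so no new bytes are available.
AVAILABLE_BYTES = 0

# Just after the request arrives: nothing to send, so it is held.
print(fetch_is_ready(AVAILABLE_BYTES, MIN_BYTES, 0, MAX_WAIT_MS))

# Once the maximum wait has elapsed, the broker answers even with no data.
print(fetch_is_ready(AVAILABLE_BYTES, MIN_BYTES, MAX_WAIT_MS, MAX_WAIT_MS))

# The maximum wait expressed in minutes.
print(MAX_WAIT_MS / 60_000)
```

With these numbers, a request that finds no new data could sit in purgatory for up to 1000 seconds (roughly 16.7 minutes), so an idle consumer would look stuck for that long even if nothing is wrong with the connection.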




On Wednesday 12 February 2014 09:26 PM, Jun Rao wrote:
> Interesting. So you have 4 messages in the broker. The checkpointed offset
> for the consumer is at the 3rd message. Did you change the default setting
> of auto.commit.enable? Also, if you look at the
> request log, what's the offset in the fetch request from this consumer?
> Thanks,
> Jun
>
>
> On Tue, Feb 11, 2014 at 10:07 PM, Arjun <arjun@socialtwist.com> wrote:
>
>> The topic name is correct. The output of ConsumerOffsetChecker is:
>>
>> arjunn@arjunn-lt:~/Downloads/Kafka0.8/new/kafka_2.8.0-0.8.0$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic taf.referral.emails.service
>> Group           Topic                          Pid Offset logSize Lag Owner
>> group1          taf.referral.emails.service    0   2      4       2   group1_arjunn-lt-1392133080519-e24b249b-0
>> group1          taf.referral.emails.service    1   2      4       2   group1_arjunn-lt-1392133080519-e24b249b-0
>>
>> thanks
>> Arjun Narasimha Kota
>>
>>
>>
>>
>> On Wednesday 12 February 2014 10:21 AM, Jun Rao wrote:
>>
>>> Could you double check that you used the correct topic name? If so, could
>>> you run ConsumerOffsetChecker as described in
>>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is
>>> any lag?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota <arjun@socialtwist.com> wrote:
>>>
>>>> fetch.wait.max.ms=10000
>>>> fetch.min.bytes=128
>>>>
>>>> My message size is much more than that.
>>>> On Feb 11, 2014 9:21 PM, "Jun Rao" <junrao@gmail.com> wrote:
>>>>
>>>>> What's the fetch.wait.max.ms and fetch.min.bytes you used?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>>
>>>>> On Tue, Feb 11, 2014 at 12:54 AM, Arjun <arjun@socialtwist.com> wrote:
>>>>>
>>>>>> With the same group id, the console consumer is working fine.
>>>>>>
>>>>>> On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:
>>>>>>
>>>>>>> Arjun,
>>>>>>>
>>>>>>> Are you using the same group name for the console consumer and the
>>>>>>> java consumer?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 10, 2014 at 11:38 PM, Arjun <arjun@socialtwist.com> wrote:
>>>>>>>
>>>>>>>> Hi Jun,
>>>>>>>>
>>>>>>>> No, it's not that problem. I can't work out what the problem is.
>>>>>>>> Can you please help?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>> Arjun Narasimha Kota
>>>>>>>>
>>>>>>>>
>>>>>>>> On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
>>>>>>>>
>>>>>>>>> Does
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>>>>>>>>> apply?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Feb 9, 2014 at 10:27 PM, Arjun <arjun@socialtwist.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I started using Kafka some time back. I was experimenting with 0.8.
>>>>>>>>>> My problem is that Kafka is unable to consume the messages. My
>>>>>>>>>> configuration is a Kafka broker on the local host and ZooKeeper on
>>>>>>>>>> the local host. I have only one broker and one consumer at present.
>>>>>>>>>>
>>>>>>>>>> What have I done:
>>>>>>>>>>          1) I used the Java examples in the Kafka src and pushed
>>>>>>>>>>             some 600 messages to the broker.
>>>>>>>>>>          2) I used the console consumer to check whether the
>>>>>>>>>>             messages are there in the broker or not. The console
>>>>>>>>>>             consumer printed all 600 messages.
>>>>>>>>>>          3) Now I used the Java Consumer code and tried to get those
>>>>>>>>>>             messages. This is not printing any messages. It just
>>>>>>>>>>             got stuck.
>>>>>>>>>>
>>>>>>>>>> When was it working earlier:
>>>>>>>>>>          - When I tried with three brokers and three consumers on the
>>>>>>>>>>            same machine, with the same configuration, it worked fine.
>>>>>>>>>>          - I changed the properties accordingly when I tried to make
>>>>>>>>>>            it work with one broker and one consumer.
>>>>>>>>>>
>>>>>>>>>> What does the log say:
>>>>>>>>>>          - I am attaching the logs as well.
>>>>>>>>>>
>>>>>>>>>> If someone can point out where I am going wrong, it would be helpful.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Arjun Narasimha Kota
>>>>>>>>>>

