kafka-users mailing list archives

From "Fumo, Vincent" <Vincent_F...@cable.comcast.com>
Subject Re: Metadata Request Loop?
Date Tue, 26 Apr 2016 16:24:51 GMT
Hello. I found the issue. The Ops team had deployed Kafka 0.8.1, while all my client code
was written against 0.9.0. A simple mistake, and one that I should have thought of sooner.
Once I had them upgrade to the latest Kafka, all was well. Thank you for your help!

v
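
A side note on the log excerpt quoted at the bottom of this thread: the timestamps there already hint that this was not the periodic refresh. The sketch below (plain Java, no Kafka dependency; the class and method names are made up for illustration) parses the three quoted timestamps and prints the gap between consecutive metadata updates. The only Kafka-specific figure to compare against is the 300000 ms default of metadata.max.age.ms that Vinay mentions.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of any Kafka API.
class MetadataRefreshGaps {
    // Timestamp layout of the log lines quoted in this thread.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TIME);
            LocalDateTime cur = LocalDateTime.parse(timestamps.get(i), LOG_TIME);
            gaps.add(Duration.between(prev, cur).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // The three "Updated cluster metadata version" timestamps from the thread.
        List<String> quoted = Arrays.asList(
                "2016-04-22 15:43:55,139",
                "2016-04-22 15:43:55,240",
                "2016-04-22 15:43:55,341");
        System.out.println("gaps (ms): " + gapsMillis(quoted)); // prints "gaps (ms): [101, 101]"
    }
}
```

The gaps come out at about 100 ms, far below the 5 minute default. That is close to the producer's default retry.backoff.ms of 100 ms, although nothing in this thread confirms that this is the mechanism.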


> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.tech@gmail.com> wrote:
> 
> Hi,
> 
> Your code looks good. I don't see any reason there for frequent metadata
> fetches, but I will run it to verify.
> 
> I was able to replicate this issue with my consumer today: I killed 2
> brokers out of 3 and ran the consumer. I saw a lot of metadata requests
> while it waited for a new leader for the partitions those 2 brokers had
> been leading. I see the 2 lines below frequently in the logs, as you mentioned:
> 
> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
> 2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
> partition = 0, leader = none, replicas = [], isr = []])
> 
> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
> metadata refresh
> 
> I also saw this behavior with the error below, where I may have killed the
> only ISR available for the topic at that time.
> 
> "Error while fetching metadata with correlation id 242 :
> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
> 
> Do you see any such error in your logs?
> 
> Regards,
> Vinay Sharma
> 
> 
> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
> Vincent_Fumo@cable.comcast.com> wrote:
> 
>> 
>> 
>> My code is very straightforward. I create a producer, and then call it to
>> send messages. Here is the factory method:
>> 
>> public Producer<String, String> createProducer() {
>> 
>>    Properties props = new Properties();
>> 
>>    props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>>    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>    props.put("acks", "all");
>>    props.put("retries", "0");
>>    props.put("batch.size", "1638");
>>    props.put("linger.ms", "1");
>>    props.put("buffer.memory", "33554432");
>>    props.put("compression.type", "gzip");
>>    props.put("client.id", "sds-merdevl");
>> 
>>    return new KafkaProducer<>(props);
>> }
>> 
>> That is injected into a single instance of KafkaNotifier:
>> 
>> public class KafkaNotifier implements MessageNotifier {
>>    private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);
>> 
>>    private Producer<String, String> producer;
>>    private String topicName;
>> 
>>    @Override
>>    public void sendNotificationMessage(DataObjectModificationMessage message) {
>> 
>>        String json = message.asJSON();
>>        String key = message.getObject().getId().toString();
>> 
>>        producer.send(new ProducerRecord<>(topicName, key, json));
>> 
>>        if (logger.isDebugEnabled()) {
>>            logger.debug("Sent Kafka message for object with id={}", key);
>>        }
>>    }
>> 
>>    @Required
>>    public void setProducer(Producer<String, String> producer) {
>>        this.producer = producer;
>>    }
>> 
>>    @Required
>>    public void setTopicName(String topicName) {
>>        this.topicName = topicName;
>>    }
>> }
>> 
>> Here is the topic config info :
>> 
>> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
>> Topic:s.notifications.dev       PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=172800000,cleanup.policy=compact
>>        Topic: s.notifications.dev      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>> 
>> 
>> 
>> 
>> 
>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.tech@gmail.com> wrote:
>>> 
>>> Two producers to the same topic should not be a problem. There can be
>>> multiple producers and consumers of the same Kafka topic.
>>> 
>>> I am not sure what can be wrong here. I can test this at my end if you can
>>> share the producer code and any topic or broker config that you changed
>>> from the default.
>>> 
>>> Please also check that you are not creating a producer every time you send
>>> a message, but reusing one producer to send multiple messages. After it is
>>> created and before its first send, a producer sends this request to fetch
>>> metadata. I ran a test publishing messages from 2 producers to a topic with
>>> 3 partitions on a 3 broker, 1 ZooKeeper Kafka setup. I ran the test for
>>> more than a minute and saw the request just once for each producer, before
>>> its first send.
>>> 
>>> Regards,
>>> Vinay Sharma
>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <Vincent_Fumo@cable.comcast.com> wrote:
>>> 
>>>> Hi. I've not set that value. My producer properties are as follows :
>>>> 
>>>> acks=all
>>>> retries=0
>>>> batch.size=1638
>>>> linger.ms=1
>>>> buffer.memory=33554432
>>>> compression.type=gzip
>>>> client.id=sds-merdevl
>>>> 
>>>> I have this running on two hosts with the same config. I thought that
>>>> having the same client.id on each would just consolidate the tracking
>>>> (same logical name). You don't think there is an issue with 2 producers
>>>> to the same topic?
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.tech@gmail.com> wrote:
>>>>> 
>>>>> Generally, a proactive metadata refresh request is sent by the producer
>>>>> and consumer every 5 minutes, but this interval can be overridden with
>>>>> the property "metadata.max.age.ms", which has a default value of 300000,
>>>>> i.e. 5 minutes. Can you check whether you have set this property very low
>>>>> in your producer?
>>>>> 
>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
>>>>> Vincent_Fumo@cable.comcast.com> wrote:
>>>>> 
>>>>>> I'm testing a kafka install and using the java client. I have a topic
>>>>>> set up and it appears to work great, but after a while I noticed my log
>>>>>> starting to fill up with what appears to be some kind of loop for
>>>>>> metadata updates.
>>>>>> 
>>>>>> example:
>>>>>> 
>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>> 6196 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>>> partitions = [Partition(topic = s.notifications.dev, partition = 0,
>>>>>> leader = 0, replicas = [0,], isr = [0,]])
>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>> 6197 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>>> partitions = [Partition(topic = s.notifications.dev, partition = 0,
>>>>>> leader = 0, replicas = [0,], isr = [0,]])
>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>>>>>> 6198 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
>>>>>> partitions = [Partition(topic = s.notifications.dev, partition = 0,
>>>>>> leader = 0, replicas = [0,], isr = [0,]])
>>>>>> 
>>>>>> etc.
>>>>>> 
>>>>>> It hasn't stopped..
>>>>>> 
>>>>>> I'm curious about what's going on here. Can anyone help?
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
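
Vinay's first suggestion in the thread, checking whether metadata.max.age.ms was set very low, can be expressed as a small guard on the producer Properties before the producer is built. This is only a sketch: the class name, the method names and the 10 second threshold are hypothetical, and only the property name and its 300000 ms default come from the thread. In this particular case it would not have flagged anything, since the property was never set and the actual cause turned out to be the broker and client version mismatch.

```java
import java.util.Properties;

// Hypothetical guard, not part of the Kafka client library.
class MetadataAgeCheck {
    // Default of metadata.max.age.ms as stated in the thread (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Returns the configured metadata.max.age.ms, or the default if it is unset.
    static long effectiveMetadataMaxAgeMs(Properties props) {
        String raw = props.getProperty("metadata.max.age.ms");
        return raw == null ? DEFAULT_METADATA_MAX_AGE_MS : Long.parseLong(raw.trim());
    }

    // True if the effective refresh interval is below the caller's threshold.
    static boolean isSuspiciouslyLow(Properties props, long thresholdMs) {
        return effectiveMetadataMaxAgeMs(props) < thresholdMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", "0");
        // metadata.max.age.ms is deliberately left unset, as in the thread.
        long thresholdMs = 10_000L; // assumed threshold, chosen only for illustration
        if (isSuspiciouslyLow(props, thresholdMs)) {
            System.out.println("metadata.max.age.ms looks very low: "
                    + effectiveMetadataMaxAgeMs(props));
        } else {
            System.out.println("metadata.max.age.ms is " + effectiveMetadataMaxAgeMs(props));
        }
    }
}
```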

