kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Fumo, Vincent" <Vincent_F...@cable.comcast.com>
Subject Re: Metadata Request Loop?
Date Mon, 25 Apr 2016 13:38:12 GMT


My code is very straightforward. I create a producer, and then call it to send messages. Here
is the factory method::

public Producer<String, String> createProducer() {

    Properties props = new Properties();

    props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("acks", "all");
    props.put("retries", "0");
    props.put("batch.size", "1638");
    props.put("linger.ms", "1");
    props.put("buffer.memory", "33554432");
    props.put("compression.type", "gzip");
    props.put("client.id", "sds-merdevl");

    return new KafkaProducer<>(props);
}

That is injected into a single instance of KafkaNotifier::

public class KafkaNotifier implements MessageNotifier {
    private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);

    private Producer<String, String> producer;
    private String topicName;

    @Override
    public void sendNotificationMessage(DataObjectModificationMessage message) {

        String json = message.asJSON();
        String key = message.getObject().getId().toString();

        producer.send(new ProducerRecord<>(topicName, key, json));

        if (logger.isDebugEnabled()) {
            logger.debug("Sent Kafka message for object with id={}", key);
        }
    }

    @Required
    public void setProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    @Required
    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }
}

Here is the topic config info :

./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
Topic:s.notifications.dev	PartitionCount:1	ReplicationFactor:1	Configs:retention.ms=172800000,cleanup.policy=compact
	Topic: s.notifications.dev	Partition: 0	Leader: 0	Replicas: 0	Isr: 0





> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.tech@gmail.com> wrote:
> 
> 2 producer's to same topic should not be a problem. There can be multiple
> producers and consumers of same kafka topic.
> 
> I am not sure what can be wrong here. I can this at my end If you can share
> producer code and any config of topic ot broker that you changed and is not
> default.
> 
> Please also check that you are not creating producer every time you send a
> message but reusing producer once created to send multiple messages. Before
> a send and after creation a producers sends this request to fetch metadata.
> I ran a test to publish messages from 2 producers to a topic with 3
> partitions on a 3 broker 1 zookeeper kafka setup. I ran test for more than
> a minute and saw just once for both producers before their 1st send.
> 
> Regards,
> Vinay Sharma
> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <Vincent_Fumo@cable.comcast.com>
> wrote:
> 
>> Hi. I've not set that value. My producer properties are as follows :
>> 
>> acks=all
>> retries=0
>> bath.size=1638
>> linger.ms=1
>> buffer.memory=33554432
>> compression.type=gzip
>> client.id=sds-merdevl
>> 
>> I have this running on two hosts with the same config. I thought that
>> having the same client.id on each would just consolidate the tracking
>> (same logical name). You don't think there is an issue with 2 producers to
>> the same topic?
>> 
>> 
>> 
>> 
>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.tech@gmail.com>
>> wrote:
>>> 
>>> Generally a proactive metadata refresh request is sent by producer and
>>> consumer every 5 minutes but this interval can be overriden with
>> property "
>>> metadata.max.age.ms" which has default value 300000 i.e 5 minutes.
>> Check if
>>> you have set this property very low in your producer?
>>> 
>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
>>> Vincent_Fumo@cable.comcast.com> wrote:
>>> 
>>>> I'm testing a kafka install and using the java client. I have a topic
>> set
>>>> up and it appears to work great, but after a while I noticed my log
>>>> starting to fill up with what appears to be some kind of loop for
>> metadata
>>>> updates.
>>>> 
>>>> example::
>>>> 
>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>> 6196 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions
>> =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>> replicas
>>>> = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>> 6197 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions
>> =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>> replicas
>>>> = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
>> 6198 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions
>> =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
>> replicas
>>>> = [0,], isr = [0,]])
>>>> 
>>>> etc.
>>>> 
>>>> It hasn't stopped..
>>>> 
>>>> I'm curious about what's going on here. Can anyone help?
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>> 


Mime
View raw message