kafka-users mailing list archives

From Igor Kuzmenko <f1she...@gmail.com>
Subject Kafka Producer meta data refresh.
Date Tue, 05 Jul 2016 10:51:22 GMT
Hello, I'm using Kafka 0.9.0 and sending messages to a Kafka topic via
KafkaProducer.
My application constantly waits for a new file in a directory, reads it, then
sends every line of the file as a message to Kafka.

The KafkaProducer is created once at application start, and in the logs I can
see that it requests cluster metadata every 5 minutes. But after a few minutes
the metadata requests start being sent far too often. Here's the log:

13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689920,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.095 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686315 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.194 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689921,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.196 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686316 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.294 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689922,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.295 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686317 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.394 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689923,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.395 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686318 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.494 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689924,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.495 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686319 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.594 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689925,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001
13:06:53.595 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
metadata version 686320 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
partitions = [Partition(topic = mobile_connections, partition = 4, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic =
mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,], Partition(topic = mobile_connections, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,]])
13:06:53.694 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
request ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=689926,client_id=producer-1},
body={topics=[mobile_connections]})) to node 1001

It doesn't seem right to refresh metadata so often.
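
To put a number on "so often", the gaps between the "Sending metadata request"
lines in the log above can be computed directly. This is only a rough sketch:
the timestamps are copied by hand from the log, and the helper name
intervals_ms is made up for illustration.

```python
from datetime import datetime

# Timestamps of the "Sending metadata request" lines, copied from the log above.
request_times = [
    "13:06:53.094",
    "13:06:53.194",
    "13:06:53.294",
    "13:06:53.394",
    "13:06:53.494",
    "13:06:53.594",
    "13:06:53.694",
]


def intervals_ms(timestamps):
    """Return the gaps between consecutive timestamps, in whole milliseconds."""
    parsed = [datetime.strptime(t, "%H:%M:%S.%f") for t in timestamps]
    return [
        round((later - earlier).total_seconds() * 1000)
        for earlier, later in zip(parsed, parsed[1:])
    ]


gaps = intervals_ms(request_times)
print(gaps)  # [100, 100, 100, 100, 100, 100]
```

So in this excerpt a request goes out every 100 ms, about ten per second,
compared with the one request per 5 minutes seen right after startup.
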
Maybe the problem is that this cluster info is incomplete. The logs contain
information about only one topic, but the cluster has three topics.

My questions are:
Am I using KafkaProducer correctly? Should I maybe close it and create a new
one for each file?
Why does the metadata refresh happen so often?
