kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Igor Kuzmenko <f1she...@gmail.com>
Subject Re: Kafka Producer meta data refresh.
Date Tue, 05 Jul 2016 11:20:17 GMT
Thanks for reply. Can you provide issue id. Link above doesn't open.

On Tue, Jul 5, 2016 at 2:03 PM, Ismael Juma <ismael@juma.me.uk> wrote:

> This looks like the following which was fixed in 0.10.0.0:
>
>
> https://issues.apache.org/jira/secure/EditComment!default.jspa?id=12948347&commentId=15239013
>
> Ismael
>
> On Tue, Jul 5, 2016 at 11:51 AM, Igor Kuzmenko <f1sherox@gmail.com> wrote:
>
> > Hello I'm using kafka 0.9.0 and sending messages to kafka topic via
> > KafkaProducer.
> > My application constantly waits for a new file in directory, reads it,
> than
> > send every line of a file as a message to kafka.
> >
> > KafkaProducer creates at start of application and in logs I can see that
> > every 5 min it request cluster metadata. But after a few minutes metadata
> > requests starting to send too often. Here's log:
> >
> > 13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689920,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.095 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686315 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.194 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689921,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.196 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686316 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.294 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689922,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.295 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686317 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.394 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689923,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.395 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686318 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.494 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689924,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.495 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686319 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.594 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689925,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> > 13:06:53.595 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > metadata version 686320 to Cluster(nodes = [Node(1001, jupiter.bss,
> 6667)],
> > partitions = [Partition(topic = mobile_connections, partition = 4,
> leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 3, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > mobile_connections, partition = 1, leader = 1001, replicas = [1001,],
> isr =
> > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > 1001, replicas = [1001,], isr = [1001,]])
> > 13:06:53.694 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > request ClientRequest(expectResponse=true, callback=null,
> >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=689926,client_id=producer-1},
> > body={topics=[mobile_connections]})) to node 1001
> >
> > It doesn't seem right to refresh meta data so often.
> > Maybe problem is that this cluster info is not full. In logs there's
> > information about one topic only, but cluster has three topics.
> >
> > My questions are:
> > Am I using KafkaProducer correctly? Maybe I should close it and create
> new
> > one on each file?
> > Why metadata refresh happens so often?
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message