kafka-users mailing list archives

From Igor Kuzmenko <f1she...@gmail.com>
Subject Re: Kafka Producer meta data refresh.
Date Tue, 05 Jul 2016 14:43:08 GMT
Thanks, that's definitely my problem.

On Tue, Jul 5, 2016 at 5:35 PM, Ismael Juma <ismael@juma.me.uk> wrote:

> Sorry, I meant (the edit comment button is too near the permanent link
> button):
>
>
> https://issues.apache.org/jira/browse/KAFKA-3358?focusedCommentId=15239013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15239013
>
> Ismael
>
> On Tue, Jul 5, 2016 at 12:20 PM, Igor Kuzmenko <f1sherox@gmail.com> wrote:
>
> > Thanks for the reply. Can you provide the issue ID? The link above doesn't open.
> >
> > On Tue, Jul 5, 2016 at 2:03 PM, Ismael Juma <ismael@juma.me.uk> wrote:
> >
> > > This looks like the following which was fixed in 0.10.0.0:
> > >
> > > https://issues.apache.org/jira/secure/EditComment!default.jspa?id=12948347&commentId=15239013
> > >
> > > Ismael
> > >
> > > On Tue, Jul 5, 2016 at 11:51 AM, Igor Kuzmenko <f1sherox@gmail.com> wrote:
> > >
> > > > Hello, I'm using Kafka 0.9.0 and sending messages to a Kafka topic via
> > > > KafkaProducer.
> > > > My application constantly waits for a new file in a directory, reads it,
> > > > then sends every line of the file as a message to Kafka.
> > > >
> > > > The KafkaProducer is created at application start, and in the logs I can
> > > > see that it requests cluster metadata every 5 minutes. But after a few
> > > > minutes, metadata requests start being sent far too often. Here's the log:
> > > >
> > > > 13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689920,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.095 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686315 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.194 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689921,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.196 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686316 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.294 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689922,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.295 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686317 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.394 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689923,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.395 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686318 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.494 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689924,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.495 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686319 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.594 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689925,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > > 13:06:53.595 DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> > > > metadata version 686320 to Cluster(nodes = [Node(1001, jupiter.bss, 6667)],
> > > > partitions = [Partition(topic = mobile_connections, partition = 4, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader =
> > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic =
> > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], isr =
> > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader =
> > > > 1001, replicas = [1001,], isr = [1001,]])
> > > > 13:06:53.694 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata
> > > > request ClientRequest(expectResponse=true, callback=null,
> > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689926,client_id=producer-1},
> > > > body={topics=[mobile_connections]})) to node 1001
> > > >
> > > > It doesn't seem right to refresh metadata so often.
> > > > Maybe the problem is that this cluster info is incomplete. The logs show
> > > > information about only one topic, but the cluster has three topics.
> > > >
> > > > My questions are:
> > > > Am I using KafkaProducer correctly? Maybe I should close it and create a
> > > > new one for each file?
> > > > Why does the metadata refresh happen so often?
> > > >
> > >
> >
>
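
For anyone who wants to check whether their own producer shows the same pattern, the gap between consecutive "Sending metadata request" lines in the quoted log can be measured with a small script. This is only a sketch: the function name `metadata_request_gaps_ms` and the `SAMPLE` list are made up for illustration, and the regular expression assumes the timestamp and logger layout seen in the log above, with any `>` quote prefixes already removed. The sample contains only the first line of each log entry.

```python
import re
from datetime import datetime

# Matches the first line of a client DEBUG entry such as
# "13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata".
# The layout is an assumption taken from the log quoted in this thread.
REQUEST_RE = re.compile(r"^(\d{2}:\d{2}:\d{2}\.\d{3}) DEBUG \S+ - Sending metadata")


def metadata_request_gaps_ms(log_lines):
    """Return the gaps, in milliseconds, between consecutive metadata requests."""
    stamps = []
    for line in log_lines:
        match = REQUEST_RE.match(line.strip())
        if match:
            stamps.append(datetime.strptime(match.group(1), "%H:%M:%S.%f"))
    # Pair each request with the next one and convert the difference to ms.
    return [
        round((later - earlier).total_seconds() * 1000)
        for earlier, later in zip(stamps, stamps[1:])
    ]


# First lines of a few entries from the log above (hypothetical excerpt list).
SAMPLE = [
    "13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata",
    "13:06:53.095 DEBUG org.apache.kafka.clients.Metadata - Updated cluster",
    "13:06:53.194 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata",
    "13:06:53.196 DEBUG org.apache.kafka.clients.Metadata - Updated cluster",
    "13:06:53.294 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata",
]

if __name__ == "__main__":
    print(metadata_request_gaps_ms(SAMPLE))  # prints [100, 100]
```

Gaps of about 100 ms, as in this excerpt, are far below the 5-minute interval the original poster saw at startup. The script only measures the interval; it does not say anything about the cause.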
