kafka-users mailing list archives

From Paolo Patierno <ppatie...@live.com>
Subject No error for assigning to not existing partition
Date Thu, 21 Apr 2016 13:41:44 GMT
Hello,

I have a topic with 4 partitions and I'm using the assign() method of KafkaConsumer to receive
from a specific partition.
For testing, I specified a nonexistent partition (number 5), but instead of an error/exception
about the nonexistent partition I got a strange trace and behavior.
After the assign() call I see the following ...
...
2016-04-21 15:31:26,355 [                      Thread-5] KafkaConsumer                  DEBUG
Subscribed to partition(s): my_topic-5
...
and then many retries to fetch the offset information for the partition ...

2016-04-21 15:31:26,482 [                      Thread-5] ConsumerCoordinator            DEBUG
No committed offset for partition my_topic-5
2016-04-21 15:31:26,482 [                      Thread-5] Fetcher                        DEBUG
Resetting offset for partition my_topic-5 to latest offset.
2016-04-21 15:31:26,483 [                      Thread-5] Fetcher                        DEBUG
Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,571 [                      Thread-5] NetworkClient                  DEBUG
Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=4,client_id=consumer-1},
body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486571, sendTimeMs=0)
to node 0
2016-04-21 15:31:26,573 [                      Thread-5] Metadata                       DEBUG
Updated cluster metadata version 3 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)],
partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr
= [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,],
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic
= my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,573 [                      Thread-5] Fetcher                        DEBUG
Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,672 [                      Thread-5] NetworkClient                  DEBUG
Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=5,client_id=consumer-1},
body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486672, sendTimeMs=0)
to node 0
2016-04-21 15:31:26,674 [                      Thread-5] Metadata                       DEBUG
Updated cluster metadata version 4 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)],
partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr
= [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,],
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic
= my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,674 [                      Thread-5] Fetcher                        DEBUG
Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,773 [                      Thread-5] NetworkClient                  DEBUG
Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=6,client_id=consumer-1},
body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486773, sendTimeMs=0)
to node 0
2016-04-21 15:31:26,775 [                      Thread-5] Metadata                       DEBUG
Updated cluster metadata version 5 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)],
partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr
= [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,],
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic
= my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,775 [                      Thread-5] Fetcher                        DEBUG
Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,874 [                      Thread-5] NetworkClient                  DEBUG
Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=7,client_id=consumer-1},
body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486874, sendTimeMs=0)
to node 0
2016-04-21 15:31:26,876 [                      Thread-5] Metadata                       DEBUG
Updated cluster metadata version 6 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)],
partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr
= [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,],
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic
= my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])

Why isn't an error returned for a nonexistent partition?

Do I have to check this myself by calling the partitionsFor() method before calling assign()?
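
The check asked about above could be sketched roughly as follows. This is only an illustration, not part of the Kafka client: PartitionGuard and requirePartition are hypothetical names, and the partition lookup is passed in as a plain function so the sketch stays self-contained. With a real consumer, the lookup would presumably be built from the result of partitionsFor(), as noted in the comment.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not part of the Kafka client API): rejects a partition
// id that the topic does not have, before it is ever passed to assign().
final class PartitionGuard {

    private PartitionGuard() {}

    /**
     * Throws IllegalArgumentException if `partition` is not among the ids that
     * `lookup` reports for `topic`.
     *
     * Assumption: with a real KafkaConsumer the lookup could be built from
     * partitionsFor(), for example
     *   topic -> consumer.partitionsFor(topic).stream()
     *                    .map(PartitionInfo::partition)
     *                    .collect(Collectors.toList())
     * A null result is treated here as "no known partitions".
     */
    static void requirePartition(Function<String, List<Integer>> lookup,
                                 String topic, int partition) {
        List<Integer> known = lookup.apply(topic);
        if (known == null || !known.contains(partition)) {
            throw new IllegalArgumentException(
                "Partition " + topic + "-" + partition + " does not exist; known partitions: " + known);
        }
    }
}
```

Calling requirePartition(lookup, "my_topic", 5) just before assign() would then fail fast for the 4-partition topic described here, instead of leaving the consumer to loop on metadata refreshes.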

Thanks,

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience