kafka-users mailing list archives

From Shengnan YU <ysna...@hotmail.com>
Subject Kafka client keeps fetching metadata for deleted topics, which leads to UNKNOWN_TOPIC_EXCEPTION
Date Mon, 15 Apr 2019 05:41:25 GMT
We recently used Apache Flink to consume Kafka topics matching a regex pattern. We found that
after we deleted some unused topics, the logs were continuously flooded with UNKNOWN_TOPIC_EXCEPTION
warnings.

I looked into the Kafka client source code and found that topic expiry is disabled in Metadata
for consumers, so the client keeps the deleted topics in its metadata topic list and continues
fetching their metadata from the brokers.
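
Since the stale topic list lives inside the consumer instance's Metadata, one workaround would be
to periodically close and recreate the consumer, which starts over with an empty metadata topic
list. A minimal sketch (the rebuildConsumer helper is just an illustration, not part of our real
code):

    // Heavy-handed workaround sketch: a freshly created consumer starts with
    // empty metadata, so topics that were deleted in the meantime are forgotten.
    static KafkaConsumer<String, String> rebuildConsumer(KafkaConsumer<String, String> old,
                                                         Properties props,
                                                         ConsumerRebalanceListener listener) {
        old.close();
        KafkaConsumer<String, String> fresh = new KafkaConsumer<>(props);
        fresh.subscribe(Pattern.compile("^test.*$"), listener);
        return fresh;
    }

Rebuilding the consumer on a timer is too heavy for our Flink jobs, which is why we are looking
for something better.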

Is there a good way to avoid these annoying warning logs without modifying Kafka's source code?
(We still need the 'real' unknown-topic exceptions, i.e. those not caused by outdated topics,
to show up in the logs.)
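
The obvious mitigation would be to raise the log level of the client's NetworkClient logger,
which seems to be where the "Error while fetching metadata" warnings come from, but that is too
blunt: it also hides warnings about genuinely unknown topics. A sketch, assuming the slf4j
binding is log4j 1.x:

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    // Silences the metadata-fetch WARN lines, but also silences the 'real'
    // unknown-topic warnings that we want to keep, so it is not a solution.
    Logger.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.ERROR);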

kafka client version: 1.0

The following code reproduces the problem (create multiple topics such as "test1", "test2",
"test3" ... "testn" in the Kafka cluster, then delete any one of them while the program is running).

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DeletedTopicRepro { // arbitrary wrapper class name, so the snippet compiles standalone

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test10");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("metadata.max.age.ms", "60000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

            private final KafkaConsumer<String, String> consumer;

            public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
                this.consumer = kafkaConsumer;
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // read all newly assigned partitions from the beginning
                consumer.seekToBeginning(partitions);
            }
        }

        consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

