flink-issues mailing list archives

From "Shengnan YU (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
Date Wed, 24 Apr 2019 03:21:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824781#comment-16824781 ]

Shengnan YU commented on FLINK-11848:

It does not work. I have looked at the source code: Flink's partition discovery gets the topic
list from the consumer metadata. However, the warning occurs when the consumer fetches metadata
from the cluster. It is actually an issue with kafka-client, not Flink. Thank you very much for your help.
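
To illustrate the distinction above, here is a minimal, Kafka-free sketch in plain Java. It assumes that discovery simply filters the topic names reported by the metadata against the pattern, and that the stale entry is a name the client still remembers after the cluster has dropped it. The class and method names (StaleTopicSketch, matchingTopics, staleTopics) are hypothetical and are not taken from Flink or kafka-client.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class StaleTopicSketch {

    // Keep only the reported topic names that match the subscription pattern.
    static Set<String> matchingTopics(Set<String> reportedTopics, Pattern pattern) {
        Set<String> matched = new TreeSet<>();
        for (String topic : reportedTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Topics the client still remembers that are no longer reported.
    static Set<String> staleTopics(Set<String> remembered, Set<String> current) {
        Set<String> stale = new TreeSet<>(remembered);
        stale.removeAll(current);
        return stale;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^test.*$");
        // Topic names before and after "test20190310" is deleted (illustrative data).
        Set<String> before = new TreeSet<>(Arrays.asList(
                "test20190310", "test20190311", "test20190312", "other"));
        Set<String> after = new TreeSet<>(Arrays.asList(
                "test20190311", "test20190312", "other"));

        Set<String> remembered = matchingTopics(before, pattern);
        Set<String> discovered = matchingTopics(after, pattern);

        System.out.println("discovered: " + discovered);
        System.out.println("stale: " + staleTopics(remembered, discovered));
    }
}
```

In this sketch the discovered list is already correct after the deletion. If the client nevertheless keeps requesting metadata for the stale name, a fresh discovery result alone would not stop the warning, which matches what I observed.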

> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
> ------------------------------------------------------------
>                 Key: FLINK-11848
>                 URL: https://issues.apache.org/jira/browse/FLINK-11848
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4
>            Reporter: Shengnan YU
>            Assignee: frank wang
>            Priority: Major
> Recently we have been running some streaming jobs with Apache Flink. There are multiple Kafka
> topics named in the format xxxxxx_yy-mm-dd. We used a topic regex pattern to let a consumer
> consume those topics. However, if we delete some older topics, the metadata in the consumer
> does not seem to update properly, so it still remembers those outdated topics in its topic
> list, which leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover.
> It seems to occur in the producer as well. Any idea how to solve this problem? Thank you very much!
> Example to reproduce the problem:
> Suppose there are multiple Kafka topics, for instance "test20190310", "test20190311", and
> "test20190312". I run the job and everything is OK. Then, if I delete topic "test20190310", the
> consumer does not notice that the topic has been deleted and keeps fetching metadata for that
> topic. Unknown-topic errors then appear in the taskmanager's log.
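> {code:java}
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>
> // The class name is a placeholder; the original snippet omitted the class declaration.
> public class TopicPatternJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         // Run topic/partition discovery every 1,200,000 ms (20 minutes).
>         props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "1200000");
>         // Subscribe to every topic whose name starts with "test".
>         Pattern topics = Pattern.compile("^test.*$");
>         FlinkKafkaConsumer011<String> consumer =
>                 new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>         DataStream<String> stream = env.addSource(consumer);
>         stream.writeToSocket("localhost", 44444, new SimpleStringSchema());
>         env.execute("test");
>     }
> }
> {code}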

This message was sent by Atlassian JIRA