flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jelmer Kuperus (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"
Date Mon, 22 Jan 2018 10:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334101#comment-16334101
] 

Jelmer Kuperus commented on FLINK-8093:
---------------------------------------

This is a lot more problematic in flink 1.4.0


Statics are only static within the context of their classloader. In 1.4.0 the kafka classes
are loaded from the user classloader. So this PRODUCER_CLIENT_ID_SEQUENCE that you see used
here :(

[https://github.com/apache/kafka/blob/11f3db0b731739017174e488670e529af4dc22ae/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L335]

 

will always be 1

> flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8093
>                 URL: https://issues.apache.org/jira/browse/FLINK-8093
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.3.2
>         Environment: flink 1.3.2, kafka 0.9.1
>            Reporter: dongtingting
>            Priority: Critical
>
> one taskmanager has multiple taskslot, one task fail because of create kafkaProducer
fail,the reason for create kafkaProducer fail is “javax.management.InstanceAlreadyExistsException:
kafka.producer:type=producer-metrics,client-id=producer-3”。 the detail trace is :
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task             
       - Source: Custom Source -> Filter -> Map -> Filter -> Sink: dp_client_**_log
(7/80) (99551f3f892232d7df5eb9060fa9940c) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=producer-metrics,client-id=producer-3
>         at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>         at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.<init>(RecordAccumulator.java:111)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:261)
>         ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=producer-metrics,client-id=producer-3
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>         at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>         ... 16 more
> I doubt that task in different taskslot of one taskmanager use different classloader,
and taskid may be  the same in one process。 So this lead to create kafkaProducer fail in
one taskManager。 
> Does anybody encountered the same problem? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message