Hello,

Are the metrics for which the error occurs still reported correctly?

If you submit a job, does this also happen for job metrics?

I haven't looked into it in detail yet, but I would assume that this is not an issue with the reporter but something
deeper in Flink (for example, TM metrics being registered multiple times).
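To illustrate the suspected cause, here is a minimal sketch. It assumes that the Prometheus registry rejects any second collector registered under a name it already holds, which is what the exception text suggests. `NameKeyedRegistry` and `registerStatusMetrics` are hypothetical stand-ins, not the Prometheus client or Flink API. Registering the same status metrics twice in one process fails on the first duplicate name:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of a name-keyed registry; NOT the Prometheus client API.
public class DuplicateRegistrationDemo {

    static final class NameKeyedRegistry {
        private final Map<String, Supplier<Number>> collectors = new HashMap<>();

        // Rejects a second registration under an existing name,
        // mirroring the message seen in the TaskManager log.
        void register(String name, Supplier<Number> gauge) {
            if (collectors.putIfAbsent(name, gauge) != null) {
                throw new IllegalArgumentException(
                        "Collector already registered that provides name: " + name);
            }
        }

        int size() {
            return collectors.size();
        }
    }

    // Hypothetical: registers two TM status gauges under fixed names.
    static void registerStatusMetrics(NameKeyedRegistry registry) {
        registry.register("flink_taskmanager_Status_JVM_ClassLoader_ClassesLoaded", () -> 0L);
        registry.register("flink_taskmanager_Status_JVM_ClassLoader_ClassesUnloaded", () -> 0L);
    }

    public static void main(String[] args) {
        NameKeyedRegistry registry = new NameKeyedRegistry();
        registerStatusMetrics(registry); // first registration succeeds
        try {
            registerStatusMetrics(registry); // second registration hits a duplicate name
            System.out.println("no error");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("registered: " + registry.size());
    }
}
```

In the sketch the exception aborts the second pass at its first name. In the stack trace, the reporter is called from MetricRegistry.register, which logs "Error while registering metric." This suggests that Flink catches the exception for each metric, which would explain why the error repeats for every status metric.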

On 03.07.2017 12:35, 김동원 wrote:
Hi,

First of all, thanks to Maximilian Bode for the Prometheus reporter.
Thanks to it, I can now rely entirely on Prometheus to collect metrics from various sources, including Flink.


To test it, I took the following steps:
  1. generate flink-metrics-prometheus-1.4-SNAPSHOT.jar
  2. put it under ./lib
  3. modify ./conf/flink-conf.yaml to include the following two lines:
    1. metrics.reporters: prom
    2. metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  4. start the JobManager and TaskManager on different nodes to avoid a collision on the reporter's port (9249)

While the JobManager works fine with the Prometheus reporter (Prometheus scraped it successfully every 5 seconds, as expected), the TaskManager logs the following error when it starts up:
---------------------------------
2017-07-03 18:07:00,734 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager#-21882459.
2017-07-03 18:07:00,735 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: 96fee790eabe7df19322147f7d8634b5 @ DNN-G08-235 (dataPort=46188)
2017-07-03 18:07:00,735 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 1 task slot(s).
2017-07-03 18:07:00,737 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 113/1024/1024 MB, NON HEAP: 36/37/-1 MB (used/committed/max)]
2017-07-03 18:07:00,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@pdm4:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-07-03 18:07:00,885 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://flink@pdm4:6123/user/jobmanager), starting network stack and library cache.
2017-07-03 18:07:00,892 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be pdm4/50.1.100.234:41010. Starting BLOB cache.
2017-07-03 18:07:00,896 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-bad71755-c7a3-4179-8e70-ea42ff73cdde
2017-07-03 18:07:00,902 ERROR org.apache.flink.runtime.metrics.MetricRegistry               - Error while registering metric.
java.lang.IllegalArgumentException: Collector already registered that provides name: flink_taskmanager_Status_JVM_ClassLoader_ClassesLoaded
at org.apache.flink.shaded.io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)
at org.apache.flink.shaded.io.prometheus.client.Collector.register(Collector.java:128)
at org.apache.flink.shaded.io.prometheus.client.Collector.register(Collector.java:121)
at org.apache.flink.metrics.prometheus.PrometheusReporter.notifyOfAddedMetric(PrometheusReporter.java:133)
at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:296)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:314)
at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateClassLoaderMetrics(MetricUtils.java:90)
at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:80)
at org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:989)
at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:627)
at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:287)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
---------------------------------

The same error is logged for each of the following metrics:
  • flink_taskmanager_Status_JVM_ClassLoader_ClassesLoaded
  • flink_taskmanager_Status_JVM_ClassLoader_ClassesUnloaded
  • flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Count
  • flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Time
  • flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count
  • flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time
  • flink_taskmanager_Status_JVM_Memory_Heap_Used
  • flink_taskmanager_Status_JVM_Memory_Heap_Committed
  • flink_taskmanager_Status_JVM_Memory_Heap_Max
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Used
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Committed
  • flink_taskmanager_Status_JVM_Memory_NonHeap_Max
  • flink_taskmanager_Status_JVM_Memory_Direct_Count
  • flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed
  • flink_taskmanager_Status_JVM_Memory_Direct_TotalCapacity
  • flink_taskmanager_Status_JVM_Memory_Mapped_Count
  • flink_taskmanager_Status_JVM_Memory_Mapped_MemoryUsed
  • flink_taskmanager_Status_JVM_Memory_Mapped_TotalCapacity
  • flink_taskmanager_Status_JVM_Threads_Count
  • flink_taskmanager_Status_JVM_CPU_Load
  • flink_taskmanager_Status_JVM_CPU_Time
  • flink_taskmanager_Status_Network_TotalMemorySegments
  • flink_taskmanager_Status_Network_AvailableMemorySegments
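
The names above follow a fixed pattern. A minimal sketch, assuming the reporter prefixes `flink`, joins the logical scope components and the metric name with `_`, and replaces characters that Prometheus does not allow in metric names (anything outside `[a-zA-Z0-9_:]`) with `_`. `toPrometheusName` is a hypothetical helper, and the real PrometheusReporter logic may differ. If the host and TaskManager ID are not part of the name, as the list suggests, then a second registration of the same metric group in one JVM would produce exactly the same names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; the actual PrometheusReporter naming logic may differ.
public class PrometheusNameSketch {

    // Prometheus metric names may only contain [a-zA-Z0-9_:].
    private static final Pattern INVALID = Pattern.compile("[^a-zA-Z0-9_:]");

    static String toPrometheusName(List<String> scopeComponents, String metricName) {
        StringBuilder sb = new StringBuilder("flink");
        for (String component : scopeComponents) {
            sb.append('_').append(component); // join scope components with '_'
        }
        sb.append('_').append(metricName);
        // e.g. the spaces in "G1 Young Generation" become underscores
        return INVALID.matcher(sb.toString()).replaceAll("_");
    }

    public static void main(String[] args) {
        String name = toPrometheusName(
                Arrays.asList("taskmanager", "Status", "JVM", "GarbageCollector", "G1 Young Generation"),
                "Count");
        System.out.println(name); // flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Count
    }
}
```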

I tested it in different environments (with and without Docker), and all of them showed the same error.
What's going wrong with it?

Thanks,

- Dongwon Kim