flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager
Date Thu, 23 Jun 2016 09:04:16 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346104#comment-15346104 ]

ASF GitHub Bot commented on FLINK-1550:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2146#discussion_r68199369
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1737,6 +1766,149 @@ class JobManager(
         // Shutdown and discard all queued messages
         context.system.shutdown()
       }
    +
    +  private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
    +    jobManagerMetricGroup.gauge("taskSlotsAvailable", new Gauge[Long] {
    +      override def getValue: Long = JobManager.this.instanceManager.getNumberOfAvailableSlots
    +    })
    +    jobManagerMetricGroup.gauge("taskSlotsTotal", new Gauge[Long] {
    +      override def getValue: Long = JobManager.this.instanceManager.getTotalNumberOfSlots
    +    })
    +    jobManagerMetricGroup.gauge("numRegisteredTaskManagers", new Gauge[Long] {
    +      override def getValue: Long
    +      = JobManager.this.instanceManager.getNumberOfRegisteredTaskManagers
    +    })
    +    jobManagerMetricGroup.gauge("numRunningJobs", new Gauge[Long] {
    +      override def getValue: Long = JobManager.this.currentJobs.size
    +    })
    +    instantiateStatusMetrics(jobManagerMetricGroup)
    +  }
    +
    +  private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
    +    val jvm = jobManagerMetricGroup
    +      .addGroup("Status")
    +      .addGroup("JVM")
    +
    +    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
    +    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
    +    instantiateMemoryMetrics(jvm.addGroup("Memory"))
    +    instantiateThreadMetrics(jvm.addGroup("Threads"))
    +    instantiateCPUMetrics(jvm.addGroup("CPU"))
    +  }
    +
    +  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
    +    val mxBean = ManagementFactory.getClassLoadingMXBean
    +
    +    metrics
    +      .gauge("ClassesLoaded", new Gauge[Long] {
    +        override def getValue: Long = mxBean.getTotalLoadedClassCount
    +      })
    +    metrics.gauge("ClassesUnloaded", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getUnloadedClassCount
    +    })
    +  }
    +
    +  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
    +    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
    +
    +    for (garbageCollector <- garbageCollectors) {
    +      val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
    +      gcGroup.gauge("Count", new Gauge[Long] {
    +        override def getValue: Long = garbageCollector.getCollectionCount
    +      })
    +      gcGroup.gauge("Time", new Gauge[Long] {
    +        override def getValue: Long = garbageCollector.getCollectionTime
    +      })
    +    }
    +  }
    +
    +  private def instantiateMemoryMetrics(metrics: MetricGroup) {
    +    val mxBean = ManagementFactory.getMemoryMXBean
    +    val heap = metrics.addGroup("Heap")
    +    heap.gauge("Used", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
    +    })
    +    heap.gauge("Committed", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
    +    })
    +    heap.gauge("Max", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
    +    })
    +
    +    val nonHeap = metrics.addGroup("NonHeap")
    +    nonHeap.gauge("Used", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
    +    })
    +    nonHeap.gauge("Committed", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
    +    })
    +    nonHeap.gauge("Max", new Gauge[Long] {
    +      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
    +    })
    +
    +    val con = ManagementFactory.getPlatformMBeanServer;
    +
    +    val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
    +
    +    val direct = metrics.addGroup("Direct")
    +    direct.gauge("Count", new Gauge[Long] {
    +      override def getValue: Long = con
    +        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
    --- End diff --
    
    But `getAttribute` throws exceptions when the attribute cannot be found. We should handle them.


> Show JVM Metrics for JobManager
> -------------------------------
>
>                 Key: FLINK-1550
>                 URL: https://issues.apache.org/jira/browse/FLINK-1550
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, Metrics
>            Reporter: Robert Metzger
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message