flink-issues mailing list archives

From Aitozi <...@git.apache.org>
Subject [GitHub] flink pull request #4935: [Flink-7945][Metrics&connector]Fix per partition-l...
Date Thu, 02 Nov 2017 13:26:14 GMT
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4935#discussion_r148530767
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
    @@ -245,6 +238,23 @@ public void run() {
     				if (records == null) {
     					try {
     						records = consumer.poll(pollTimeout);
    +						// register Kafka's very own metrics in Flink's metric reporters
    +						if (useMetrics && !records.isEmpty()) {
    +							// register Kafka metrics to Flink
    +							Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    +							if (metrics == null) {
    +								// MapR's Kafka implementation returns null here.
    +								log.info("Consumer implementation does not support metrics");
    +							} else {
    +								// we have Kafka metrics, register them
    +							for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
    --- End diff --
    
    Yes, I agree with you that this is not the best way to solve it. What do you think about
trying to register the Kafka metrics at the beginning of the job for a limited number of attempts,
configurable via `properties`? Once that count is exceeded, we would no longer run the registration
in the loop.
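
The proposal above can be sketched roughly as follows. This is a hypothetical illustration, not Flink's actual implementation: the property key, class name, and method names are all invented for the example; only the idea (stop retrying registration after a configurable number of poll iterations) comes from the comment.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the commenter's idea: attempt Kafka metric registration only for a
// bounded number of poll-loop iterations, with the bound read from properties.
public class BoundedMetricRegistration {

    // Hypothetical property key; not an actual Flink/Kafka configuration option.
    static final String MAX_ATTEMPTS_KEY = "register.metrics.max.attempts";

    private final int maxAttempts;
    private int attempts = 0;
    private boolean registered = false;

    BoundedMetricRegistration(Map<String, String> properties) {
        this.maxAttempts = Integer.parseInt(
                properties.getOrDefault(MAX_ATTEMPTS_KEY, "3"));
    }

    /** Returns true iff registration should still be attempted on this iteration. */
    boolean shouldAttempt() {
        return !registered && attempts < maxAttempts;
    }

    /** Records the outcome of one attempt; stops retrying on success or exhaustion. */
    void recordAttempt(boolean success) {
        attempts++;
        if (success) {
            registered = true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(MAX_ATTEMPTS_KEY, "2");
        BoundedMetricRegistration reg = new BoundedMetricRegistration(props);

        // Simulated poll loop: consumer.metrics() never yields anything here,
        // so the attempts are exhausted and the loop stops checking.
        int attemptsMade = 0;
        for (int i = 0; i < 5; i++) {
            if (reg.shouldAttempt()) {
                attemptsMade++;
                reg.recordAttempt(false); // registration did not succeed
            }
        }
        System.out.println(attemptsMade); // prints 2
    }
}
```

This avoids calling `consumer.metrics()` on every poll once the limit is reached, trading completeness (metrics that only appear later are missed) for a bounded per-record cost.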


---
