flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #2285: [FLINK-4246] Allow Specifying Multiple Metrics Rep...
Date Mon, 25 Jul 2016 15:57:12 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2285#discussion_r72091326
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
---
    @@ -74,81 +76,97 @@ public MetricRegistry(Configuration config) {
     		this.delimiter = delim;
     
     		// second, instantiate any custom configured reporters
    +		this.reporters = new ArrayList<>();
    +
    +		final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST,
null);
     
    -		final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
    -		if (className == null) {
    +		if (definedReporters == null) {
    +			// no reporters defined
     			// by default, don't report anything
     			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
    -			this.reporter = null;
     			this.executor = null;
    -		}
    -		else {
    -			MetricReporter reporter;
    -			ScheduledExecutorService executor = null;
    -			try {
    -				String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL,
null);
    -				TimeUnit timeunit = TimeUnit.SECONDS;
    -				long period = 10;
    -
    -				if (configuredPeriod != null) {
    -					try {
    -						String[] interval = configuredPeriod.split(" ");
    -						period = Long.parseLong(interval[0]);
    -						timeunit = TimeUnit.valueOf(interval[1]);
    -					}
    -					catch (Exception e) {
    -						LOG.error("Cannot parse report interval from config: " + configuredPeriod +
    -							" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
    -							"Using default reporting interval.");
    -					}
    +		} else {
    +			// we have some reporters so
    +			String[] namedReporters = definedReporters.split(",");
    +			for (String namedReporter : namedReporters) {
    +
    +				final String className =
    +						config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
    +				if (className == null) {
    +					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might
not be exposed/reported.");
    +					continue;
     				}
     
    -				MetricConfig reporterConfig = createReporterConfig(config);
    +				try {
    +					String configuredPeriod =
    +							config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
    +					TimeUnit timeunit = TimeUnit.SECONDS;
    +					long period = 10;
    +
    +					if (configuredPeriod != null) {
    +						try {
    +							String[] interval = configuredPeriod.split(" ");
    +							period = Long.parseLong(interval[0]);
    +							timeunit = TimeUnit.valueOf(interval[1]);
    +						}
    +						catch (Exception e) {
    +							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
    +									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
    +									"Using default reporting interval.");
    +						}
    +					}
     
    -				Class<?> reporterClass = Class.forName(className);
    -				reporter = (MetricReporter) reporterClass.newInstance();
    -				reporter.open(reporterConfig);
    +					Class<?> reporterClass = Class.forName(className);
    +					MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
     
    -				if (reporter instanceof Scheduled) {
    -					executor = Executors.newSingleThreadScheduledExecutor();
    -					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    +					MetricConfig reporterConfig = new MetricConfig();
    +					config.addAll(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".", reporterConfig);
    +					reporterInstance.open(reporterConfig);
     
    -					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period,
period, timeunit);
    +					if (reporterInstance instanceof Scheduled) {
    +						if (this.executor == null) {
    +							executor = Executors.newSingleThreadScheduledExecutor();
    +						}
    +						LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    --- End diff --
    
    Fixing, I'll include the configured reporter name as well as the class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message