Bind S4Metrics as an eager singleton
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b0a904b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b0a904b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b0a904b7
Branch: refs/heads/dev
Commit: b0a904b70fc2e0e4e2e264f3dbce7d6f93273069
Parents: 80dfe40
Author: Matthieu Morel <mmorel@apache.org>
Authored: Mon Sep 17 12:30:50 2012 +0200
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Mon Sep 17 12:31:15 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/core/DefaultCoreModule.java | 3 ++
.../java/org/apache/s4/core/ProcessingElement.java | 2 +-
.../java/org/apache/s4/core/util/S4Metrics.java | 24 +++++++++++---
3 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..bdf9379 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -31,6 +31,7 @@ import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.util.S4Metrics;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
import org.slf4j.Logger;
@@ -83,6 +84,8 @@ public class DefaultCoreModule extends AbstractModule {
// For enabling checkpointing, one needs to use a custom module, such as
// org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+ bind(S4Metrics.class).asEagerSingleton();
}
private void loadProperties(Binder binder) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 5429076..aca9327 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -172,7 +172,7 @@ public abstract class ProcessingElement implements Cloneable {
*/
this.pePrototype = this;
- S4Metrics.createCacheGauges(peInstances);
+ S4Metrics.createCacheGauges(this, peInstances);
processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index d0ffc0d..fe3fa80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -11,17 +11,23 @@ import org.apache.s4.core.Receiver;
import org.apache.s4.core.RemoteSender;
import org.apache.s4.core.Sender;
import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Lists;
import com.beust.jcommander.internal.Maps;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
+@Singleton
public class S4Metrics {
+ private static Logger logger = LoggerFactory.getLogger(S4Metrics.class);
+
@Inject
Emitter emitter;
@@ -52,22 +58,24 @@ public class S4Metrics {
}
}
- public static void createCacheGauges(final LoadingCache<String, ProcessingElement> cache) {
- Metrics.newGauge(ProcessingElement.class, "PE-cache-entries", new Gauge<Long>() {
+ public static void createCacheGauges(ProcessingElement prototype,
+ final LoadingCache<String, ProcessingElement> cache) {
+
+ Metrics.newGauge(prototype.getClass(), "PE-cache-entries", new Gauge<Long>() {
@Override
public Long value() {
return cache.size();
}
});
- Metrics.newGauge(ProcessingElement.class, "PE-cache-evictions", new Gauge<Long>() {
+ Metrics.newGauge(prototype.getClass(), "PE-cache-evictions", new Gauge<Long>() {
@Override
public Long value() {
return cache.stats().evictionCount();
}
});
- Metrics.newGauge(ProcessingElement.class, "PE-cache-misses", new Gauge<Long>() {
+ Metrics.newGauge(prototype.getClass(), "PE-cache-misses", new Gauge<Long>() {
@Override
public Long value() {
@@ -82,7 +90,13 @@ public class S4Metrics {
}
public static void sentEvent(int partition) {
- senderMeters[partition].mark();
+ try {
+ senderMeters[partition].mark();
+ } catch (NullPointerException e) {
+ logger.warn("Sender meter not ready for partition {}", partition);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ logger.warn("Partition {} does not exist", partition);
+ }
}
public static void createStreamMeters(String name) {
|