incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [3/3] git commit: Bind S4Metrics as an eager singleton
Date Thu, 20 Sep 2012 16:19:08 GMT
Bind S4Metrics as an eager singleton


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b0a904b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b0a904b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b0a904b7

Branch: refs/heads/S4-95
Commit: b0a904b70fc2e0e4e2e264f3dbce7d6f93273069
Parents: 80dfe40
Author: Matthieu Morel <mmorel@apache.org>
Authored: Mon Sep 17 12:30:50 2012 +0200
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Mon Sep 17 12:31:15 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/core/DefaultCoreModule.java |    3 ++
 .../java/org/apache/s4/core/ProcessingElement.java |    2 +-
 .../java/org/apache/s4/core/util/S4Metrics.java    |   24 +++++++++++---
 3 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..bdf9379 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -31,6 +31,7 @@ import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.util.S4Metrics;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
@@ -83,6 +84,8 @@ public class DefaultCoreModule extends AbstractModule {
         // For enabling checkpointing, one needs to use a custom module, such as
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+        bind(S4Metrics.class).asEagerSingleton();
     }
 
     private void loadProperties(Binder binder) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 5429076..aca9327 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -172,7 +172,7 @@ public abstract class ProcessingElement implements Cloneable {
          */
         this.pePrototype = this;
 
-        S4Metrics.createCacheGauges(peInstances);
+        S4Metrics.createCacheGauges(this, peInstances);
 
         processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b0a904b7/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index d0ffc0d..fe3fa80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -11,17 +11,23 @@ import org.apache.s4.core.Receiver;
 import org.apache.s4.core.RemoteSender;
 import org.apache.s4.core.Sender;
 import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.internal.Lists;
 import com.beust.jcommander.internal.Maps;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Meter;
 
+@Singleton
 public class S4Metrics {
 
+    private static Logger logger = LoggerFactory.getLogger(S4Metrics.class);
+
     @Inject
     Emitter emitter;
 
@@ -52,22 +58,24 @@ public class S4Metrics {
         }
     }
 
-    public static void createCacheGauges(final LoadingCache<String, ProcessingElement> cache) {
-        Metrics.newGauge(ProcessingElement.class, "PE-cache-entries", new Gauge<Long>() {
+    public static void createCacheGauges(ProcessingElement prototype,
+            final LoadingCache<String, ProcessingElement> cache) {
+
+        Metrics.newGauge(prototype.getClass(), "PE-cache-entries", new Gauge<Long>() {
 
             @Override
             public Long value() {
                 return cache.size();
             }
         });
-        Metrics.newGauge(ProcessingElement.class, "PE-cache-evictions", new Gauge<Long>() {
+        Metrics.newGauge(prototype.getClass(), "PE-cache-evictions", new Gauge<Long>() {
 
             @Override
             public Long value() {
                 return cache.stats().evictionCount();
             }
         });
-        Metrics.newGauge(ProcessingElement.class, "PE-cache-misses", new Gauge<Long>() {
+        Metrics.newGauge(prototype.getClass(), "PE-cache-misses", new Gauge<Long>() {
 
             @Override
             public Long value() {
@@ -82,7 +90,13 @@ public class S4Metrics {
     }
 
     public static void sentEvent(int partition) {
-        senderMeters[partition].mark();
+        try {
+            senderMeters[partition].mark();
+        } catch (NullPointerException e) {
+            logger.warn("Sender meter not ready for partition {}", partition);
+        } catch (ArrayIndexOutOfBoundsException e) {
+            logger.warn("Partition {} does not exist", partition);
+        }
     }
 
     public static void createStreamMeters(String name) {


Mime
View raw message