incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [11/15] git commit: Asynch serialization by default + add synchro in injector - injector now only starts injecting when target consumers are all available (allows to easily check that no event is lost)
Date Fri, 18 Jan 2013 12:16:46 GMT
Asynch serialization by default + add synchro in injector
- injector now only starts injecting when target consumers are all available
(allows to easily check that no event is lost)


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/ab3ac775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/ab3ac775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/ab3ac775

Branch: refs/heads/dev
Commit: ab3ac7754987310112aff25b091d25e83adac9aa
Parents: 8117de6
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Dec 6 15:37:32 2012 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Dec 6 15:44:59 2012 +0100

----------------------------------------------------------------------
 subprojects/s4-benchmarks/README.md                |   10 ++++
 .../org/apache/s4/benchmark/utils/Injector.java    |   39 +++++++++++++-
 .../java/org/apache/s4/benchmark/utils/Utils.java  |   31 ++++++++++++
 .../main/java/org/apache/s4/core/SenderImpl.java   |    8 +--
 4 files changed, 80 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab3ac775/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index f8528fb..93062f6 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -54,9 +54,12 @@ Exmample configuration files are available in `/config` and you can configure :
 - the duration of the pause (can be 0)
 - etc…
 
+The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
+
 By default in this example the size of a message is 188 bytes.
 
 
+
 ## Running
 
 Running 2 S4 nodes on the local machine:
@@ -76,6 +79,13 @@ Results are also available from the console output for each of the nodes.
 
 Most statistics files come from the probes of the platform and some of them use weighted moving averages. These are good for long running applications. For the benchmarks we also show instant rates, which are available in `injection-rate.csv` and `simplePE1.csv` files.
 
+You may also check that all events have been processed: 
+
+* each injector reports how many events it sent on which stream
+* each node reports the total number of events received
+* you should get `total injected from all injectors = total received in all nodes` (minus events sent through internal streams in the app, if that applies)
+
+
 ## Notes
 
 There are a lot of knobs for optimally configuring the stages, and the optimal settings will also depend upon:

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab3ac775/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
index 769ccaa..0766e2d 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
@@ -4,6 +4,7 @@ import java.io.File;
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -13,6 +14,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.RemoteStream;
 import org.apache.s4.core.adapter.AdapterApp;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,6 +24,7 @@ import com.google.inject.name.Named;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
 
 public class Injector extends AdapterApp {
 
@@ -51,6 +54,8 @@ public class Injector extends AdapterApp {
     @Named("s4.benchmark.injector.parallelism")
     int parallelism;
 
+    static int CSV_REPORTER_INTERVAL_S = 5;
+
     // Meter meter = Metrics.newMeter(Injector.class, "injector", "injected", TimeUnit.SECONDS);
 
     static AtomicLong counter = new AtomicLong();
@@ -61,11 +66,12 @@ public class Injector extends AdapterApp {
     @Override
     protected void onInit() {
 
-        File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors");
+        File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors/"
+                + getReceiver().getPartitionId());
         if (!logDir.mkdirs()) {
             logger.debug("Cannot create dir " + logDir.getAbsolutePath());
         }
-        // CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
+        CsvReporter.enable(logDir, CSV_REPORTER_INTERVAL_S, TimeUnit.SECONDS);
         remoteStreamKeyFinder = new KeyFinder<Event>() {
 
             @Override
@@ -80,6 +86,24 @@ public class Injector extends AdapterApp {
     @Override
     protected void onStart() {
 
+        ZooKeeper zk;
+        try {
+            zk = new ZooKeeper(zkString, 10000, null);
+
+            // NOTE: processing nodes cluster name is hardcoded!
+            int nbProcessingNodes = zk.getChildren("/s4/clusters/testCluster2/tasks", null).size();
+            CountDownLatch signalNodesConnected = new CountDownLatch(1);
+            Utils.watchAndSignalChildrenReachedCount("/s4/streams/" + getRemoteStream().getName()
+ "/consumers",
+                    signalNodesConnected, zk, nbProcessingNodes);
+            logger.info("Waiting for all consumers for stream {}", "inputStream");
+            signalNodesConnected.await();
+            logger.info("All consumers reached for stream {}, proceeding to injection", getRemoteStream().getName());
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.error("Cannot fetch config info from zookeeper", e);
+            System.exit(1);
+        }
+
         Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
 
             @Override
@@ -112,7 +136,16 @@ public class Injector extends AdapterApp {
 
         generateEvents(remoteStream, testIterations, keysCount, parallelism);
 
-        logger.info("Tests completed after test events", testIterations * parallelism * keysCount);
+        try {
+            // make sure the last log is written
+            Thread.sleep(CSV_REPORTER_INTERVAL_S * 1000 + 5000);
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+        logger.info("Tests completed after {} test events sent to stream {}", testIterations
* parallelism * keysCount,
+                getRemoteStream().getName());
 
         System.exit(0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab3ac775/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
index bf1023a..291c50a 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
@@ -5,6 +5,11 @@ import java.util.concurrent.CountDownLatch;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,4 +41,30 @@ public class Utils {
         return signalReady;
     }
 
+    public static void watchAndSignalChildrenReachedCount(final String path, final CountDownLatch
latch,
+            final ZooKeeper zk, final int count) throws KeeperException, InterruptedException
{
+
+        List<String> children = zk.getChildren(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeChildrenChanged.equals(event.getType())) {
+                    try {
+                        if (count == zk.getChildren(path, false).size()) {
+                            latch.countDown();
+                        }
+                    } catch (KeeperException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    } catch (InterruptedException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                    latch.countDown();
+                }
+            }
+        });
+        if (children.size() == count) {
+            latch.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab3ac775/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 7dd84b9..03799f6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -99,15 +99,13 @@ public class SenderImpl implements Sender {
             /* Hey we are in the same JVM, don't use the network. */
             return false;
         }
-        // TODO asynch
-        send(partition, serDeser.serialize(event));
+        send(partition, event);
         metrics.sentEvent(partition);
         return true;
     }
 
-    private void send(int partition, ByteBuffer message) {
-
-        emitter.send(partition, message);
+    private void send(int partition, Event event) {
+        tpe.submit(new SerializeAndSendToRemotePartitionTask(event, partition));
     }
 
     /*


Mime
View raw message