incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [20/50] [abbrv] git commit: renamed test packages
Date Tue, 03 Jan 2012 14:03:28 GMT
renamed test packages

- from test.s4.xx to org.apache.s4.xx


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/90f68b42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/90f68b42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/90f68b42

Branch: refs/heads/piper
Commit: 90f68b42716061996bef4f3a958b633e2902ef0d
Parents: 502b81e
Author: Matthieu Morel <mmorel@apache.org>
Authored: Fri Nov 11 13:01:24 2011 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Wed Nov 23 15:54:50 2011 +0100

----------------------------------------------------------------------
 .../org/apache/s4/core/TestCircularFifoBuffer.java |   53 ++
 .../apache/s4/core/apploading/AppLoadingTest.java  |  164 ++++++
 .../org/apache/s4/core/apploading/SimpleApp.java   |   47 ++
 .../apache/s4/core/apploading/SimpleModule.java    |    7 +
 .../org/apache/s4/core/apploading/SimplePE.java    |   66 +++
 .../java/org/apache/s4/core/overloadgen/A.java     |   51 ++
 .../java/org/apache/s4/core/overloadgen/B.java     |   21 +
 .../java/org/apache/s4/core/overloadgen/C.java     |   25 +
 .../java/org/apache/s4/core/overloadgen/D.java     |   32 +
 .../org/apache/s4/core/overloadgen/Event1.java     |    7 +
 .../org/apache/s4/core/overloadgen/Event1a.java    |    7 +
 .../org/apache/s4/core/overloadgen/Event2.java     |    7 +
 .../core/overloadgen/OverloadDispatcherTest.java   |   81 +++
 .../apache/s4/core/triggers/CountTriggerTest.java  |   16 +
 .../org/apache/s4/core/triggers/NoTriggerTest.java |   17 +
 .../apache/s4/core/triggers/TimeTriggerTest.java   |   19 +
 .../org/apache/s4/core/triggers/TriggerTest.java   |   75 +++
 .../org/apache/s4/core/triggers/TriggerablePE.java |   80 +++
 .../org/apache/s4/core/triggers/TriggeredApp.java  |   58 ++
 .../apache/s4/core/triggers/TriggeredModule.java   |    6 +
 .../FileBasedClusterManagementTestModule.java      |   78 +++
 .../java/org/apache/s4/fixtures/SocketAdapter.java |  104 ++++
 .../java/org/apache/s4/fixtures/TestUtils.java     |  451 +++++++++++++++
 .../ZkBasedClusterManagementTestModule.java        |   82 +++
 .../org/apache/s4/wordcount/KeyValueEvent.java     |   23 +
 .../org/apache/s4/wordcount/KeyValueKeyFinder.java |   18 +
 .../org/apache/s4/wordcount/SentenceKeyFinder.java |   19 +
 .../java/org/apache/s4/wordcount/StringEvent.java  |   24 +
 .../test/java/org/apache/s4/wordcount/Word.java    |   23 +
 .../org/apache/s4/wordcount/WordClassifierPE.java  |  114 ++++
 .../java/org/apache/s4/wordcount/WordCountApp.java |   71 +++
 .../org/apache/s4/wordcount/WordCountEvent.java    |   26 +
 .../apache/s4/wordcount/WordCountKeyFinder.java    |   17 +
 .../org/apache/s4/wordcount/WordCountModule.java   |    7 +
 .../org/apache/s4/wordcount/WordCountTest.java     |   93 +++
 .../org/apache/s4/wordcount/WordCounterPE.java     |   43 ++
 .../org/apache/s4/wordcount/WordSeenEvent.java     |   21 +
 .../org/apache/s4/wordcount/WordSeenKeyFinder.java |   18 +
 .../org/apache/s4/wordcount/WordSplitterPE.java    |   40 ++
 .../apache/s4/wordcount/zk/WordCountModuleZk.java  |    9 +
 .../apache/s4/wordcount/zk/WordCountTestZk.java    |  104 ++++
 .../java/test/s4/core/TestCircularFifoBuffer.java  |   53 --
 .../test/s4/core/apploading/AppLoadingTest.java    |  164 ------
 .../java/test/s4/core/apploading/SimpleApp.java    |   47 --
 .../java/test/s4/core/apploading/SimpleModule.java |    7 -
 .../java/test/s4/core/apploading/SimplePE.java     |   66 ---
 .../src/test/java/test/s4/core/overloadgen/A.java  |   51 --
 .../src/test/java/test/s4/core/overloadgen/B.java  |   21 -
 .../src/test/java/test/s4/core/overloadgen/C.java  |   25 -
 .../src/test/java/test/s4/core/overloadgen/D.java  |   32 -
 .../test/java/test/s4/core/overloadgen/Event1.java |    7 -
 .../java/test/s4/core/overloadgen/Event1a.java     |    7 -
 .../test/java/test/s4/core/overloadgen/Event2.java |    7 -
 .../core/overloadgen/OverloadDispatcherTest.java   |   81 ---
 .../test/s4/core/triggers/CountTriggerTest.java    |   16 -
 .../java/test/s4/core/triggers/NoTriggerTest.java  |   17 -
 .../test/s4/core/triggers/TimeTriggerTest.java     |   19 -
 .../java/test/s4/core/triggers/TriggerTest.java    |   75 ---
 .../java/test/s4/core/triggers/TriggerablePE.java  |   80 ---
 .../java/test/s4/core/triggers/TriggeredApp.java   |   58 --
 .../test/s4/core/triggers/TriggeredModule.java     |    6 -
 .../FileBasedClusterManagementTestModule.java      |   78 ---
 .../test/java/test/s4/fixtures/SocketAdapter.java  |  104 ----
 .../src/test/java/test/s4/fixtures/TestUtils.java  |  451 ---------------
 .../ZkBasedClusterManagementTestModule.java        |   82 ---
 .../test/java/test/s4/wordcount/KeyValueEvent.java |   23 -
 .../java/test/s4/wordcount/KeyValueKeyFinder.java  |   18 -
 .../java/test/s4/wordcount/SentenceKeyFinder.java  |   19 -
 .../test/java/test/s4/wordcount/StringEvent.java   |   24 -
 .../src/test/java/test/s4/wordcount/Word.java      |   23 -
 .../java/test/s4/wordcount/WordClassifierPE.java   |  114 ----
 .../test/java/test/s4/wordcount/WordCountApp.java  |   71 ---
 .../java/test/s4/wordcount/WordCountEvent.java     |   26 -
 .../java/test/s4/wordcount/WordCountKeyFinder.java |   17 -
 .../java/test/s4/wordcount/WordCountModule.java    |    7 -
 .../test/java/test/s4/wordcount/WordCountTest.java |   93 ---
 .../test/java/test/s4/wordcount/WordCounterPE.java |   43 --
 .../test/java/test/s4/wordcount/WordSeenEvent.java |   21 -
 .../java/test/s4/wordcount/WordSeenKeyFinder.java  |   18 -
 .../java/test/s4/wordcount/WordSplitterPE.java     |   40 --
 .../test/s4/wordcount/zk/WordCountModuleZk.java    |    8 -
 .../java/test/s4/wordcount/zk/WordCountTestZk.java |  104 ----
 82 files changed, 2224 insertions(+), 2223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/TestCircularFifoBuffer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TestCircularFifoBuffer.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TestCircularFifoBuffer.java
new file mode 100644
index 0000000..88fb157
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TestCircularFifoBuffer.java
@@ -0,0 +1,53 @@
+package org.apache.s4.core;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestCircularFifoBuffer extends TestCase {
+
+    protected void setUp() {
+
+    }
+
+    public void test1() {
+
+        System.out.println("Buffer size is 10.\n");
+        CircularFifoBuffer<Integer> circularBuffer = new CircularFifoBuffer<Integer>(
+                10);
+
+        System.out.println("Add ints 100-114.");
+        for (int i = 0; i < 15; i++) {
+            circularBuffer.add(i + 100);
+        }
+
+        System.out.println("Iterate.");
+        int j = 5;
+        for(Integer num : circularBuffer) {
+            System.out.print(num + " ");
+            Assert.assertEquals(j + 100, num.intValue());
+            j++;
+        }
+        System.out.println("\nLeast recent value: " + circularBuffer.get());
+        Assert.assertEquals(105, circularBuffer.get().intValue());
+        System.out.println("\n");
+        
+        circularBuffer.clear();
+        
+        /* Less than max size. */
+        System.out.println("Clear and add ints 200-204.");
+        for (int i = 0; i < 5; i++) {
+            circularBuffer.add(i + 200);
+        }
+        
+        System.out.println("Iterate.");
+        int z = 0;
+        for(Integer num : circularBuffer) {
+            System.out.print(num + " ");
+            Assert.assertEquals(z + 200, num.intValue());
+            z++;
+        }
+        System.out.println("\n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
new file mode 100644
index 0000000..fec8fe9
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
@@ -0,0 +1,164 @@
+package org.apache.s4.core.apploading;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import org.apache.s4.core.Server;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+/**
+ * 
+ * Tests packaging and deployment of an S4 app
+ * 
+ */
+public class AppLoadingTest {
+
+    public static final long ZOOKEEPER_PORT = 21810;
+    private static Factory zookeeperServerConnectionFactory = null;
+    private Process forkedApp;
+
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            zk.delete("/simpleAppCreated", -1);
+        } catch (Exception ignored) {
+        }
+
+        zk.close();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        TestUtils.killS4App(forkedApp);
+    }
+
+    @Ignore("fix paths")
+    @Test
+    public void testA() throws Exception {
+
+        // add all classes from counter app
+        File rootAppDir = new File(new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath()
+                + "/s4-example/bin");
+        File appFilesDir = new File(rootAppDir, "org/apache/s4/example/counter");
+        generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "counterExample",
+                "org.apache.s4.example.counter.MyApp");
+
+        forkedApp = TestUtils.forkS4Node();
+        Thread.sleep(15000);
+    }
+
+    private void generateS4RFromDirectoryContents(File rootAppDir, File appFilesDir, String s4rName, String appClassName)
+            throws IOException, FileNotFoundException {
+        Collection<File> s4rFiles = listFilesRecursively(appFilesDir);
+        File jarFile = new File(System.getProperty("user.dir") + "/bin/apps/" + s4rName + ".s4r");
+        Files.createParentDirs(jarFile);
+        FileOutputStream fos = new FileOutputStream(jarFile);
+        JarOutputStream jos = new JarOutputStream(fos);
+        System.out.println(System.getProperty("java.class.path"));
+        for (File file : s4rFiles) {
+            JarEntry jarEntry = new JarEntry(file.getAbsolutePath().substring(rootAppDir.getAbsolutePath().length()));
+            jos.putNextEntry(jarEntry);
+            ByteStreams.copy(Files.newInputStreamSupplier(file), jos);
+        }
+        // add manifest
+        File manifest = File.createTempFile("s4app", "manifest");
+        String manifestContents = "Manifest-Version: 1.0\n" + Server.MANIFEST_S4_APP_CLASS + ": " + appClassName + "\n";
+        Files.write(manifestContents, manifest, Charset.forName("UTF-8"));
+        JarEntry jarEntry = new JarEntry("META-INF/MANIFEST.MF");
+        jos.putNextEntry(jarEntry);
+        ByteStreams.copy(Files.newInputStreamSupplier(manifest), jos);
+
+        jos.close();
+    }
+
+    private Collection<File> listFilesRecursively(File dir) {
+        if (dir.isDirectory()) {
+            File[] listFiles = dir.listFiles();
+            List<File> filesToAdd = new ArrayList<File>();
+            if (listFiles.length != 0) {
+                for (File file : listFiles) {
+                    if (file.isFile()) {
+                        filesToAdd.add(file);
+                    } else if (file.isDirectory()) {
+                        filesToAdd.addAll(listFilesRecursively(file));
+                    }
+                }
+            }
+            return filesToAdd;
+        } else {
+            // TODO throw exception
+            return null;
+        }
+    }
+
+    /**
+     * 
+     * 1. generates an s4r package from classes in the apploading package (TODO process still to be improved), 2.
+     * deploys it to bin/apps 3. starts a forked S4 node, which loads apps from bin/apps 4. verifies app is working (s4
+     * app started, event correctly processed)
+     * 
+     * NOTE: we'll need to add an automatic test for which we make sure code cannot be in the classpath
+     */
+    @Test
+    public void testAppLoading() throws Exception {
+
+        // TODO fix paths
+
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        File rootAppDir = TestUtils.findDirForCompiledTestClasses();
+
+        File appFilesDir = new File(rootAppDir, "test/s4/core/apploading");
+        // 1. create app jar and place it in tmp/s4-apps
+        generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "appLoadingTest", SimpleApp.class.getName());
+
+        CountDownLatch signalAppStarted = new CountDownLatch(1);
+        // 2. start s4 node and check results
+        forkedApp = TestUtils.forkS4Node();
+
+        // TODO wait for ready state (zk node available)
+        Thread.sleep(5000);
+
+        // note: make sure we don't delete existing znode if it was already created
+        TestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
+
+        Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
+
+        String time1 = String.valueOf(System.currentTimeMillis());
+
+        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+
+        TestUtils.injectIntoStringSocketAdapter(time1);
+
+        // check event processed
+        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
new file mode 100644
index 0000000..98dc728
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
@@ -0,0 +1,47 @@
+package org.apache.s4.core.apploading;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.fixtures.SocketAdapter;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.wordcount.SentenceKeyFinder;
+import org.apache.s4.wordcount.StringEvent;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class SimpleApp extends App {
+    private SocketAdapter<StringEvent> socketAdapter;
+    
+    public SimpleApp () {}
+
+    @Override
+    protected void start() {
+        try {
+            final ZooKeeper zk = TestUtils.createZkClient();
+            zk.create("/simpleAppCreated", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.close();
+        } catch (Exception e) {
+            System.exit(-1);
+        }
+    }
+
+    @Override
+    protected void init() {
+        SimplePE prototype = createPE(SimplePE.class);
+        Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
+        try {
+            socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void close() {
+        // TODO Auto-generated method stub
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleModule.java
new file mode 100644
index 0000000..982ef81
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleModule.java
@@ -0,0 +1,7 @@
+package org.apache.s4.core.apploading;
+
+import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
+
+public class SimpleModule extends FileBasedClusterManagementTestModule<SimpleApp> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
new file mode 100644
index 0000000..e6a9976
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
@@ -0,0 +1,66 @@
+package org.apache.s4.core.apploading;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.wordcount.StringEvent;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.LoggerFactory;
+
+
+public class SimplePE  extends ProcessingElement implements Watcher {
+
+    private ZooKeeper zk;
+
+    public SimplePE() {}
+    
+    public SimplePE(App app) {
+        super(app);
+    }
+    
+    public void onEvent(StringEvent event) {
+        try {
+            LoggerFactory.getLogger(getClass()).debug("processing envent {}", event.getString());
+            zk.create("/onEvent@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            zk.close();
+        } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void onCreate() {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/A.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/A.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/A.java
new file mode 100644
index 0000000..7ae257d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/A.java
@@ -0,0 +1,51 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+
+public class A extends ProcessingElement {
+    
+    public Class<? extends Event> processedEventClass;
+    public Class<? extends Event> processedTriggerEventClass;
+    boolean processedTriggerThroughGenericMethod = false;
+    
+    public void onEvent(Event event) {
+        processedEventClass = Event.class;
+    }
+        
+    public void onEvent(Event2 event) {
+        processedEventClass = event.getClass();
+    }
+    
+    public void onEvent(Event1 event) {
+        processedEventClass = event.getClass();
+    }
+
+    public void onEvent(Event1a event) {
+        processedEventClass = event.getClass();
+    }
+    
+    public void onTrigger(Event event) {
+        processedTriggerEventClass = event.getClass();
+        processedTriggerThroughGenericMethod = true;
+    }
+    
+    public void onTrigger(Event1 event ) {
+        processedTriggerEventClass = event.getClass();
+        processedTriggerThroughGenericMethod = false;
+    }
+    
+    
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+        
+    }
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/B.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/B.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/B.java
new file mode 100644
index 0000000..1d62b4a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/B.java
@@ -0,0 +1,21 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class B extends ProcessingElement {
+    
+    
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/C.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/C.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/C.java
new file mode 100644
index 0000000..da16bf7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/C.java
@@ -0,0 +1,25 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class C extends ProcessingElement {
+    
+    public boolean processedEvent1Class = false;
+
+    public void onEvent(Event1 event) {
+        processedEvent1Class = true;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/D.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/D.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/D.java
new file mode 100644
index 0000000..93167ed
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/D.java
@@ -0,0 +1,32 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+
+public class D extends ProcessingElement {
+    
+    public boolean processedGenericEvent = false;
+    public boolean processedEvent1 = false;
+    
+    public void onEvent(Event event) {
+        processedGenericEvent =true;
+    }
+    
+    public void onEvent(Event1 event) {
+        processedEvent1 = true;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1.java
new file mode 100644
index 0000000..ef441ad
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1.java
@@ -0,0 +1,7 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.base.Event;
+
+public class Event1 extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1a.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1a.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1a.java
new file mode 100644
index 0000000..29392f2
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event1a.java
@@ -0,0 +1,7 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.base.Event;
+
+public class Event1a extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event2.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event2.java
new file mode 100644
index 0000000..98cfee0
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/Event2.java
@@ -0,0 +1,7 @@
+package org.apache.s4.core.overloadgen;
+
+import org.apache.s4.base.Event;
+
+public class Event2 extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/OverloadDispatcherTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/OverloadDispatcherTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/OverloadDispatcherTest.java
new file mode 100644
index 0000000..627df67
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/overloadgen/OverloadDispatcherTest.java
@@ -0,0 +1,81 @@
+package org.apache.s4.core.overloadgen;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.s4.core.gen.OverloadDispatcher;
+import org.apache.s4.core.gen.OverloadDispatcherGenerator;
+import org.junit.Test;
+
+public class OverloadDispatcherTest {
+
+    @Test
+    public void testDispatchWithEventHierarchies() throws Exception {
+        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(A.class);
+        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
+        A a = new A();
+        // input events
+        dispatcher.dispatchEvent(a, new Event1());
+        Assert.assertEquals(Event1.class, a.processedEventClass);
+        dispatcher.dispatchEvent(a, new Event1a());
+        Assert.assertEquals(Event1a.class, a.processedEventClass);
+        dispatcher.dispatchEvent(a, new Event2());
+        Assert.assertEquals(Event2.class, a.processedEventClass);
+       
+        // trigger events
+        dispatcher.dispatchTrigger(a, new Event2());
+        Assert.assertEquals(Event2.class, a.processedTriggerEventClass);
+        Assert.assertTrue(a.processedTriggerThroughGenericMethod);
+        dispatcher.dispatchTrigger(a, new Event1());
+        Assert.assertEquals(Event1.class, a.processedTriggerEventClass);
+        Assert.assertFalse(a.processedTriggerThroughGenericMethod);
+    }
+    
+    @Test
+    public void testDispatchWithSingleMethod() throws Exception {
+        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(C.class);
+        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
+        C c = new C();
+        dispatcher.dispatchEvent(c, new Event2());
+        Assert.assertFalse(c.processedEvent1Class);
+        dispatcher.dispatchEvent(c, new Event1());
+        Assert.assertTrue(c.processedEvent1Class);
+    }
+
+    @Test
+    public void testNoMatchingMethod() throws Exception {
+        PrintStream stdout = System.out;
+        try {
+            ByteArrayOutputStream tmpOut = new ByteArrayOutputStream();
+            System.setOut(new PrintStream(tmpOut));
+
+            OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(B.class);
+            OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
+            B b = new B();
+            dispatcher.dispatchEvent(b, new Event1());
+            String output = tmpOut.toString().trim();
+            // use DOTALL to ignore previous lines in output debug mode
+            Assert.assertTrue(Pattern.compile("^.+OverloadDispatcher\\d+ - Cannot dispatch event "
+                    + "of type \\[" + Event1.class.getName() + "\\] to PE of type \\[" + B.class.getName()
+                    + "\\] : no matching onEvent method found$", Pattern.DOTALL).matcher(output).matches());
+        } finally {
+            System.setOut(stdout);
+        }
+
+    }
+    
+    @Test
+    public void testGenericEvent() throws Exception {
+        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(D.class);
+        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
+        D d = new D();
+        dispatcher.dispatchEvent(d, new Event2());
+        Assert.assertTrue(d.processedGenericEvent);
+        dispatcher.dispatchEvent(d, new Event1());
+        Assert.assertTrue(d.processedEvent1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/CountTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/CountTriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/CountTriggerTest.java
new file mode 100644
index 0000000..f8b0e40
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/CountTriggerTest.java
@@ -0,0 +1,16 @@
+package org.apache.s4.core.triggers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CountTriggerTest extends TriggerTest {
+    
+    @Test
+    public void testEventCountBasedTrigger() throws Exception {
+        triggerType = TriggerType.COUNT_BASED;
+        Assert.assertTrue(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/NoTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/NoTriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/NoTriggerTest.java
new file mode 100644
index 0000000..6feb783
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/NoTriggerTest.java
@@ -0,0 +1,17 @@
+package org.apache.s4.core.triggers;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class NoTriggerTest extends TriggerTest {
+
+    @Test
+    public void testNoTrigger() throws Exception {
+        triggerType = TriggerType.NONE;
+        Assert.assertFalse(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TimeTriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TimeTriggerTest.java
new file mode 100644
index 0000000..8df087c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TimeTriggerTest.java
@@ -0,0 +1,19 @@
+package org.apache.s4.core.triggers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TimeTriggerTest extends TriggerTest {
+
+    @Test
+    public void testTimeBasedTrigger() throws Exception {
+        triggerType = TriggerType.TIME_BASED;
+        Assert.assertTrue(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
+
+    }
+
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerTest.java
new file mode 100644
index 0000000..adb7bcf
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerTest.java
@@ -0,0 +1,75 @@
+package org.apache.s4.core.triggers;
+
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * tests from subclasses are forked in separate VMs, an easy way to avoid
+ * conflict with unavailable resources when instantiating new S4 nodes
+ */
+
+public abstract class TriggerTest {
+
+    private Factory zookeeperServerConnectionFactory;
+    public static TriggerType triggerType;
+    protected TriggeredApp app;
+
+    public enum TriggerType {
+        TIME_BASED, COUNT_BASED, NONE
+    }
+
+    @Before
+    public void prepare() throws IOException, InterruptedException, KeeperException {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+    }
+
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        if (app != null) {
+            app.close();
+            app = null;
+        }
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+    }
+
+    protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
+        final ZooKeeper zk = TestUtils.createZkClient();
+        Injector injector = Guice.createInjector(new TriggeredModule());
+        app = injector.getInstance(TriggeredApp.class);
+        app.init();
+        app.start();
+        
+        String time1 = String.valueOf(System.currentTimeMillis());
+
+        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+        
+        CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
+
+        TestUtils.injectIntoStringSocketAdapter(time1);
+
+        // check event processed
+        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
+
+        // return latch on trigger signal
+        return signalEvent1Triggered;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
new file mode 100644
index 0000000..863f80e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
@@ -0,0 +1,80 @@
+package org.apache.s4.core.triggers;
+
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.wordcount.StringEvent;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class TriggerablePE extends ProcessingElement implements Watcher {
+    
+    private ZooKeeper zk;
+
+    public TriggerablePE() {}
+    
+    public TriggerablePE(App app) {
+        super(app);
+    }
+    
+    public void onEvent(StringEvent event) {
+        try {
+            zk.create("/onEvent@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+    
+    @Override
+    protected void onCreate() {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void onTrigger(StringEvent event) {
+        try {
+            zk.create("/onTrigger[StringEvent]@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+    
+    
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+        
+    }
+    
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
new file mode 100644
index 0000000..21c7ee2
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
@@ -0,0 +1,58 @@
+package org.apache.s4.core.triggers;
+
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.fixtures.SocketAdapter;
+import org.apache.s4.wordcount.SentenceKeyFinder;
+import org.apache.s4.wordcount.StringEvent;
+
+
+import com.google.inject.Inject;
+
+public class TriggeredApp extends App {
+
+    SocketAdapter<StringEvent> socketAdapter;
+
+    @Inject
+    public TriggeredApp() {
+        super();
+    }
+
+    @Override
+    protected void start() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void init() {
+
+        TriggerablePE prototype = createPE(TriggerablePE.class);
+        Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
+        switch (TriggerTest.triggerType) {
+            case COUNT_BASED:
+                prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
+                break;
+            case TIME_BASED:
+                prototype.setTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
+            default:
+                break;
+        }
+
+        try {
+            socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
new file mode 100644
index 0000000..cf7cd80
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
@@ -0,0 +1,6 @@
+package org.apache.s4.core.triggers;
+
+import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
+
+public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..b189350
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -0,0 +1,78 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected FileBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.lock_dir",
+                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromFile.class);
+        bind(Topology.class).to(TopologyFromFile.class);
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
new file mode 100644
index 0000000..cefe8b3
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
@@ -0,0 +1,104 @@
+package org.apache.s4.fixtures;
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.s4.core.Stream;
+import org.apache.s4.wordcount.KeyValueEvent;
+import org.apache.s4.wordcount.StringEvent;
+
+
+public class SocketAdapter<T extends StringEvent> {
+
+    static ServerSocket serverSocket;
+
+    /**
+     * Listens to incoming sentence and forwards them to a sentence Stream.
+     * Each sentence is sent through a new socket connection
+     * 
+     * @param stream
+     * @throws IOException
+     */
+    public SocketAdapter(final Stream<T> stream, final StringEventFactory<T> stringEventFactory) throws IOException {
+        Thread t = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                serverSocket = null;
+                Socket connectedSocket;
+                BufferedReader in = null;
+                try {
+                    serverSocket = new ServerSocket(12000);
+                    while (true) {
+                        connectedSocket = serverSocket.accept();
+                        in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
+
+                        String line = in.readLine();
+                        System.out.println("read: " + line);
+                        stream.put(stringEventFactory.create(line)) ;
+                        connectedSocket.close();
+                    }
+
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    System.exit(-1);
+                } finally {
+                    if (in != null) {
+                        try {
+                            in.close();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    if (serverSocket != null) {
+                        try {
+                            serverSocket.close();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+        });
+        t.start();
+
+    }
+    
+    public void close()  {
+        if(serverSocket !=null) {
+            try {
+                serverSocket.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    interface StringEventFactory<T> {
+        T create(String string);
+    }
+    
+    public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
+
+        @Override
+        public StringEvent create(String string) {
+            return new StringEvent(string);
+        }
+        
+    }
+    
+    public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
+
+        @Override
+        public KeyValueEvent create(String string) {
+            return new KeyValueEvent(string);
+        }
+        
+    }
+
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
new file mode 100644
index 0000000..91690bd
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
@@ -0,0 +1,451 @@
+package org.apache.s4.fixtures;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
+ * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
+ * latches through zookeeper - etc...
+ * 
+ */
+public class TestUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
+
+    public static final int ZK_PORT = 21810;
+    public static final int INITIAL_BOOKIE_PORT = 5000;
+    public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
+    public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+            + "storage");
+    public static ServerSocket serverSocket;
+    static {
+        logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
+    }
+
+    public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
+        return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
+    }
+
+    public static Process forkS4Node() throws IOException, InterruptedException {
+        return forkProcess(Main.class.getName(), new String[] {});
+    }
+
+    private static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
+        List<String> cmdList = new ArrayList<String>();
+        cmdList.add("java");
+        cmdList.add("-cp");
+        cmdList.add(System.getProperty("java.class.path"));
+        // cmdList.add("-Xdebug");
+        // cmdList.add("-Xnoagent");
+        //
+        // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+
+        cmdList.add(mainClass);
+        for (String arg : args) {
+            cmdList.add(arg);
+        }
+
+        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+        ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+        pb.directory(new File(System.getProperty("user.dir")));
+        pb.redirectErrorStream();
+        final Process process = pb.start();
+
+        // TODO some synchro with s4 platform ready state
+        Thread.sleep(2000);
+        try {
+            int exitValue = process.exitValue();
+            Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+        } catch (IllegalThreadStateException ignored) {
+        }
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                String line;
+                try {
+                    line = br.readLine();
+                    while (line != null) {
+                        System.out.println(line);
+                        line = br.readLine();
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }).start();
+
+        return process;
+    }
+
+    public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
+        if (forkedApp != null) {
+            forkedApp.destroy();
+        }
+    }
+
+    public static void writeStringToFile(String s, File f) throws IOException {
+        if (f.exists()) {
+            if (!f.delete()) {
+                throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
+            }
+        }
+
+        FileWriter fw = null;
+        try {
+            if (!f.createNewFile()) {
+                throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
+            }
+            fw = new FileWriter(f);
+
+            fw.write(s);
+        } catch (IOException e) {
+            throw (e);
+        } finally {
+            if (fw != null) {
+                try {
+                    fw.close();
+                } catch (IOException e) {
+                    throw (e);
+                }
+            }
+        }
+    }
+
+    public static String readFile(File f) throws IOException {
+        BufferedReader br = null;
+        try {
+            br = new BufferedReader(new FileReader(f));
+            StringBuilder sb = new StringBuilder();
+            String line = br.readLine();
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+                if (line != null) {
+                    sb.append("\n");
+                }
+            }
+            return sb.toString();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    throw (e);
+                }
+            }
+        }
+
+    }
+
+    public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
+            KeeperException {
+
+        List<String> cmdList = new ArrayList<String>();
+        final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "data");
+        if (zkDataDir.exists()) {
+            TestUtils.deleteDirectoryContents(zkDataDir);
+        } else {
+            zkDataDir.mkdirs();
+        }
+
+        ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
+        NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
+        nioZookeeperConnectionFactory.startup(zks);
+        Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
+        return nioZookeeperConnectionFactory;
+
+    }
+
+    public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
+        if (f != null) {
+            f.shutdown();
+            Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
+        }
+    }
+
+    public static void deleteDirectoryContents(File dir) {
+        File[] files = dir.listFiles();
+        for (File file : files) {
+            if (file.isDirectory()) {
+                deleteDirectoryContents(file);
+            }
+            if (!file.delete()) {
+                throw new RuntimeException("could not delete : " + file);
+            }
+        }
+    }
+
+    public static String readFileAsString(File f) throws IOException {
+        FileReader fr = new FileReader(f);
+        StringBuilder sb = new StringBuilder("");
+        BufferedReader br = new BufferedReader(fr);
+        String line = br.readLine();
+        while (line != null) {
+            sb.append(line);
+            line = br.readLine();
+            if (line != null) {
+                sb.append("\n");
+            }
+        }
+        return sb.toString();
+
+    }
+
+    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
+    // guava
+    public static byte[] readFileAsByteArray(File file) throws Exception {
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(file);
+
+            long length = file.length();
+
+            /*
+             * Arrays can only be created using int types, so ensure that the file size is not too big before we
+             * downcast to create the array.
+             */
+            if (length > Integer.MAX_VALUE) {
+                throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
+            }
+
+            byte[] buffer = new byte[(int) length];
+            int offSet = 0;
+            int numRead = 0;
+
+            while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
+                offSet += numRead;
+            }
+
+            if (offSet < buffer.length) {
+                throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+                        + buffer.length + " bytes read");
+            }
+
+            in.close();
+            return buffer;
+
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+        }
+    }
+
+    public static ZooKeeper createZkClient() throws IOException {
+        final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+            }
+        });
+        return zk;
+    }
+
+    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        // by default delete existing nodes with same path
+        watchAndSignalCreation(path, latch, zk, false);
+    }
+
+    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
+            boolean deleteIfExists) throws KeeperException, InterruptedException {
+
+        if (zk.exists(path, false) != null) {
+            if (deleteIfExists) {
+                zk.delete(path, -1);
+            } else {
+                latch.countDown();
+            }
+        }
+        zk.exists(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeCreated.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        zk.getChildren(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeChildrenChanged.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    // from zookeeper's codebase
+    public static boolean waitForServerUp(String host, int port, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                // if there are multiple hostports, just take the first one
+                String result = send4LetterWord(host, port, "stat");
+                if (result.startsWith("Zookeeper version:")) {
+                    return true;
+                }
+            } catch (IOException ignored) {
+                // ignore as this is expected
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        return false;
+    }
+
+    // from zookeeper's codebase
+    public static String send4LetterWord(String host, int port, String cmd) throws IOException {
+        Socket sock = new Socket(host, port);
+        BufferedReader reader = null;
+        try {
+            OutputStream outstream = sock.getOutputStream();
+            outstream.write(cmd.getBytes());
+            outstream.flush();
+            // this replicates NC - close the output stream before reading
+            sock.shutdownOutput();
+
+            reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+            StringBuilder sb = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                sb.append(line + "\n");
+            }
+            return sb.toString();
+        } finally {
+            sock.close();
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    // from zookeeper's codebase
+    public static boolean waitForServerDown(String host, int port, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                send4LetterWord(host, port, "stat");
+            } catch (IOException e) {
+                return true;
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        return false;
+    }
+
+    public static void cleanupTmpDirs() {
+        if (TestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
+            deleteDirectoryContents(TestUtils.DEFAULT_TEST_OUTPUT_DIR);
+        }
+        TestUtils.DEFAULT_STORAGE_DIR.mkdirs();
+
+    }
+
+    public static void stopSocketAdapter() throws IOException {
+        if (serverSocket != null) {
+            serverSocket.close();
+        }
+    }
+
+    /**
+     * gradle and eclipse have different directories for output files This is justified here
+     * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
+     * 
+     * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
+     * runtime.
+     * 
+     * This is what this method does
+     * 
+     * @return directory containing the compiled test classes for this project and execution environment.
+     */
+    public static File findDirForCompiledTestClasses() {
+        String userDir = System.getProperty("user.dir");
+        String classpath = System.getProperty("java.class.path");
+        System.out.println(userDir);
+        System.out.println(classpath);
+        if (classpath.contains(userDir + "/bin")) {
+            // eclipse classpath
+            return new File(userDir + "/bin");
+        } else if (classpath.contains(userDir + "/build/classes/test")) {
+            // gradle classpath
+            return new File(userDir + "/build/classes/test");
+        } else {
+            // TODO other IDEs
+            throw new RuntimeException("Cannot find path for compiled test classes");
+        }
+
+    }
+
+    public static void injectIntoStringSocketAdapter(String string) throws IOException {
+        Socket socket = null;
+        PrintWriter writer = null;
+        try {
+            socket = new Socket("localhost", 12000);
+            writer = new PrintWriter(socket.getOutputStream(), true);
+            writer.println(string);
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(-1);
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..b3ccf10
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -0,0 +1,82 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+// also uses netty
+public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected ZkBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.zk_address",
+                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(Emitter.class).to(NettyEmitter.class);
+        bind(Listener.class).to(NettyListener.class);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueEvent.java
new file mode 100644
index 0000000..8bd4ddf
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueEvent.java
@@ -0,0 +1,23 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.wordcount.StringEvent;
+
+public class KeyValueEvent extends StringEvent {
+
+    String key;
+    String value;
+
+    public KeyValueEvent(String keyValue) {
+        key = keyValue.split(";")[0];
+        value = keyValue.split(";")[1];
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueKeyFinder.java
new file mode 100644
index 0000000..8ed767a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/KeyValueKeyFinder.java
@@ -0,0 +1,18 @@
+package org.apache.s4.wordcount;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class KeyValueKeyFinder implements KeyFinder<KeyValueEvent> {
+
+    public static final String UNIQUE_KEY = "KEY";
+
+    @Override
+    public List<String> get(final KeyValueEvent event) {
+        return Arrays.asList(new String[] {UNIQUE_KEY});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
new file mode 100644
index 0000000..e565864
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.wordcount;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class SentenceKeyFinder implements KeyFinder<StringEvent> {
+
+    private static final String SENTENCE_KEY = "sentence";
+
+    @SuppressWarnings("serial")
+    @Override
+    public List<String> get(StringEvent event) {
+        return new ArrayList<String>(){{add(SENTENCE_KEY);}};
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/StringEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/StringEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/StringEvent.java
new file mode 100644
index 0000000..13db453
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/StringEvent.java
@@ -0,0 +1,24 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.base.Event;
+
+public class StringEvent extends Event {
+    
+    String string;
+    
+    public StringEvent() {}
+    
+    public StringEvent(String string) {
+        super();
+        this.string = string;
+    }
+    
+    public void setString(String string) {
+        this.string = string;
+    }
+    
+    public String getString() {
+        return string;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/Word.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
new file mode 100644
index 0000000..eacb80a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
@@ -0,0 +1,23 @@
+package org.apache.s4.wordcount;
+
+
+public class Word {
+
+    private String word;
+
+    public Word() {
+    }
+
+    public Word(String word) {
+        this.word = word;
+    }
+
+    public void setWord(String word) {
+        this.word = word;
+    }
+
+    public String getWord() {
+        return word;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
new file mode 100644
index 0000000..48f5b23
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -0,0 +1,114 @@
+package org.apache.s4.wordcount;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class WordClassifierPE extends ProcessingElement implements Watcher {
+
+    TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
+    private int counter;
+    transient private ZooKeeper zk;
+
+    private WordClassifierPE () {}
+
+    public WordClassifierPE(App app) {
+        super(app);
+    }
+    
+    public void onEvent(WordCountEvent event) {
+        try {
+            WordCountEvent wcEvent = event;
+            if (zk == null) {
+                try {
+                    zk = new ZooKeeper("localhost:21810", 4000, this);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            System.out.println("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
+
+            if (!counts.containsKey(wcEvent.getWord())
+                    || (counts.containsKey(wcEvent.getWord()) && counts.get(wcEvent.getWord()).compareTo(
+                            wcEvent.getCount()) < 0)) {
+                // this is because wcEvent events arrive unordered
+                counts.put(wcEvent.getWord(), wcEvent.getCount());
+            }
+            ++counter;
+            if (counter == WordCountTest.TOTAL_WORDS) {
+                File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+                if (results.exists()) {
+                    if (!results.delete()) {
+                        throw new RuntimeException("cannot delete results file");
+                    }
+                }
+                Set<Entry<String, Integer>> entrySet = counts.entrySet();
+                StringBuilder sb = new StringBuilder();
+                for (Entry<String, Integer> entry : entrySet) {
+                    sb.append(entry.getKey() + "=" + entry.getValue() + ";");
+                }
+                TestUtils.writeStringToFile(sb.toString(), results);
+
+                zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } else {
+                // NOTE: this will fail if we did not recover the latest
+                // counter,
+                // because there is already a counter with this number in
+                // zookeeper
+                zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
+                System.out.println("wrote classifier iteration ["+counter+"]");
+                // check if we are allowed to continue
+                if (null == zk.exists("/continue_" + counter, null)) {
+                    CountDownLatch latch = new CountDownLatch(1);
+                    TestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
+                    latch.await();
+                } else {
+                    zk.delete("/continue_" + counter, -1);
+                    System.out.println("");
+                }
+            }
+
+        } catch (Exception e) {
+            // TODO should propagate some exceptions
+            e.printStackTrace();
+        }
+
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+ 
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
new file mode 100644
index 0000000..4d0d31a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
@@ -0,0 +1,71 @@
+package org.apache.s4.wordcount;
+
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.fixtures.SocketAdapter;
+
+
+import com.google.inject.Inject;
+
+public class WordCountApp extends App {
+
+    protected boolean checkpointing = false;
+    SocketAdapter<StringEvent> socketAdapter;
+
+    @Inject
+    public WordCountApp() {
+        super();
+    }
+
+    @Override
+    protected void start() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void init() {
+
+        WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class);
+        Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
+                wordClassifierPrototype);
+        WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class);
+//        wordCounterPrototype.setTrigger(WordSeenEvent.class, 1, 0, null);
+        wordCounterPrototype.setWordClassifierStream(wordCountStream);
+        Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
+                wordCounterPrototype);
+        WordSplitterPE wordSplitterPrototype = createPE(WordSplitterPE.class);
+        wordSplitterPrototype.setWordSeenStream(wordSeenStream);
+        Stream<StringEvent> sentenceStream = createStream("sentences stream", new SentenceKeyFinder(),
+                wordSplitterPrototype);
+
+        try {
+            socketAdapter = new SocketAdapter<StringEvent>(sentenceStream, new SocketAdapter.SentenceEventFactory());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (checkpointing) {
+
+            // TODO move to subclass
+
+            // checkpoint word classifier because it maintains a sync counter
+            // for the test
+            // LoggerFactory.getLogger(getClass()).info("setting checkpointing for word classifier and word counter");
+            // wordClassifierPrototype.setCheckpointingFrequency(1);
+            // // checkpoint word counter because it maintains word counts
+            // wordCounterPrototype.setCheckpointingFrequency(1);
+        }
+
+    }
+
+    @Override
+    protected void close() {
+        socketAdapter.close();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountEvent.java
new file mode 100644
index 0000000..fa288d3
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountEvent.java
@@ -0,0 +1,26 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.base.Event;
+
+public class WordCountEvent extends Event {
+
+        private String word;
+        private int count;
+        
+        protected WordCountEvent() {}
+        
+        public WordCountEvent(String word, int count) {
+            super();
+            this.word = word;
+            this.count = count;
+        }
+
+        public String getWord() {
+            return word;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountKeyFinder.java
new file mode 100644
index 0000000..74c044c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountKeyFinder.java
@@ -0,0 +1,17 @@
+package org.apache.s4.wordcount;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class WordCountKeyFinder implements KeyFinder<WordCountEvent> {
+
+    @SuppressWarnings("serial")
+    @Override
+    public List<String> get(WordCountEvent event) {
+        return new ArrayList<String>(){{add("classifier");}};
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
new file mode 100644
index 0000000..4af6a4f
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
@@ -0,0 +1,7 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
+
+public class WordCountModule extends FileBasedClusterManagementTestModule<WordCountApp> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
new file mode 100644
index 0000000..9439e29
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -0,0 +1,93 @@
+package org.apache.s4.wordcount;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.s4.core.App;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WordCountTest {
+    
+    public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
+    public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
+    public static final String SENTENCE_2 = "doobie doobie da";
+    public static final int SENTENCE_2_TOTAL_WORDS = SENTENCE_2.split(" ").length;
+    public static final String SENTENCE_3 = "doobie";
+    public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
+    public static final String FLAG = ";";
+    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
+            + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
+    private static Factory zookeeperServerConnectionFactory;
+    
+    @Before
+    public void prepare() throws IOException, InterruptedException, KeeperException {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+
+    }
+
+    /**
+     * A simple word count application:
+     * 
+     * 
+     * 
+     * 
+     *           sentences                      words                    word counts
+     * Adapter ------------> WordSplitterPE -----------> WordCounterPE -------------> WordClassifierPE 
+     *                       key = "sentence"             key = word                   key="classifier"
+     *                       (should be *)               
+     * 
+     * 
+     * The test consists in checking that words are correctly counted.
+     * 
+     * 
+     */
+    @Test
+    public void testSimple() throws Exception {
+        
+        final ZooKeeper zk = TestUtils.createZkClient();
+        
+        App.main(new String[]{WordCountModule.class.getName(), WordCountApp.class.getName()});
+        
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
+                zk);
+        
+        // add authorizations for processing
+        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS
+                + 1; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+        signalTextProcessed.await();
+        File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR
+                + File.separator + "wordcount");
+        String s = TestUtils.readFile(results);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+        
+    }
+
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+
+    }
+
+}


Mime
View raw message