incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [21/50] [abbrv] renamed test packages
Date Tue, 03 Jan 2012 14:03:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
new file mode 100644
index 0000000..2bb3a82
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
@@ -0,0 +1,43 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class WordCounterPE extends ProcessingElement {
+    
+    int wordCounter;
+    transient Stream<WordCountEvent> wordClassifierStream;
+
+    private WordCounterPE() {}
+    
+    public WordCounterPE(App app) {
+        super(app);
+    }
+    
+    public void setWordClassifierStream(Stream<WordCountEvent> stream) {
+        this.wordClassifierStream = stream;
+    }
+
+    public void onEvent(WordSeenEvent event) { 
+        wordCounter++;
+        System.out.println("seen word " + event.getWord());
+        // NOTE: it seems the id is the key for now...     
+        wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+   
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenEvent.java
new file mode 100644
index 0000000..932f559
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenEvent.java
@@ -0,0 +1,21 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.base.Event;
+
+public class WordSeenEvent extends Event {
+    
+    private String word;
+    
+    protected WordSeenEvent() {}
+
+    public WordSeenEvent(String word) {
+        super();
+        this.word = word;
+    }
+
+    public String getWord() {
+        return word;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenKeyFinder.java
new file mode 100644
index 0000000..676843d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSeenKeyFinder.java
@@ -0,0 +1,18 @@
+package org.apache.s4.wordcount;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class WordSeenKeyFinder implements KeyFinder<WordSeenEvent> {
+
+    @Override
+    public List<String> get(WordSeenEvent event) {
+        List<String> key = new ArrayList<String>();
+        key.add(event.getWord());
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
new file mode 100644
index 0000000..745f521
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
@@ -0,0 +1,40 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+
+public class WordSplitterPE extends ProcessingElement {
+    
+    private Stream<WordSeenEvent> wordSeenStream;
+
+    public WordSplitterPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(StringEvent event) {
+        StringEvent sentence = event;
+        String[] split = sentence.getString().split(" ");
+        for (String word : split) {
+            wordSeenStream.put(new WordSeenEvent(word));
+        }
+    }
+    
+    public void setWordSeenStream(Stream<WordSeenEvent> stream) {
+        this.wordSeenStream = stream;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
new file mode 100644
index 0000000..87814c5
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
@@ -0,0 +1,9 @@
+package org.apache.s4.wordcount.zk;
+
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.wordcount.WordCountApp;
+
+
+public class WordCountModuleZk extends ZkBasedClusterManagementTestModule<WordCountApp> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
new file mode 100644
index 0000000..6e89e54
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -0,0 +1,104 @@
+package org.apache.s4.wordcount.zk;
+
+import static org.apache.s4.wordcount.WordCountTest.*;
+import static org.junit.Assert.*;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.App;
+import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.wordcount.WordCountApp;
+import org.apache.s4.wordcount.WordCountModule;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WordCountTestZk {
+
+    private ZkServer zkServer;
+    private ZkClient zkClient;
+
+    @Before
+    public void prepare() {
+
+        String dataDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+        String logDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+        TestUtils.cleanupTmpDirs();
+
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) {
+
+            }
+        };
+
+        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, TestUtils.ZK_PORT);
+        zkServer.start();
+
+        // zkClient = zkServer.getZkClient();
+        String zookeeperAddress = "localhost:" + TestUtils.ZK_PORT;
+        zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
+
+        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
+        zkClient2.getCreationTime("/");
+        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+        final String clusterName = "s4-test-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+        // final CountDownLatch latch = new CountDownLatch(10);
+        // for (int i = 0; i < 10; i++) {
+        // Runnable runnable = new Runnable() {
+        //
+        // @Override
+        // public void run() {
+        // AssignmentFromZK assignmentFromZK;
+        // try {
+        // assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+        // ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+        // latch.countDown();
+        // } catch (Exception e) {
+        // e.printStackTrace();
+        // }
+        // }
+        // };
+        // Thread t = new Thread(runnable);
+        // t.start();
+        // }
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+        // add authorizations for processing
+        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+        signalTextProcessed.await();
+        File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+        String s = TestUtils.readFile(results);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/TestCircularFifoBuffer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/TestCircularFifoBuffer.java b/subprojects/s4-core/src/test/java/test/s4/core/TestCircularFifoBuffer.java
deleted file mode 100644
index 7e109fe..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/TestCircularFifoBuffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package test.s4.core;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-public class TestCircularFifoBuffer extends TestCase {
-
-    protected void setUp() {
-
-    }
-
-    public void test1() {
-
-        System.out.println("Buffer size is 10.\n");
-        CircularFifoBuffer<Integer> circularBuffer = new CircularFifoBuffer<Integer>(
-                10);
-
-        System.out.println("Add ints 100-114.");
-        for (int i = 0; i < 15; i++) {
-            circularBuffer.add(i + 100);
-        }
-
-        System.out.println("Iterate.");
-        int j = 5;
-        for(Integer num : circularBuffer) {
-            System.out.print(num + " ");
-            Assert.assertEquals(j + 100, num.intValue());
-            j++;
-        }
-        System.out.println("\nLeast recent value: " + circularBuffer.get());
-        Assert.assertEquals(105, circularBuffer.get().intValue());
-        System.out.println("\n");
-        
-        circularBuffer.clear();
-        
-        /* Less than max size. */
-        System.out.println("Clear and add ints 200-204.");
-        for (int i = 0; i < 5; i++) {
-            circularBuffer.add(i + 200);
-        }
-        
-        System.out.println("Iterate.");
-        int z = 0;
-        for(Integer num : circularBuffer) {
-            System.out.print(num + " ");
-            Assert.assertEquals(z + 200, num.intValue());
-            z++;
-        }
-        System.out.println("\n");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/apploading/AppLoadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/apploading/AppLoadingTest.java b/subprojects/s4-core/src/test/java/test/s4/core/apploading/AppLoadingTest.java
deleted file mode 100644
index e6be074..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/apploading/AppLoadingTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package test.s4.core.apploading;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-
-import org.apache.s4.core.Server;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import test.s4.fixtures.TestUtils;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-
-/**
- * 
- * Tests packaging and deployment of an S4 app
- * 
- */
-public class AppLoadingTest {
-
-    public static final long ZOOKEEPER_PORT = 21810;
-    private static Factory zookeeperServerConnectionFactory = null;
-    private Process forkedApp;
-
-    @Before
-    public void prepare() throws Exception {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
-        final ZooKeeper zk = TestUtils.createZkClient();
-        try {
-            zk.delete("/simpleAppCreated", -1);
-        } catch (Exception ignored) {
-        }
-
-        zk.close();
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-        TestUtils.killS4App(forkedApp);
-    }
-
-    @Ignore("fix paths")
-    @Test
-    public void testA() throws Exception {
-
-        // add all classes from counter app
-        File rootAppDir = new File(new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath()
-                + "/s4-example/bin");
-        File appFilesDir = new File(rootAppDir, "org/apache/s4/example/counter");
-        generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "counterExample",
-                "org.apache.s4.example.counter.MyApp");
-
-        forkedApp = TestUtils.forkS4Node();
-        Thread.sleep(15000);
-    }
-
-    private void generateS4RFromDirectoryContents(File rootAppDir, File appFilesDir, String s4rName, String appClassName)
-            throws IOException, FileNotFoundException {
-        Collection<File> s4rFiles = listFilesRecursively(appFilesDir);
-        File jarFile = new File(System.getProperty("user.dir") + "/bin/apps/" + s4rName + ".s4r");
-        Files.createParentDirs(jarFile);
-        FileOutputStream fos = new FileOutputStream(jarFile);
-        JarOutputStream jos = new JarOutputStream(fos);
-        System.out.println(System.getProperty("java.class.path"));
-        for (File file : s4rFiles) {
-            JarEntry jarEntry = new JarEntry(file.getAbsolutePath().substring(rootAppDir.getAbsolutePath().length()));
-            jos.putNextEntry(jarEntry);
-            ByteStreams.copy(Files.newInputStreamSupplier(file), jos);
-        }
-        // add manifest
-        File manifest = File.createTempFile("s4app", "manifest");
-        String manifestContents = "Manifest-Version: 1.0\n" + Server.MANIFEST_S4_APP_CLASS + ": " + appClassName + "\n";
-        Files.write(manifestContents, manifest, Charset.forName("UTF-8"));
-        JarEntry jarEntry = new JarEntry("META-INF/MANIFEST.MF");
-        jos.putNextEntry(jarEntry);
-        ByteStreams.copy(Files.newInputStreamSupplier(manifest), jos);
-
-        jos.close();
-    }
-
-    private Collection<File> listFilesRecursively(File dir) {
-        if (dir.isDirectory()) {
-            File[] listFiles = dir.listFiles();
-            List<File> filesToAdd = new ArrayList<File>();
-            if (listFiles.length != 0) {
-                for (File file : listFiles) {
-                    if (file.isFile()) {
-                        filesToAdd.add(file);
-                    } else if (file.isDirectory()) {
-                        filesToAdd.addAll(listFilesRecursively(file));
-                    }
-                }
-            }
-            return filesToAdd;
-        } else {
-            // TODO throw exception
-            return null;
-        }
-    }
-
-    /**
-     * 
-     * 1. generates an s4r package from classes in the apploading package (TODO process still to be improved), 2.
-     * deploys it to bin/apps 3. starts a forked S4 node, which loads apps from bin/apps 4. verifies app is working (s4
-     * app started, event correctly processed)
-     * 
-     * NOTE: we'll need to add an automatic test for which we make sure code cannot be in the classpath
-     */
-    @Test
-    public void testAppLoading() throws Exception {
-
-        // TODO fix paths
-
-        final ZooKeeper zk = TestUtils.createZkClient();
-
-        File rootAppDir = TestUtils.findDirForCompiledTestClasses();
-
-        File appFilesDir = new File(rootAppDir, "test/s4/core/apploading");
-        // 1. create app jar and place it in tmp/s4-apps
-        generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "appLoadingTest", SimpleApp.class.getName());
-
-        CountDownLatch signalAppStarted = new CountDownLatch(1);
-        // 2. start s4 node and check results
-        forkedApp = TestUtils.forkS4Node();
-
-        // TODO wait for ready state (zk node available)
-        Thread.sleep(5000);
-
-        // note: make sure we don't delete existing znode if it was already created
-        TestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
-
-        Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
-
-        String time1 = String.valueOf(System.currentTimeMillis());
-
-        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
-
-        TestUtils.injectIntoStringSocketAdapter(time1);
-
-        // check event processed
-        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleApp.java b/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleApp.java
deleted file mode 100644
index c49d6d3..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleApp.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package test.s4.core.apploading;
-
-import java.io.IOException;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.Stream;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-import test.s4.fixtures.SocketAdapter;
-import test.s4.fixtures.TestUtils;
-import test.s4.wordcount.SentenceKeyFinder;
-import test.s4.wordcount.StringEvent;
-
-public class SimpleApp extends App {
-    private SocketAdapter<StringEvent> socketAdapter;
-    
-    public SimpleApp () {}
-
-    @Override
-    protected void start() {
-        try {
-            final ZooKeeper zk = TestUtils.createZkClient();
-            zk.create("/simpleAppCreated", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.close();
-        } catch (Exception e) {
-            System.exit(-1);
-        }
-    }
-
-    @Override
-    protected void init() {
-        SimplePE prototype = createPE(SimplePE.class);
-        Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
-        try {
-            socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void close() {
-        // TODO Auto-generated method stub
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java b/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
deleted file mode 100644
index 205b8ac..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package test.s4.core.apploading;
-
-import test.s4.fixtures.FileBasedClusterManagementTestModule;
-
-public class SimpleModule extends FileBasedClusterManagementTestModule<SimpleApp> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimplePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimplePE.java b/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimplePE.java
deleted file mode 100644
index 3325dc0..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimplePE.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package test.s4.core.apploading;
-
-import java.io.IOException;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.LoggerFactory;
-
-import test.s4.fixtures.TestUtils;
-import test.s4.wordcount.StringEvent;
-
-public class SimplePE  extends ProcessingElement implements Watcher {
-
-    private ZooKeeper zk;
-
-    public SimplePE() {}
-    
-    public SimplePE(App app) {
-        super(app);
-    }
-    
-    public void onEvent(StringEvent event) {
-        try {
-            LoggerFactory.getLogger(getClass()).debug("processing envent {}", event.getString());
-            zk.create("/onEvent@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-            zk.close();
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void onCreate() {
-        if (zk == null) {
-            try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/A.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/A.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/A.java
deleted file mode 100644
index 1c98f2c..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/A.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-
-
-public class A extends ProcessingElement {
-    
-    public Class<? extends Event> processedEventClass;
-    public Class<? extends Event> processedTriggerEventClass;
-    boolean processedTriggerThroughGenericMethod = false;
-    
-    public void onEvent(Event event) {
-        processedEventClass = Event.class;
-    }
-        
-    public void onEvent(Event2 event) {
-        processedEventClass = event.getClass();
-    }
-    
-    public void onEvent(Event1 event) {
-        processedEventClass = event.getClass();
-    }
-
-    public void onEvent(Event1a event) {
-        processedEventClass = event.getClass();
-    }
-    
-    public void onTrigger(Event event) {
-        processedTriggerEventClass = event.getClass();
-        processedTriggerThroughGenericMethod = true;
-    }
-    
-    public void onTrigger(Event1 event ) {
-        processedTriggerEventClass = event.getClass();
-        processedTriggerThroughGenericMethod = false;
-    }
-    
-    
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-        
-    }
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/B.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/B.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/B.java
deleted file mode 100644
index 4a518a1..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/B.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class B extends ProcessingElement {
-    
-    
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/C.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/C.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/C.java
deleted file mode 100644
index 91776a0..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/C.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class C extends ProcessingElement {
-    
-    public boolean processedEvent1Class = false;
-
-    public void onEvent(Event1 event) {
-        processedEvent1Class = true;
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/D.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/D.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/D.java
deleted file mode 100644
index c077f2a..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/D.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-
-
-public class D extends ProcessingElement {
-    
-    public boolean processedGenericEvent = false;
-    public boolean processedEvent1 = false;
-    
-    public void onEvent(Event event) {
-        processedGenericEvent =true;
-    }
-    
-    public void onEvent(Event1 event) {
-        processedEvent1 = true;
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1.java
deleted file mode 100644
index 97d1636..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.base.Event;
-
-public class Event1 extends Event {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1a.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1a.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1a.java
deleted file mode 100644
index b218299..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event1a.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.base.Event;
-
-public class Event1a extends Event {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event2.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event2.java
deleted file mode 100644
index 0277835..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/Event2.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package test.s4.core.overloadgen;
-
-import org.apache.s4.base.Event;
-
-public class Event2 extends Event {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/OverloadDispatcherTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/OverloadDispatcherTest.java b/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/OverloadDispatcherTest.java
deleted file mode 100644
index 225deb3..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/overloadgen/OverloadDispatcherTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package test.s4.core.overloadgen;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.regex.Pattern;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.gen.OverloadDispatcher;
-import org.apache.s4.core.gen.OverloadDispatcherGenerator;
-import org.junit.Test;
-
-public class OverloadDispatcherTest {
-
-    @Test
-    public void testDispatchWithEventHierarchies() throws Exception {
-        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(A.class);
-        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
-        A a = new A();
-        // input events
-        dispatcher.dispatchEvent(a, new Event1());
-        Assert.assertEquals(Event1.class, a.processedEventClass);
-        dispatcher.dispatchEvent(a, new Event1a());
-        Assert.assertEquals(Event1a.class, a.processedEventClass);
-        dispatcher.dispatchEvent(a, new Event2());
-        Assert.assertEquals(Event2.class, a.processedEventClass);
-       
-        // trigger events
-        dispatcher.dispatchTrigger(a, new Event2());
-        Assert.assertEquals(Event2.class, a.processedTriggerEventClass);
-        Assert.assertTrue(a.processedTriggerThroughGenericMethod);
-        dispatcher.dispatchTrigger(a, new Event1());
-        Assert.assertEquals(Event1.class, a.processedTriggerEventClass);
-        Assert.assertFalse(a.processedTriggerThroughGenericMethod);
-    }
-    
-    @Test
-    public void testDispatchWithSingleMethod() throws Exception {
-        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(C.class);
-        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
-        C c = new C();
-        dispatcher.dispatchEvent(c, new Event2());
-        Assert.assertFalse(c.processedEvent1Class);
-        dispatcher.dispatchEvent(c, new Event1());
-        Assert.assertTrue(c.processedEvent1Class);
-    }
-
-    @Test
-    public void testNoMatchingMethod() throws Exception {
-        PrintStream stdout = System.out;
-        try {
-            ByteArrayOutputStream tmpOut = new ByteArrayOutputStream();
-            System.setOut(new PrintStream(tmpOut));
-
-            OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(B.class);
-            OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
-            B b = new B();
-            dispatcher.dispatchEvent(b, new Event1());
-            String output = tmpOut.toString().trim();
-            // use DOTALL to ignore previous lines in output debug mode
-            Assert.assertTrue(Pattern.compile("^.+OverloadDispatcher\\d+ - Cannot dispatch event "
-                    + "of type \\[" + Event1.class.getName() + "\\] to PE of type \\[" + B.class.getName()
-                    + "\\] : no matching onEvent method found$", Pattern.DOTALL).matcher(output).matches());
-        } finally {
-            System.setOut(stdout);
-        }
-
-    }
-    
-    @Test
-    public void testGenericEvent() throws Exception {
-        OverloadDispatcherGenerator gen = new OverloadDispatcherGenerator(D.class);
-        OverloadDispatcher dispatcher = (OverloadDispatcher) gen.generate().newInstance();
-        D d = new D();
-        dispatcher.dispatchEvent(d, new Event2());
-        Assert.assertTrue(d.processedGenericEvent);
-        dispatcher.dispatchEvent(d, new Event1());
-        Assert.assertTrue(d.processedEvent1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/CountTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/CountTriggerTest.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/CountTriggerTest.java
deleted file mode 100644
index 9d135a2..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/CountTriggerTest.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package test.s4.core.triggers;
-
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CountTriggerTest extends TriggerTest {
-    
-    @Test
-    public void testEventCountBasedTrigger() throws Exception {
-        triggerType = TriggerType.COUNT_BASED;
-        Assert.assertTrue(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/NoTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/NoTriggerTest.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/NoTriggerTest.java
deleted file mode 100644
index 684eaec..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/NoTriggerTest.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package test.s4.core.triggers;
-
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.junit.Test;
-
-public class NoTriggerTest extends TriggerTest {
-
-    @Test
-    public void testNoTrigger() throws Exception {
-        triggerType = TriggerType.NONE;
-        Assert.assertFalse(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/TimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TimeTriggerTest.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TimeTriggerTest.java
deleted file mode 100644
index f49c440..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TimeTriggerTest.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package test.s4.core.triggers;
-
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TimeTriggerTest extends TriggerTest {
-
-    @Test
-    public void testTimeBasedTrigger() throws Exception {
-        triggerType = TriggerType.TIME_BASED;
-        Assert.assertTrue(createTriggerAppAndSendEvent().await(5, TimeUnit.SECONDS));
-
-    }
-
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerTest.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerTest.java
deleted file mode 100644
index b07a1a7..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package test.s4.core.triggers;
-
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-
-import test.s4.fixtures.TestUtils;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-/**
- * tests from subclasses are forked in separate VMs, an easy way to avoid
- * conflict with unavailable resources when instantiating new S4 nodes
- */
-
-public abstract class TriggerTest {
-
-    private Factory zookeeperServerConnectionFactory;
-    public static TriggerType triggerType;
-    protected TriggeredApp app;
-
-    public enum TriggerType {
-        TIME_BASED, COUNT_BASED, NONE
-    }
-
-    @Before
-    public void prepare() throws IOException, InterruptedException, KeeperException {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
-    }
-
-    @After
-    public void cleanup() throws IOException, InterruptedException {
-        if (app != null) {
-            app.close();
-            app = null;
-        }
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-    }
-
-    protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
-        final ZooKeeper zk = TestUtils.createZkClient();
-        Injector injector = Guice.createInjector(new TriggeredModule());
-        app = injector.getInstance(TriggeredApp.class);
-        app.init();
-        app.start();
-        
-        String time1 = String.valueOf(System.currentTimeMillis());
-
-        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
-        
-        CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
-
-        TestUtils.injectIntoStringSocketAdapter(time1);
-
-        // check event processed
-        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
-
-        // return latch on trigger signal
-        return signalEvent1Triggered;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerablePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerablePE.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerablePE.java
deleted file mode 100644
index f36e176..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggerablePE.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package test.s4.core.triggers;
-
-
-import java.io.IOException;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-import test.s4.fixtures.TestUtils;
-import test.s4.wordcount.StringEvent;
-
-public class TriggerablePE extends ProcessingElement implements Watcher {
-    
-    private ZooKeeper zk;
-
-    public TriggerablePE() {}
-    
-    public TriggerablePE(App app) {
-        super(app);
-    }
-    
-    public void onEvent(StringEvent event) {
-        try {
-            zk.create("/onEvent@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-    }
-    
-    @Override
-    protected void onCreate() {
-        if (zk == null) {
-            try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public void onTrigger(StringEvent event) {
-        try {
-            zk.create("/onTrigger[StringEvent]@"+event.getString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-    }
-    
-    
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
-        
-    }
-    
-    
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredApp.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredApp.java
deleted file mode 100644
index 7a2f3f0..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredApp.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package test.s4.core.triggers;
-
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.Stream;
-
-import test.s4.fixtures.SocketAdapter;
-import test.s4.wordcount.SentenceKeyFinder;
-import test.s4.wordcount.StringEvent;
-
-import com.google.inject.Inject;
-
-public class TriggeredApp extends App {
-
-    SocketAdapter<StringEvent> socketAdapter;
-
-    @Inject
-    public TriggeredApp() {
-        super();
-    }
-
-    @Override
-    protected void start() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void init() {
-
-        TriggerablePE prototype = createPE(TriggerablePE.class);
-        Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
-        switch (TriggerTest.triggerType) {
-            case COUNT_BASED:
-                prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
-                break;
-            case TIME_BASED:
-                prototype.setTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
-            default:
-                break;
-        }
-
-        try {
-            socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void close() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
deleted file mode 100644
index 51c6c5f..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package test.s4.core.triggers;
-
-import test.s4.fixtures.FileBasedClusterManagementTestModule;
-
-public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
deleted file mode 100644
index c7e6c40..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package test.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected FileBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.lock_dir",
-                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromFile.class);
-        bind(Topology.class).to(TopologyFromFile.class);
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/fixtures/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/SocketAdapter.java
deleted file mode 100644
index 4890ab9..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/SocketAdapter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package test.s4.fixtures;
-
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import org.apache.s4.core.Stream;
-
-import test.s4.wordcount.KeyValueEvent;
-import test.s4.wordcount.StringEvent;
-
-public class SocketAdapter<T extends StringEvent> {
-
-    static ServerSocket serverSocket;
-
-    /**
-     * Listens to incoming sentence and forwards them to a sentence Stream.
-     * Each sentence is sent through a new socket connection
-     * 
-     * @param stream
-     * @throws IOException
-     */
-    public SocketAdapter(final Stream<T> stream, final StringEventFactory<T> stringEventFactory) throws IOException {
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                serverSocket = null;
-                Socket connectedSocket;
-                BufferedReader in = null;
-                try {
-                    serverSocket = new ServerSocket(12000);
-                    while (true) {
-                        connectedSocket = serverSocket.accept();
-                        in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
-
-                        String line = in.readLine();
-                        System.out.println("read: " + line);
-                        stream.put(stringEventFactory.create(line)) ;
-                        connectedSocket.close();
-                    }
-
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    System.exit(-1);
-                } finally {
-                    if (in != null) {
-                        try {
-                            in.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    if (serverSocket != null) {
-                        try {
-                            serverSocket.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }
-        });
-        t.start();
-
-    }
-    
-    public void close()  {
-        if(serverSocket !=null) {
-            try {
-                serverSocket.close();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-    
-    interface StringEventFactory<T> {
-        T create(String string);
-    }
-    
-    public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
-
-        @Override
-        public StringEvent create(String string) {
-            return new StringEvent(string);
-        }
-        
-    }
-    
-    public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
-
-        @Override
-        public KeyValueEvent create(String string) {
-            return new KeyValueEvent(string);
-        }
-        
-    }
-
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/fixtures/TestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/TestUtils.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/TestUtils.java
deleted file mode 100644
index e894fb4..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/TestUtils.java
+++ /dev/null
@@ -1,451 +0,0 @@
-package test.s4.fixtures;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.Main;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
- * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
- * latches through zookeeper - etc...
- * 
- */
-public class TestUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
-
-    public static final int ZK_PORT = 21810;
-    public static final int INITIAL_BOOKIE_PORT = 5000;
-    public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
-    public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
-            + "storage");
-    public static ServerSocket serverSocket;
-    static {
-        logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
-    }
-
-    public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
-        return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
-    }
-
-    public static Process forkS4Node() throws IOException, InterruptedException {
-        return forkProcess(Main.class.getName(), new String[] {});
-    }
-
-    private static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
-        List<String> cmdList = new ArrayList<String>();
-        cmdList.add("java");
-        cmdList.add("-cp");
-        cmdList.add(System.getProperty("java.class.path"));
-        // cmdList.add("-Xdebug");
-        // cmdList.add("-Xnoagent");
-        //
-        // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
-
-        cmdList.add(mainClass);
-        for (String arg : args) {
-            cmdList.add(arg);
-        }
-
-        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
-        ProcessBuilder pb = new ProcessBuilder(cmdList);
-
-        pb.directory(new File(System.getProperty("user.dir")));
-        pb.redirectErrorStream();
-        final Process process = pb.start();
-
-        // TODO some synchro with s4 platform ready state
-        Thread.sleep(2000);
-        try {
-            int exitValue = process.exitValue();
-            Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
-        } catch (IllegalThreadStateException ignored) {
-        }
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-                String line;
-                try {
-                    line = br.readLine();
-                    while (line != null) {
-                        System.out.println(line);
-                        line = br.readLine();
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }).start();
-
-        return process;
-    }
-
-    public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
-        if (forkedApp != null) {
-            forkedApp.destroy();
-        }
-    }
-
-    public static void writeStringToFile(String s, File f) throws IOException {
-        if (f.exists()) {
-            if (!f.delete()) {
-                throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
-            }
-        }
-
-        FileWriter fw = null;
-        try {
-            if (!f.createNewFile()) {
-                throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
-            }
-            fw = new FileWriter(f);
-
-            fw.write(s);
-        } catch (IOException e) {
-            throw (e);
-        } finally {
-            if (fw != null) {
-                try {
-                    fw.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
-    }
-
-    public static String readFile(File f) throws IOException {
-        BufferedReader br = null;
-        try {
-            br = new BufferedReader(new FileReader(f));
-            StringBuilder sb = new StringBuilder();
-            String line = br.readLine();
-            while (line != null) {
-                sb.append(line);
-                line = br.readLine();
-                if (line != null) {
-                    sb.append("\n");
-                }
-            }
-            return sb.toString();
-        } finally {
-            if (br != null) {
-                try {
-                    br.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
-
-    }
-
-    public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
-            KeeperException {
-
-        List<String> cmdList = new ArrayList<String>();
-        final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
-                + "zookeeper" + File.separator + "data");
-        if (zkDataDir.exists()) {
-            TestUtils.deleteDirectoryContents(zkDataDir);
-        } else {
-            zkDataDir.mkdirs();
-        }
-
-        ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
-        NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
-        nioZookeeperConnectionFactory.startup(zks);
-        Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
-        return nioZookeeperConnectionFactory;
-
-    }
-
-    public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
-        if (f != null) {
-            f.shutdown();
-            Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
-        }
-    }
-
-    public static void deleteDirectoryContents(File dir) {
-        File[] files = dir.listFiles();
-        for (File file : files) {
-            if (file.isDirectory()) {
-                deleteDirectoryContents(file);
-            }
-            if (!file.delete()) {
-                throw new RuntimeException("could not delete : " + file);
-            }
-        }
-    }
-
-    public static String readFileAsString(File f) throws IOException {
-        FileReader fr = new FileReader(f);
-        StringBuilder sb = new StringBuilder("");
-        BufferedReader br = new BufferedReader(fr);
-        String line = br.readLine();
-        while (line != null) {
-            sb.append(line);
-            line = br.readLine();
-            if (line != null) {
-                sb.append("\n");
-            }
-        }
-        return sb.toString();
-
-    }
-
-    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
-    // guava
-    public static byte[] readFileAsByteArray(File file) throws Exception {
-        FileInputStream in = null;
-        try {
-            in = new FileInputStream(file);
-
-            long length = file.length();
-
-            /*
-             * Arrays can only be created using int types, so ensure that the file size is not too big before we
-             * downcast to create the array.
-             */
-            if (length > Integer.MAX_VALUE) {
-                throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
-            }
-
-            byte[] buffer = new byte[(int) length];
-            int offSet = 0;
-            int numRead = 0;
-
-            while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
-                offSet += numRead;
-            }
-
-            if (offSet < buffer.length) {
-                throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
-                        + buffer.length + " bytes read");
-            }
-
-            in.close();
-            return buffer;
-
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
-    }
-
-    public static ZooKeeper createZkClient() throws IOException {
-        final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-            }
-        });
-        return zk;
-    }
-
-    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
-            throws KeeperException, InterruptedException {
-
-        // by default delete existing nodes with same path
-        watchAndSignalCreation(path, latch, zk, false);
-    }
-
-    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
-            boolean deleteIfExists) throws KeeperException, InterruptedException {
-
-        if (zk.exists(path, false) != null) {
-            if (deleteIfExists) {
-                zk.delete(path, -1);
-            } else {
-                latch.countDown();
-            }
-        }
-        zk.exists(path, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (EventType.NodeCreated.equals(event.getType())) {
-                    latch.countDown();
-                }
-            }
-        });
-    }
-
-    public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
-            throws KeeperException, InterruptedException {
-
-        zk.getChildren(path, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (EventType.NodeChildrenChanged.equals(event.getType())) {
-                    latch.countDown();
-                }
-            }
-        });
-    }
-
-    // from zookeeper's codebase
-    public static boolean waitForServerUp(String host, int port, long timeout) {
-        long start = System.currentTimeMillis();
-        while (true) {
-            try {
-                // if there are multiple hostports, just take the first one
-                String result = send4LetterWord(host, port, "stat");
-                if (result.startsWith("Zookeeper version:")) {
-                    return true;
-                }
-            } catch (IOException ignored) {
-                // ignore as this is expected
-            }
-
-            if (System.currentTimeMillis() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
-    }
-
-    // from zookeeper's codebase
-    public static String send4LetterWord(String host, int port, String cmd) throws IOException {
-        Socket sock = new Socket(host, port);
-        BufferedReader reader = null;
-        try {
-            OutputStream outstream = sock.getOutputStream();
-            outstream.write(cmd.getBytes());
-            outstream.flush();
-            // this replicates NC - close the output stream before reading
-            sock.shutdownOutput();
-
-            reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
-            StringBuilder sb = new StringBuilder();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                sb.append(line + "\n");
-            }
-            return sb.toString();
-        } finally {
-            sock.close();
-            if (reader != null) {
-                reader.close();
-            }
-        }
-    }
-
-    // from zookeeper's codebase
-    public static boolean waitForServerDown(String host, int port, long timeout) {
-        long start = System.currentTimeMillis();
-        while (true) {
-            try {
-                send4LetterWord(host, port, "stat");
-            } catch (IOException e) {
-                return true;
-            }
-
-            if (System.currentTimeMillis() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
-    }
-
-    public static void cleanupTmpDirs() {
-        if (TestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
-            deleteDirectoryContents(TestUtils.DEFAULT_TEST_OUTPUT_DIR);
-        }
-        TestUtils.DEFAULT_STORAGE_DIR.mkdirs();
-
-    }
-
-    public static void stopSocketAdapter() throws IOException {
-        if (serverSocket != null) {
-            serverSocket.close();
-        }
-    }
-
-    /**
-     * gradle and eclipse have different directories for output files This is justified here
-     * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
-     * 
-     * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
-     * runtime.
-     * 
-     * This is what this method does
-     * 
-     * @return directory containing the compiled test classes for this project and execution environment.
-     */
-    public static File findDirForCompiledTestClasses() {
-        String userDir = System.getProperty("user.dir");
-        String classpath = System.getProperty("java.class.path");
-        System.out.println(userDir);
-        System.out.println(classpath);
-        if (classpath.contains(userDir + "/bin")) {
-            // eclipse classpath
-            return new File(userDir + "/bin");
-        } else if (classpath.contains(userDir + "/build/classes/test")) {
-            // gradle classpath
-            return new File(userDir + "/build/classes/test");
-        } else {
-            // TODO other IDEs
-            throw new RuntimeException("Cannot find path for compiled test classes");
-        }
-
-    }
-
-    public static void injectIntoStringSocketAdapter(String string) throws IOException {
-        Socket socket = null;
-        PrintWriter writer = null;
-        try {
-            socket = new Socket("localhost", 12000);
-            writer = new PrintWriter(socket.getOutputStream(), true);
-            writer.println(string);
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(-1);
-        } finally {
-            if (socket != null) {
-                socket.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
deleted file mode 100644
index 27218a4..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package test.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-// also uses netty
-public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected ZkBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.zk_address",
-                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Topology.class).to(TopologyFromZK.class);
-        bind(Emitter.class).to(NettyEmitter.class);
-        bind(Listener.class).to(NettyListener.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueEvent.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueEvent.java
deleted file mode 100644
index abf228f..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueEvent.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package test.s4.wordcount;
-
-import test.s4.wordcount.StringEvent;
-
-public class KeyValueEvent extends StringEvent {
-
-    String key;
-    String value;
-
-    public KeyValueEvent(String keyValue) {
-        key = keyValue.split(";")[0];
-        value = keyValue.split(";")[1];
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public String getValue() {
-        return value;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueKeyFinder.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueKeyFinder.java
deleted file mode 100644
index ef11eb3..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/KeyValueKeyFinder.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package test.s4.wordcount;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.s4.core.KeyFinder;
-
-public class KeyValueKeyFinder implements KeyFinder<KeyValueEvent> {
-
-    public static final String UNIQUE_KEY = "KEY";
-
-    @Override
-    public List<String> get(final KeyValueEvent event) {
-        return Arrays.asList(new String[] {UNIQUE_KEY});
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/SentenceKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/SentenceKeyFinder.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/SentenceKeyFinder.java
deleted file mode 100644
index 4c15afd..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/SentenceKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package test.s4.wordcount;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.core.KeyFinder;
-
-public class SentenceKeyFinder implements KeyFinder<StringEvent> {
-
-    private static final String SENTENCE_KEY = "sentence";
-
-    @SuppressWarnings("serial")
-    @Override
-    public List<String> get(StringEvent event) {
-        return new ArrayList<String>(){{add(SENTENCE_KEY);}};
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/StringEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/StringEvent.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/StringEvent.java
deleted file mode 100644
index e21d76d..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/StringEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package test.s4.wordcount;
-
-import org.apache.s4.base.Event;
-
-public class StringEvent extends Event {
-    
-    String string;
-    
-    public StringEvent() {}
-    
-    public StringEvent(String string) {
-        super();
-        this.string = string;
-    }
-    
-    public void setString(String string) {
-        this.string = string;
-    }
-    
-    public String getString() {
-        return string;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/Word.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/Word.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/Word.java
deleted file mode 100644
index d9752f1..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/Word.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package test.s4.wordcount;
-
-
-public class Word {
-
-    private String word;
-
-    public Word() {
-    }
-
-    public Word(String word) {
-        this.word = word;
-    }
-
-    public void setWord(String word) {
-        this.word = word;
-    }
-
-    public String getWord() {
-        return word;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordClassifierPE.java
deleted file mode 100644
index 4b591f9..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordClassifierPE.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package test.s4.wordcount;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.log4j.Logger;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-import test.s4.fixtures.TestUtils;
-
-public class WordClassifierPE extends ProcessingElement implements Watcher {
-
-    TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
-    private int counter;
-    transient private ZooKeeper zk;
-
-    private WordClassifierPE () {}
-
-    public WordClassifierPE(App app) {
-        super(app);
-    }
-    
-    public void onEvent(WordCountEvent event) {
-        try {
-            WordCountEvent wcEvent = event;
-            if (zk == null) {
-                try {
-                    zk = new ZooKeeper("localhost:21810", 4000, this);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            System.out.println("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
-
-            if (!counts.containsKey(wcEvent.getWord())
-                    || (counts.containsKey(wcEvent.getWord()) && counts.get(wcEvent.getWord()).compareTo(
-                            wcEvent.getCount()) < 0)) {
-                // this is because wcEvent events arrive unordered
-                counts.put(wcEvent.getWord(), wcEvent.getCount());
-            }
-            ++counter;
-            if (counter == WordCountTest.TOTAL_WORDS) {
-                File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-                if (results.exists()) {
-                    if (!results.delete()) {
-                        throw new RuntimeException("cannot delete results file");
-                    }
-                }
-                Set<Entry<String, Integer>> entrySet = counts.entrySet();
-                StringBuilder sb = new StringBuilder();
-                for (Entry<String, Integer> entry : entrySet) {
-                    sb.append(entry.getKey() + "=" + entry.getValue() + ";");
-                }
-                TestUtils.writeStringToFile(sb.toString(), results);
-
-                zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            } else {
-                // NOTE: this will fail if we did not recover the latest
-                // counter,
-                // because there is already a counter with this number in
-                // zookeeper
-                zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
-                System.out.println("wrote classifier iteration ["+counter+"]");
-                // check if we are allowed to continue
-                if (null == zk.exists("/continue_" + counter, null)) {
-                    CountDownLatch latch = new CountDownLatch(1);
-                    TestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
-                    latch.await();
-                } else {
-                    zk.delete("/continue_" + counter, -1);
-                    System.out.println("");
-                }
-            }
-
-        } catch (Exception e) {
-            // TODO should propagate some exceptions
-            e.printStackTrace();
-        }
-
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
- 
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountApp.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountApp.java
deleted file mode 100644
index c4cf573..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountApp.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package test.s4.wordcount;
-
-
-import java.io.IOException;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.Stream;
-
-import test.s4.fixtures.SocketAdapter;
-
-import com.google.inject.Inject;
-
-public class WordCountApp extends App {
-
-    protected boolean checkpointing = false;
-    SocketAdapter<StringEvent> socketAdapter;
-
-    @Inject
-    public WordCountApp() {
-        super();
-    }
-
-    @Override
-    protected void start() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void init() {
-
-        WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class);
-        Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
-                wordClassifierPrototype);
-        WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class);
-//        wordCounterPrototype.setTrigger(WordSeenEvent.class, 1, 0, null);
-        wordCounterPrototype.setWordClassifierStream(wordCountStream);
-        Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
-                wordCounterPrototype);
-        WordSplitterPE wordSplitterPrototype = createPE(WordSplitterPE.class);
-        wordSplitterPrototype.setWordSeenStream(wordSeenStream);
-        Stream<StringEvent> sentenceStream = createStream("sentences stream", new SentenceKeyFinder(),
-                wordSplitterPrototype);
-
-        try {
-            socketAdapter = new SocketAdapter<StringEvent>(sentenceStream, new SocketAdapter.SentenceEventFactory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        if (checkpointing) {
-
-            // TODO move to subclass
-
-            // checkpoint word classifier because it maintains a sync counter
-            // for the test
-            // LoggerFactory.getLogger(getClass()).info("setting checkpointing for word classifier and word counter");
-            // wordClassifierPrototype.setCheckpointingFrequency(1);
-            // // checkpoint word counter because it maintains word counts
-            // wordCounterPrototype.setCheckpointingFrequency(1);
-        }
-
-    }
-
-    @Override
-    protected void close() {
-        socketAdapter.close();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountEvent.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountEvent.java
deleted file mode 100644
index 2f66546..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountEvent.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package test.s4.wordcount;
-
-import org.apache.s4.base.Event;
-
-public class WordCountEvent extends Event {
-
-        private String word;
-        private int count;
-        
-        protected WordCountEvent() {}
-        
-        public WordCountEvent(String word, int count) {
-            super();
-            this.word = word;
-            this.count = count;
-        }
-
-        public String getWord() {
-            return word;
-        }
-
-        public int getCount() {
-            return count;
-        }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountKeyFinder.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountKeyFinder.java
deleted file mode 100644
index ae2138b..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountKeyFinder.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package test.s4.wordcount;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.core.KeyFinder;
-
-public class WordCountKeyFinder implements KeyFinder<WordCountEvent> {
-
-    @SuppressWarnings("serial")
-    @Override
-    public List<String> get(WordCountEvent event) {
-        return new ArrayList<String>(){{add("classifier");}};
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/90f68b42/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
deleted file mode 100644
index 9e04f4b..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package test.s4.wordcount;
-
-import test.s4.fixtures.FileBasedClusterManagementTestModule;
-
-public class WordCountModule extends FileBasedClusterManagementTestModule<WordCountApp> {
-
-}


Mime
View raw message