incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [27/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..1092868
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).

+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="io.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="io.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner"
/>
+
+
+
+	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="fsStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+    
+    <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
+    <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+        <!-- if not specified, default is <current_dir>/tmp/storage 
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+    
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
new file mode 100755
index 0000000..3b6e251
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).

+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="io.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="io.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner"
/>
+
+
+
+	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="redisStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+    
+    <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
+    
+    <bean id="redisStateStorage" class="io.s4.ft.RedisStateStorage" init-method="init">
+        <property name="redisHost" value="localhost"/>
+        <property name="redisPort" value="6379"/>
+    </bean>
+
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ 
+  <bean id="clock" class="io.s4.util.clock.WallClock"/>
+ 
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/MockPE.java b/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
new file mode 100644
index 0000000..10a38f9
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
@@ -0,0 +1,29 @@
+package org.apache.s4.processor;
+
+/**
+ * Mock PE for asserting correct behavior of AbstractPE
+ */
+public class MockPE extends AbstractPE {
+
+    private int initializeCount = 0;
+    
+    public void initInstance() {
+        initializeCount++;
+    }
+    
+    public void processEvent(Object obj) {
+    }
+
+    @Override
+    public void output() {
+
+    }
+
+    /**
+     * @return the initializeCount
+     */
+    public int getInitializeCount() {
+        return initializeCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
new file mode 100644
index 0000000..445972a
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
@@ -0,0 +1,31 @@
+package org.apache.s4.processor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import org.apache.s4.util.clock.WallClock;
+
+import org.junit.Test;
+
+public class TestPrototypeWrapper
+{
+   
+   /**
+    * Verifies ability to set an initialize method that will be called when 
+    * a new PE is instantiated
+    */
+   @Test
+   public void testCloneAndInitialize() {
+       MockPE prototype = new MockPE();
+       
+        PrototypeWrapper prototypeWrapper = new PrototypeWrapper(prototype,
+                new WallClock());
+
+       assertEquals(0, prototype.getInitializeCount());
+       MockPE instance = (MockPE)prototypeWrapper.getPE("asd");
+       assertNotNull(instance);
+       
+       assertEquals(0, prototype.getInitializeCount());
+       assertEquals(1, instance.getInitializeCount());
+   }
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/Word.java b/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
new file mode 100644
index 0000000..c1fb3f1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
@@ -0,0 +1,22 @@
+package org.apache.s4.wordcount;
+
+public class Word {
+
+    private String word;
+
+    public Word() {
+    }
+
+    public Word(String word) {
+        this.word = word;
+    }
+
+    public void setWord(String word) {
+        this.word = word;
+    }
+
+    public String getWord() {
+        return word;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
new file mode 100644
index 0000000..5b0558d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
@@ -0,0 +1,109 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class WordClassifier extends AbstractPE implements Watcher {
+
+    TreeMap<String, Integer> counts = new TreeMap();
+    private int counter;
+    transient private ZooKeeper zk;
+    private String id;
+    public final static String ROUTING_KEY = "classifier";
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    public void processEvent(WordCount wordCount) throws IOException,
+            Exception, InterruptedException {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:21810", 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        System.out.println("seen: " + wordCount.getWord() + "/"
+                + wordCount.getCount());
+
+        if (!counts.containsKey(wordCount.getWord())
+                || (counts.containsKey(wordCount.getWord()) && counts.get(
+                        wordCount.getWord()).compareTo(wordCount.getCount()) < 0)) {
+            // this is because wordCount events arrive unordered
+            counts.put(wordCount.getWord(), wordCount.getCount());
+        }
+        ++counter;
+        if (counter == WordCountTest.TOTAL_WORDS) {
+            File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                    + File.separator + "wordcount");
+            if (results.exists()) {
+                if (!results.delete()) {
+                    throw new RuntimeException("cannot delete results file");
+                }
+            }
+            Set<Entry<String, Integer>> entrySet = counts.entrySet();
+            StringBuilder sb = new StringBuilder();
+            for (Entry<String, Integer> entry : entrySet) {
+                sb.append(entry.getKey() + "=" + entry.getValue() + ";");
+            }
+            TestUtils.writeStringToFile(sb.toString(), results);
+
+            zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } else {
+            // NOTE: this will fail if we did not recover the latest counter,
+            // because there is already a counter with this number in zookeeper
+            zk.create("/classifierIteration_" + counter, new byte[counter],
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            // check if we are allowed to continue
+            if (null == zk.exists("/continue_" + counter, null)) {
+                CountDownLatch latch = new CountDownLatch(1);
+                TestUtils.watchAndSignalCreation("/continue_" + counter, latch,
+                        zk);
+                latch.await();
+            } else {
+                zk.delete("/continue_" + counter, -1);
+                System.out.println("");
+            }
+
+        }
+    }
+
+    @Override
+    public void output() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
new file mode 100644
index 0000000..2ea5b09
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
@@ -0,0 +1,43 @@
+package org.apache.s4.wordcount;
+
+public class WordCount {
+
+    private String word;
+    private int count;
+    private String routingKey;
+
+    public WordCount() {
+    }
+
+    public WordCount(String word, int count, String routingKey) {
+        super();
+        this.word = word;
+        this.count = count;
+        this.routingKey = routingKey;
+    }
+
+    public String getWord() {
+        return word;
+    }
+
+    public void setWord(String word) {
+        this.word = word;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public String getRoutingKey() {
+        return routingKey;
+    }
+
+    public void setRoutingKey(String routingKey) {
+        this.routingKey = routingKey;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
new file mode 100644
index 0000000..5a511d7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -0,0 +1,83 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4App;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class WordCountTest extends S4TestCase {
+    public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
+    public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
+    public static final String SENTENCE_2 = "doobie doobie da";
+    public static final int SENTENCE_2_TOTAL_WORDS = SENTENCE_2.split(" ").length;
+    public static final String SENTENCE_3 = "doobie";
+    public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
+    public static final String FLAG = ";";
+    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
+            + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
+    private static Factory zookeeperServerConnectionFactory;
+
+    
+    @Before
+    public void prepare() throws IOException, InterruptedException, KeeperException {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        
+        S4App app = new S4App(getClass(), "s4_core_conf.xml");
+        app.initializeS4App();
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
+                zk);
+        EventGenerator gen = new EventGenerator();
+        
+        // add authorizations for processing
+        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS
+                + 1; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+        gen.injectValueEvent(new KeyValue("sentence", SENTENCE_1),
+                    "Sentences", 0);
+        gen.injectValueEvent(new KeyValue("sentence", SENTENCE_2), "Sentences",
+                0);
+        gen.injectValueEvent(new KeyValue("sentence", SENTENCE_3), "Sentences",
+                0);
+        signalTextProcessed.await();
+        File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                + File.separator + "wordcount");
+        String s = TestUtils.readFile(results);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+        
+    }
+
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
new file mode 100644
index 0000000..de7c291
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
@@ -0,0 +1,60 @@
+package org.apache.s4.wordcount;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.processor.AbstractPE;
+
+public class WordCounter extends AbstractPE {
+
+    private transient EventDispatcher dispatcher;
+    private String outputStreamName;
+    int wordCounter;
+    private String id;
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    public EventDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getOutputStreamName() {
+        return outputStreamName;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    public void processEvent(Word word) {
+        wordCounter++;
+        // System.out.println("seen word " + word.getWord());
+    }
+
+    @Override
+    public void output() {
+        System.out.println("dispatch: " + getKeyValueString() + " / "
+                + wordCounter);
+        List<List<String>> compoundKeyNames = new ArrayList<List<String>>();
+        List<String> keyNames = new ArrayList<String>();
+        keyNames.add("routingKey");
+        compoundKeyNames.add(keyNames);
+        dispatcher.dispatchEvent(outputStreamName, compoundKeyNames,
+                new WordCount(getKeyValueString(), wordCounter,
+                        WordClassifier.ROUTING_KEY));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
new file mode 100644
index 0000000..7f9715d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
@@ -0,0 +1,61 @@
+package org.apache.s4.wordcount;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.processor.AbstractPE;
+
+public class WordSplitter extends AbstractPE {
+
+    private transient EventDispatcher dispatcher;
+    private String outputStreamName;
+    private String id;
+
+    public EventDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getOutputStreamName() {
+        return outputStreamName;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    public void processEvent(KeyValue keyValue) {
+        List<List<String>> compoundKeyNames = new ArrayList<List<String>>();
+        List<String> keyNames = new ArrayList<String>(1);
+        keyNames.add("word");
+        compoundKeyNames.add(keyNames);
+        if ("sentence".equals(keyValue.getKey())) {
+            String[] split = keyValue.getValue().split(" ");
+            for (int i = 0; i < split.length; i++) {
+                dispatcher.dispatchEvent(outputStreamName, compoundKeyNames,
+                        new Word(split[i]));
+            }
+        }
+    }
+
+    @Override
+    public void output() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml b/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
new file mode 100644
index 0000000..f03ea26
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+	<!-- <bean id="printEventPE" class="io.s4.processor.PrintEventPE"> <property

+		name="id" value="printEventPE"/> <property name="keys"> <list> <value>TopicSeen

+		topic</value> </list> </property> </bean> -->
+
+	<bean id="wordSplitter" class="io.s4.wordcount.WordSplitter">
+		<property name="id" value="wordSplitter" />
+		<property name="keys">
+			<list>
+				<value>Sentences *</value>
+			</list>
+		</property>
+		<property name="outputStreamName" value="Words" />
+		<property name="dispatcher" ref="wordDispatcher" />
+	</bean>
+
+	<bean id="wordCounter" class="io.s4.wordcount.WordCounter">
+		<property name="id" value="wordCounter" />
+		<property name="keys">
+			<list>
+				<value>Words word</value>
+			</list>
+		</property>
+		<property name="outputFrequencyByEventCount" value="1" />
+		<property name="outputStreamName" value="WordsCount" />
+		<property name="dispatcher" ref="wordsCountDispatcher" />
+	</bean>
+
+	<bean id="wordClassifier" class="io.s4.wordcount.WordClassifier">
+		<property name="id" value="wordClassifier" />
+		<property name="keys">
+			<list>
+			    <!-- use a predefined agreed-upon key -->
+				<value>WordsCount routingKey</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="wordDispatcher" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="wordPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="wordPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="streamNames">
+			<list>
+				<value>Words</value>
+			</list>
+		</property>
+		<property name="hashKey">
+			<list>
+				<value>word</value>
+			</list>
+		</property>
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+	
+	<bean id="wordsCountDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="wordsCountPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+    
+	<bean id="wordsCountPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+        <property name="streamNames">
+            <list>
+                <value>WordsCount</value>
+            </list>
+        </property>
+        <property name="hasher" ref="hasher" />
+        <property name="debug" value="false" />
+    </bean>
+	
+	
+	
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml b/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
new file mode 100755
index 0000000..20c8bb1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).

+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="io.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="io.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner"
/>
+
+
+
+	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="fsStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+    </bean>
+
+    <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="checkStorageDir">
+        <!-- if not specified, default is <current_dir>/tmp/storage 
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ 
+  <bean id="clock" class="io.s4.util.clock.WallClock"/>
+ 
+</beans>


Mime
View raw message