incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [18/50] [abbrv] git commit: Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
Rename packages in preparation for move to Apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/169653cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/169653cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/169653cd

Branch: refs/heads/dev
Commit: 169653cdacb4fccbb762354bece6728a9da16489
Parents: 01d2dd8
Author: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 22:40:38 2011 -0800
Committer: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 22:40:38 2011 -0800

----------------------------------------------------------------------
 .../resources/s4-example-speech01-scala-conf.xml   |    2 +-
 .../speech01-scala/src/main/scala/Events.scala     |    2 +-
 .../src/main/scala/SentenceReceiverPE.scala        |    4 +-
 .../java/io/s4/example/speech01/Highlight.java     |   32 --
 .../main/java/io/s4/example/speech01/Sentence.java |   81 ----
 .../io/s4/example/speech01/SentenceReceiverPE.java |   15 -
 .../main/java/io/s4/example/speech01/Speech.java   |   56 ---
 .../org/apache/s4/example/speech01/Highlight.java  |   32 ++
 .../org/apache/s4/example/speech01/Sentence.java   |   81 ++++
 .../s4/example/speech01/SentenceReceiverPE.java    |   15 +
 .../org/apache/s4/example/speech01/Speech.java     |   56 +++
 .../main/resources/s4-example-speech01-conf.xml    |    2 +-
 .../main/resources/s4-example-speech02-conf.xml    |   20 +-
 .../testinput/src/main/resources/speeches.txt      |    2 +-
 .../twittertopiccount/DirectToFilePersister.java   |  118 -----
 .../io/s4/example/twittertopiccount/Status.java    |  154 -------
 .../s4/example/twittertopiccount/TopNTopicPE.java  |  178 --------
 .../twittertopiccount/TopicCountAndReportPE.java   |   76 ---
 .../twittertopiccount/TopicExtractorPE.java        |  114 -----
 .../io/s4/example/twittertopiccount/TopicSeen.java |   67 ---
 .../twittertopiccount/TwitterFeedListener.java     |  355 ---------------
 .../twittertopiccount/TwitterFeedReader.java       |   66 ---
 .../java/io/s4/example/twittertopiccount/User.java |  280 ------------
 .../twittertopiccount/DirectToFilePersister.java   |  118 +++++
 .../s4/example/twittertopiccount/Status.java       |  154 +++++++
 .../s4/example/twittertopiccount/TopNTopicPE.java  |  178 ++++++++
 .../twittertopiccount/TopicCountAndReportPE.java   |   76 +++
 .../twittertopiccount/TopicExtractorPE.java        |  114 +++++
 .../s4/example/twittertopiccount/TopicSeen.java    |   67 +++
 .../twittertopiccount/TwitterFeedListener.java     |  355 +++++++++++++++
 .../twittertopiccount/TwitterFeedReader.java       |   66 +++
 .../apache/s4/example/twittertopiccount/User.java  |  280 ++++++++++++
 .../src/main/resources/s4-core-conf.xml            |   46 +-
 .../s4-example-twittertopiccount-ft-conf.xml       |   20 +-
 .../twittertopiccount/DirectToFilePersister.java   |  118 -----
 .../twittertopiccount/DirectToFilePersister.java   |  118 +++++
 .../src/main/resources/adapter_conf.xml            |    4 +-
 ...s4-example-twittertopiccount-scala-conf.xml.xml |   20 +-
 .../src/main/scala/events/Events.scala             |    2 +-
 .../scala/listener/TwitterStreamListener.scala     |   10 +-
 .../src/main/scala/processor/TopNTopicPE.scala     |    8 +-
 .../scala/processor/TopicCountAndReportPE.scala    |    8 +-
 .../main/scala/processor/TopicExtractorPE.scala    |    8 +-
 .../src/main/scala/util/TwitterStreamClient.scala  |    2 +-
 44 files changed, 1790 insertions(+), 1790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01-scala/src/main/resources/s4-example-speech01-scala-conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/speech01-scala/src/main/resources/s4-example-speech01-scala-conf.xml b/s4-examples/speech01-scala/src/main/resources/s4-example-speech01-scala-conf.xml
index 48c4243..fecb749 100644
--- a/s4-examples/speech01-scala/src/main/resources/s4-example-speech01-scala-conf.xml
+++ b/s4-examples/speech01-scala/src/main/resources/s4-example-speech01-scala-conf.xml
@@ -2,7 +2,7 @@
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
-  <bean id="eventCatcher" class="io.s4.example.speech01.SentenceReceiverPE">
+  <bean id="eventCatcher" class="org.apache.s4.example.speech01.SentenceReceiverPE">
     <property name="keys">
       <list>
         <value>RawSentence *</value>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01-scala/src/main/scala/Events.scala
----------------------------------------------------------------------
diff --git a/s4-examples/speech01-scala/src/main/scala/Events.scala b/s4-examples/speech01-scala/src/main/scala/Events.scala
index 7a06954..84715de 100644
--- a/s4-examples/speech01-scala/src/main/scala/Events.scala
+++ b/s4-examples/speech01-scala/src/main/scala/Events.scala
@@ -14,7 +14,7 @@
  * License. See accompanying LICENSE file. 
  */
 
-package io.s4.example.speech01
+package org.apache.s4.example.speech01
 
 import scala.reflect.BeanProperty
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
----------------------------------------------------------------------
diff --git a/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala b/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
index d966fc0..4f099d1 100644
--- a/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
+++ b/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
@@ -14,9 +14,9 @@
  * License. See accompanying LICENSE file. 
  */
 
-package io.s4.example.speech01
+package org.apache.s4.example.speech01
 
-import io.s4.processor.AbstractPE
+import org.apache.s4.processor.AbstractPE
 
 class SentenceReceiverPE extends AbstractPE {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/io/s4/example/speech01/Highlight.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Highlight.java b/s4-examples/speech01/src/main/java/io/s4/example/speech01/Highlight.java
deleted file mode 100644
index 8a7ce4e..0000000
--- a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Highlight.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package io.s4.example.speech01;
-
-public class Highlight {
-    private long sentenceId;
-    private long time;
-    
-    public long getSentenceId() {
-        return sentenceId;
-    }
-
-    public void setSentenceId(long sentenceId) {
-        this.sentenceId = sentenceId;
-    }
-
-    public long getTime() {
-        return time;
-    }
-
-    public void setTime(long time) {
-        this.time = time;
-    }
-
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("{sentenceId:")
-            .append(sentenceId)
-            .append(",time:")
-            .append(time)
-            .append("}");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/io/s4/example/speech01/Sentence.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Sentence.java b/s4-examples/speech01/src/main/java/io/s4/example/speech01/Sentence.java
deleted file mode 100644
index 376a3c4..0000000
--- a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Sentence.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.speech01;
-
-public class Sentence {
-    private long id;
-    private long speechId;
-    private String text;
-    private long time;
-    private String location;
-    
-    public long getId() {
-        return id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public long getSpeechId() {
-        return speechId;
-    }
-
-    public void setSpeechId(long speechId) {
-        this.speechId = speechId;
-    }
-
-    public String getText() {
-        return text;
-    }
-
-    public void setText(String text) {
-        this.text = text;
-    }
-
-    public long getTime() {
-        return time;
-    }
-
-    public void setTime(long time) {
-        this.time = time;
-    }
-
-    public String getLocation() {
-        return location;
-    }
-
-    public void setLocation(String location) {
-        this.location = location;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("{id:")
-          .append(id)
-          .append(",speechId:")
-          .append(speechId)         
-          .append(",text:")
-          .append(text)       
-          .append(",time:")
-          .append(time) 
-          .append(",location:")
-          .append(location)               
-          .append("}");
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java b/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java
deleted file mode 100644
index b673b49..0000000
--- a/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package io.s4.example.speech01;
-
-import io.s4.processor.AbstractPE;
-
-public class SentenceReceiverPE extends AbstractPE {
-
-    public void processEvent(Sentence sentence) {
-        System.out.printf("Sentence is '%s', location %s\n", sentence.getText(), sentence.getLocation());
-    }
-    
-    @Override
-    public void output() {
-        // not called in this example
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/io/s4/example/speech01/Speech.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Speech.java b/s4-examples/speech01/src/main/java/io/s4/example/speech01/Speech.java
deleted file mode 100644
index 065ba9a..0000000
--- a/s4-examples/speech01/src/main/java/io/s4/example/speech01/Speech.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package io.s4.example.speech01;
-
-public class Speech {
-    private long id;
-    private String location;
-    private String speaker;
-    private long time;
-    
-    public long getId() {
-        return id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public String getLocation() {
-        return location;
-    }
-
-    public void setLocation(String location) {
-        this.location = location;
-    }
-
-    public String getSpeaker() {
-        return speaker;
-    }
-
-    public void setSpeaker(String speaker) {
-        this.speaker = speaker;
-    }
-
-    public long getTime() {
-        return time;
-    }
-
-    public void setTime(long time) {
-        this.time = time;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("{id:")
-            .append(id)
-            .append(",location:")
-            .append(location)
-            .append(",speaker")
-            .append(speaker)
-            .append(",time")
-            .append(time)
-            .append("}");
-        
-        return sb.toString();
-    }
-        
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Highlight.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Highlight.java b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Highlight.java
new file mode 100644
index 0000000..e7a390a
--- /dev/null
+++ b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Highlight.java
@@ -0,0 +1,32 @@
+package org.apache.s4.example.speech01;
+
+public class Highlight {
+    private long sentenceId;
+    private long time;
+    
+    public long getSentenceId() {
+        return sentenceId;
+    }
+
+    public void setSentenceId(long sentenceId) {
+        this.sentenceId = sentenceId;
+    }
+
+    public long getTime() {
+        return time;
+    }
+
+    public void setTime(long time) {
+        this.time = time;
+    }
+
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{sentenceId:")
+            .append(sentenceId)
+            .append(",time:")
+            .append(time)
+            .append("}");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Sentence.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Sentence.java b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Sentence.java
new file mode 100644
index 0000000..41675d6
--- /dev/null
+++ b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Sentence.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.speech01;
+
+public class Sentence {
+    private long id;
+    private long speechId;
+    private String text;
+    private long time;
+    private String location;
+    
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getSpeechId() {
+        return speechId;
+    }
+
+    public void setSpeechId(long speechId) {
+        this.speechId = speechId;
+    }
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+    public long getTime() {
+        return time;
+    }
+
+    public void setTime(long time) {
+        this.time = time;
+    }
+
+    public String getLocation() {
+        return location;
+    }
+
+    public void setLocation(String location) {
+        this.location = location;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("{id:")
+          .append(id)
+          .append(",speechId:")
+          .append(speechId)         
+          .append(",text:")
+          .append(text)       
+          .append(",time:")
+          .append(time) 
+          .append(",location:")
+          .append(location)               
+          .append("}");
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/SentenceReceiverPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/SentenceReceiverPE.java b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/SentenceReceiverPE.java
new file mode 100644
index 0000000..c84f357
--- /dev/null
+++ b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/SentenceReceiverPE.java
@@ -0,0 +1,15 @@
+package org.apache.s4.example.speech01;
+
+import org.apache.s4.processor.AbstractPE;
+
+public class SentenceReceiverPE extends AbstractPE {
+
+    public void processEvent(Sentence sentence) {
+        System.out.printf("Sentence is '%s', location %s\n", sentence.getText(), sentence.getLocation());
+    }
+    
+    @Override
+    public void output() {
+        // not called in this example
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Speech.java
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Speech.java b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Speech.java
new file mode 100644
index 0000000..cae6f53
--- /dev/null
+++ b/s4-examples/speech01/src/main/java/org/apache/s4/example/speech01/Speech.java
@@ -0,0 +1,56 @@
+package org.apache.s4.example.speech01;
+
+public class Speech {
+    private long id;
+    private String location;
+    private String speaker;
+    private long time;
+    
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getLocation() {
+        return location;
+    }
+
+    public void setLocation(String location) {
+        this.location = location;
+    }
+
+    public String getSpeaker() {
+        return speaker;
+    }
+
+    public void setSpeaker(String speaker) {
+        this.speaker = speaker;
+    }
+
+    public long getTime() {
+        return time;
+    }
+
+    public void setTime(long time) {
+        this.time = time;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("{id:")
+            .append(id)
+            .append(",location:")
+            .append(location)
+            .append(",speaker")
+            .append(speaker)
+            .append(",time")
+            .append(time)
+            .append("}");
+        
+        return sb.toString();
+    }
+        
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech01/src/main/resources/s4-example-speech01-conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/speech01/src/main/resources/s4-example-speech01-conf.xml b/s4-examples/speech01/src/main/resources/s4-example-speech01-conf.xml
index 48c4243..fecb749 100644
--- a/s4-examples/speech01/src/main/resources/s4-example-speech01-conf.xml
+++ b/s4-examples/speech01/src/main/resources/s4-example-speech01-conf.xml
@@ -2,7 +2,7 @@
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
-  <bean id="eventCatcher" class="io.s4.example.speech01.SentenceReceiverPE">
+  <bean id="eventCatcher" class="org.apache.s4.example.speech01.SentenceReceiverPE">
     <property name="keys">
       <list>
         <value>RawSentence *</value>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/speech02/src/main/resources/s4-example-speech02-conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/speech02/src/main/resources/s4-example-speech02-conf.xml b/s4-examples/speech02/src/main/resources/s4-example-speech02-conf.xml
index 1417740..825b792 100644
--- a/s4-examples/speech02/src/main/resources/s4-example-speech02-conf.xml
+++ b/s4-examples/speech02/src/main/resources/s4-example-speech02-conf.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
-  <bean id="eventCatcher" class="io.s4.example.speech01.SentenceReceiverPE">
+  <bean id="eventCatcher" class="org.apache.s4.example.speech01.SentenceReceiverPE">
     <property name="keys">
       <list>
         <value>SentenceJoined *</value>
@@ -9,7 +9,7 @@
     </property>
   </bean>
 
-  <bean id="rerouteSentencePE" class="io.s4.processor.ReroutePE">
+  <bean id="rerouteSentencePE" class="org.apache.s4.processor.ReroutePE">
     <property name="id" value="rerouteSentencePE"/>
     <property name="dispatcher" ref="forkdispatcher"/>
     <property name="keys">
@@ -20,7 +20,7 @@
     <property name="outputStreamName" value="Sentence"/>
   </bean>
 
-  <bean id="rerouteSpeechPE" class="io.s4.processor.ReroutePE">
+  <bean id="rerouteSpeechPE" class="org.apache.s4.processor.ReroutePE">
     <property name="id" value="rerouteSpeechPE"/>
     <property name="dispatcher" ref="forkdispatcher"/>
     <property name="keys">
@@ -31,7 +31,7 @@
     <property name="outputStreamName" value="Speech"/>
   </bean>
 
-  <bean id="sentenceJoinPE" class="io.s4.processor.JoinPE">
+  <bean id="sentenceJoinPE" class="org.apache.s4.processor.JoinPE">
     <property name="id" value="sentenceJoinPE"/>
     <property name="keys">
       <list>
@@ -46,12 +46,12 @@
       </list>
     </property>
     <property name="outputStreamName" value="SentenceJoined"/>
-    <property name="outputClassName" value="io.s4.example.speech01.Sentence"/>
+    <property name="outputClassName" value="org.apache.s4.example.speech01.Sentence"/>
     <property name="dispatcher" ref="forkdispatcher"/>
     <property name="ttl" value="600"/> <!-- join related events that arrive no more than 10 minutes apart -->
   </bean>
 
-  <bean id="sentenceSpeechIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="sentenceSpeechIdPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>Sentence</value>
@@ -67,7 +67,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="speechIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="speechIdPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>Speech</value>
@@ -82,7 +82,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="dispatcher" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="dispatcher" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="sentenceSpeechIdPartitioner"/>
@@ -94,7 +94,7 @@
   </bean>
 
   <!-- dispatcher to S4 and client adapter -->
-  <bean id="forkdispatcher" class="io.s4.dispatcher.MultiDispatcher">
+  <bean id="forkdispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
     <property name="dispatchers">
       <list>
 
@@ -103,7 +103,7 @@
 
 
         <!-- send some streams to client adapters -->
-        <bean id="selectiveDispatchToAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+        <bean id="selectiveDispatchToAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
           <property name="dispatcher" ref="dispatcherToClientAdapters"/>
           <property name="streams">
             <list>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/testinput/src/main/resources/speeches.txt
----------------------------------------------------------------------
diff --git a/s4-examples/testinput/src/main/resources/speeches.txt b/s4-examples/testinput/src/main/resources/speeches.txt
index d1bc1bc..cbac7e8 100644
--- a/s4-examples/testinput/src/main/resources/speeches.txt
+++ b/s4-examples/testinput/src/main/resources/speeches.txt
@@ -1,4 +1,4 @@
-{"io.s4.example.speech01.Speech":{"classIndex":0,"streamName":"RawSpeech"},"io.s4.example.speech01.Sentence":{"classIndex":1,"streamName":"RawSentence"},"io.s4.example.speech01.Highlight":{"classIndex":2,"streamName":"RawHighlight"}}
+{"org.apache.s4.example.speech01.Speech":{"classIndex":0,"streamName":"RawSpeech"},"org.apache.s4.example.speech01.Sentence":{"classIndex":1,"streamName":"RawSentence"},"org.apache.s4.example.speech01.Highlight":{"classIndex":2,"streamName":"RawHighlight"}}
 {"_index":0,"id":12000000,"location":"gettysburg, pa, us","speaker":"abraham lincoln","time":1242799200000}
 {"_index":1,"id":12000001,"speechId":12000000,"text":"Four score and seven years ago our fathers brought forth on this continent a new nation, conceived in liberty and dedicated to the proposition that all men are created equal.","time":1242799205000}
 {"_index":2,"sentenceId":12000001,"time":1242799215000}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
deleted file mode 100644
index 02dc809..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import io.s4.persist.Persister;
-
-public class DirectToFilePersister implements Persister {
-    private String outputFilename;
-    private int persistCount;
-
-    public void setOutputFilename(String outputFilename) {
-        this.outputFilename = outputFilename;
-    }
-
-    @Override
-    public int cleanOutGarbage() throws InterruptedException {
-        return 0;
-    }
-
-    @Override
-    public Object get(String arg0) throws InterruptedException {
-        return null;
-    }
-
-    @Override
-    public Map<String, Object> getBulk(String[] arg0)
-            throws InterruptedException {
-        return new HashMap<String, Object>();
-    }
-
-    @Override
-    public Map<String, Object> getBulkObjects(String[] arg0)
-            throws InterruptedException {
-        return new HashMap<String, Object>();
-    }
-
-    @Override
-    public int getCacheEntryCount() {
-        return 1;
-    }
-
-    @Override
-    public Object getObject(String arg0) throws InterruptedException {
-        return null;
-    }
-
-    @Override
-    public int getPersistCount() {
-        return persistCount;
-    }
-
-    @Override
-    public int getQueueSize() {
-        return 0;
-    }
-
-    @Override
-    public Set<String> keySet() {
-        return new HashSet<String>();
-    }
-
-    @Override
-    public void remove(String arg0) throws InterruptedException {
-
-    }
-
-    @Override
-    public void set(String key, Object value, int persistTime)
-            throws InterruptedException {
-
-        FileWriter fw = null;
-        try {
-            fw = new FileWriter(outputFilename);
-            fw.write(String.valueOf(value));
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            Logger.getLogger("s4").error(e);
-        } finally {
-            if (fw != null) {
-                try {
-                    fw.close();
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    @Override
-    public void setAsynch(String key, Object value, int persistTime) {
-        try {
-            set(key, value, persistTime);
-        } catch (InterruptedException ie) {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/Status.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/Status.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/Status.java
deleted file mode 100644
index c9a1077..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/Status.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-public class Status {
-    private long id;
-    private long inReplyToStatusId;
-    private String text;
-    private boolean truncated;
-    private String source;
-    private String inReplyToScreenName;
-    private boolean favorited;
-    private User user;
-    private long inReplyToUserId;
-    private String createdAt;
-
-    public long getId() {
-        return id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public long getInReplyToStatusId() {
-        return inReplyToStatusId;
-    }
-
-    public void setInReplyToStatusId(long inReplyToStatusId) {
-        this.inReplyToStatusId = inReplyToStatusId;
-    }
-
-    public String getText() {
-        return text;
-    }
-
-    public void setText(String text) {
-        this.text = text;
-    }
-
-    public boolean isTruncated() {
-        return truncated;
-    }
-
-    public void setTruncated(boolean truncated) {
-        this.truncated = truncated;
-    }
-
-    public String getSource() {
-        return source;
-    }
-
-    public void setSource(String source) {
-        this.source = source;
-    }
-
-    public String getInReplyToScreenName() {
-        return inReplyToScreenName;
-    }
-
-    public void setInReplyToScreenName(String inReplyToScreenName) {
-        this.inReplyToScreenName = inReplyToScreenName;
-    }
-
-    public boolean isFavorited() {
-        return favorited;
-    }
-
-    public void setFavorited(boolean favorited) {
-        this.favorited = favorited;
-    }
-
-    public User getUser() {
-        return user;
-    }
-
-    public void setUser(User user) {
-        this.user = user;
-    }
-
-    public long getInReplyToUserId() {
-        return inReplyToUserId;
-    }
-
-    public void setInReplyToUserId(long inReplyToUserId) {
-        this.inReplyToUserId = inReplyToUserId;
-    }
-
-    public String getCreatedAt() {
-        return createdAt;
-    }
-
-    public void setCreatedAt(String createdAt) {
-        this.createdAt = createdAt;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("{")
-          .append("id:")
-          .append(id)
-          .append(",")
-          .append("inReplyToStatusId:")
-          .append(inReplyToStatusId)
-          .append(",")
-          .append("text:")
-          .append(text)
-          .append(",")
-          .append("truncated:")
-          .append(truncated)
-          .append(",")
-          .append("source:")
-          .append(source)
-          .append(",")
-          .append("inReplyToScreenName:")
-          .append(inReplyToScreenName)
-          .append(",")
-          .append("favorited:")
-          .append(favorited)
-          .append(",")
-          .append("user:")
-          .append(user)
-          .append(",")
-          .append("inReplyToUserId:")
-          .append(inReplyToUserId)
-          .append(",")
-          .append("createdAt:")
-          .append(createdAt)
-          .append("}");
-
-        return sb.toString();
-    }
-
-    public Object clone() {
-        try {
-            return super.clone();
-        } catch (CloneNotSupportedException cnse) {
-            throw new RuntimeException(cnse);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopNTopicPE.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopNTopicPE.java
deleted file mode 100644
index 25b9a13..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopNTopicPE.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import io.s4.persist.Persister;
-import io.s4.processor.AbstractPE;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONObject;
-
-import com.google.gson.Gson;
-
-public class TopNTopicPE extends AbstractPE {
-    private String id;
-    private transient Persister persister;
-    private int entryCount = 10;
-    private Map<String, Integer> topicMap = new ConcurrentHashMap<String, Integer>();
-    private int persistTime;
-    private String persistKey = "myapp:topNTopics";
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public Persister getPersister() {
-        return persister;
-    }
-
-    public void setPersister(Persister persister) {
-        this.persister = persister;
-    }
-
-    public int getEntryCount() {
-        return entryCount;
-    }
-
-    public void setEntryCount(int entryCount) {
-        this.entryCount = entryCount;
-    }
-
-    public int getPersistTime() {
-        return persistTime;
-    }
-
-    public void setPersistTime(int persistTime) {
-        this.persistTime = persistTime;
-    }
-
-    public String getPersistKey() {
-        return persistKey;
-    }
-
-    public void setPersistKey(String persistKey) {
-        this.persistKey = persistKey;
-    }
-
-    public void processEvent(TopicSeen topicSeen) {
-        topicMap.put(topicSeen.getTopic(), topicSeen.getCount());
-    }
-
-    public ArrayList<TopNEntry> getTopTopics() {
-        if (entryCount < 1) 
-            return null;
-
-        ArrayList<TopNEntry> sortedList = new ArrayList<TopNEntry>();
-
-        for (String key : topicMap.keySet()) {
-            sortedList.add(new TopNEntry(key, topicMap.get(key)));
-        }
-
-        Collections.sort(sortedList);
-
-        // truncate: Yuck!!
-        // unfortunately, Kryo cannot deserialize RandomAccessSubList
-        // if we use ArrayList.subList(...)
-        while (sortedList.size() > entryCount)
-            sortedList.remove(sortedList.size() - 1);
-
-        return sortedList;
-    }
-
-    @Override
-    public void output() {
-        List<TopNEntry> sortedList = new ArrayList<TopNEntry>();
-
-        for (String key : topicMap.keySet()) {
-            sortedList.add(new TopNEntry(key, topicMap.get(key)));
-        }
-
-        Collections.sort(sortedList);
-
-        try {
-            JSONObject message = new JSONObject();
-            JSONArray jsonTopN = new JSONArray();
-
-            for (int i = 0; i < entryCount; i++) {
-                if (i == sortedList.size()) {
-                    break;
-                }
-                TopNEntry tne = sortedList.get(i);
-                JSONObject jsonEntry = new JSONObject();
-                jsonEntry.put("topic", tne.getTopic());
-                jsonEntry.put("count", tne.getCount());
-                jsonTopN.put(jsonEntry);
-            }
-            message.put("topN", jsonTopN);
-            persister.set(persistKey, message.toString()+"\n", persistTime);
-        } catch (Exception e) {
-            Logger.getLogger("s4").error(e);
-        }
-    }
-
-    @Override
-    public String getId() {
-        return this.id;
-    }
-
-    public static class TopNEntry implements Comparable<TopNEntry> {
-        public TopNEntry(String topic, int count) {
-            this.topic = topic;
-            this.count = count;
-        }
-
-        public TopNEntry() {}
-
-        String topic = null;
-        int count = 0;
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
-
-        public int getCount() {
-            return count;
-        }
-
-        public void setCount(int count) {
-            this.count = count;
-        }
-
-        public int compareTo(TopNEntry topNEntry) {
-            if (topNEntry.getCount() < this.count) {
-                return -1;
-            } else if (topNEntry.getCount() > this.count) {
-                return 1;
-            }
-            return 0;
-        }
-
-        public String toString() {
-            return "topic:" + topic + " count:" + count;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicCountAndReportPE.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicCountAndReportPE.java
deleted file mode 100644
index fa552eb..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicCountAndReportPE.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.processor.AbstractPE;
-
-public class TopicCountAndReportPE extends AbstractPE {
-    private String id;
-    private transient EventDispatcher dispatcher;
-    private String outputStreamName;
-    private int threshold;
-    private int count;
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public EventDispatcher getDispatcher() {
-        return dispatcher;
-    }
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public String getOutputStreamName() {
-        return outputStreamName;
-    }
-
-    public void setOutputStreamName(String outputStreamName) {
-        this.outputStreamName = outputStreamName;
-    }
-
-    public int getThreshold() {
-        return threshold;
-    }
-
-    public void setThreshold(int threshold) {
-        this.threshold = threshold;
-    }
-
-    public void processEvent(TopicSeen topicSeen) {
-        count += topicSeen.getCount();
-    }
-
-    @Override
-    public void output() {
-        if (count < threshold) {
-            return;
-        }
-        TopicSeen topicSeen = new TopicSeen((String) this.getKeyValue().get(0),
-                                            count);
-        topicSeen.setReportKey("1");
-        dispatcher.dispatchEvent(outputStreamName, topicSeen);
-    }
-
-    @Override
-    public String getId() {
-        return this.id;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicExtractorPE.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicExtractorPE.java
deleted file mode 100644
index dfd7242..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicExtractorPE.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import io.s4.dispatcher.Dispatcher;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.processor.AbstractPE;
-
-public class TopicExtractorPE extends AbstractPE {
-    private String id;
-    private transient EventDispatcher dispatcher;
-    private String outputStreamName;
-
-    public EventDispatcher getDispatcher() {
-        return dispatcher;
-    }
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public String getOutputStreamName() {
-        return outputStreamName;
-    }
-
-    public void setOutputStreamName(String outputStreamName) {
-        this.outputStreamName = outputStreamName;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public void processEvent(Status status) {
-        String text = status.getText();
-        if (text == null) {
-            return;
-        }
-
-        int textLength = text.length();
-        int index = 0;
-        int prevIndex = 0;
-        while ((index = text.indexOf('#', prevIndex)) != -1) {
-            prevIndex = index + 1;
-            if (prevIndex == textLength) { // if hash is the last character
-                break; // get out
-            }
-            StringBuffer sb = new StringBuffer();
-            for (int i = index + 1; i < textLength; i++) {
-                char ch = text.charAt(i);
-                if (!Character.isLetterOrDigit(ch)) {
-                    break;
-                }
-                sb.append(ch);
-            }
-
-            if (sb.length() == 0) {
-                continue;
-            }
-
-            TopicSeen topicSeen = new TopicSeen(sb.toString().toLowerCase(), 1);
-            dispatcher.dispatchEvent(outputStreamName, topicSeen);
-        }
-    }
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public String getId() {
-        return this.id;
-    }
-
-    static class DummyDispatcher extends Dispatcher {
-        public void dispatchEvent(String streamName, Object event) {
-            System.out.println(event);
-        }
-    }
-
-    public static void main(String args[]) {
-        TopicExtractorPE te = new TopicExtractorPE();
-        te.setDispatcher(new DummyDispatcher());
-        te.setOutputStreamName("test");
-
-        Status status = new Status();
-        status.setText("Hey this is a test");
-        te.processEvent(status);
-
-        status.setText("This is an edge test #");
-        te.processEvent(status);
-
-        status.setText("#GLOB this is a test");
-        te.processEvent(status);
-
-        status.setText("Hey there #FLOB, this is a test #GLOB");
-        te.processEvent(status);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicSeen.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicSeen.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicSeen.java
deleted file mode 100644
index ef5fc34..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TopicSeen.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-public class TopicSeen {
-    private String topic;
-    private int count;
-    private String reportKey;
-
-    public TopicSeen() {
-
-    }
-
-    public TopicSeen(String topic, int count) {
-        this.topic = topic;
-        this.count = count;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getCount() {
-        return count;
-    }
-
-    public void setCount(int count) {
-        this.count = count;
-    }
-
-    public String getReportKey() {
-        return reportKey;
-    }
-
-    public void setReportKey(String reportKey) {
-        this.reportKey = reportKey;
-    }
-
-    public String toString() {
-        return "{topic:" + topic + "}";
-    }
-
-    public Object clone() {
-        try {
-            return super.clone();
-        } catch (CloneNotSupportedException cnse) {
-            throw new RuntimeException(cnse);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedListener.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedListener.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedListener.java
deleted file mode 100644
index 7f283d8..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedListener.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-import org.json.JSONObject;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.httpclient.util.EncodingUtil;
-
-import io.s4.collector.EventWrapper;
-import io.s4.listener.EventHandler;
-import io.s4.listener.EventProducer;
-
-public class TwitterFeedListener implements EventProducer, Runnable {
-    private String userid;
-    private String password;
-    private String urlString;
-    private long maxBackoffTime = 30 * 1000; // 5 seconds
-    private long messageCount = 0;
-    private long blankCount = 0;
-    private String streamName;
-
-    protected LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
-    private Set<io.s4.listener.EventHandler> handlers = new HashSet<io.s4.listener.EventHandler>();
-
-    public void setUserid(String userid) {
-        this.userid = userid;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public void setUrlString(String urlString) {
-        this.urlString = urlString;
-    }
-
-    public void setMaxBackoffTime(long maxBackoffTime) {
-        this.maxBackoffTime = maxBackoffTime;
-    }
-
-    public void setStreamName(String streamName) {
-        this.streamName = streamName;
-    }
-
-    public void init() {
-        for (int i = 0; i < 12; i++) {
-            Dequeuer dequeuer = new Dequeuer(i);
-            Thread t = new Thread(dequeuer);
-            t.start();
-        }
-        (new Thread(this)).start();
-    }
-
-    public void run() {
-        long backoffTime = 1000;
-        while (!Thread.interrupted()) {
-            try {
-                connectAndRead();
-            } catch (Exception e) {
-                Logger.getLogger("s4").error("Exception reading feed", e);
-                try {
-                    Thread.sleep(backoffTime);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
-                backoffTime = backoffTime * 2;
-                if (backoffTime > maxBackoffTime) {
-                    backoffTime = maxBackoffTime;
-                }
-            }
-        }
-    }
-
-    public void connectAndRead() throws Exception {
-        URL url = new URL(urlString);
-
-        URLConnection connection = url.openConnection();
-        String userPassword = userid + ":" + password;
-        String encoded = EncodingUtil.getAsciiString(Base64.encodeBase64(EncodingUtil.getAsciiBytes(userPassword)));
-        connection.setRequestProperty("Authorization", "Basic " + encoded);
-        connection.connect();
-
-        InputStream is = connection.getInputStream();
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        String inputLine = null;
-        while ((inputLine = br.readLine()) != null) {
-            if (inputLine.trim().length() == 0) {
-                blankCount++;
-                continue;
-            }
-            messageCount++;
-            messageQueue.add(inputLine);
-        }
-    }
-
-    class Dequeuer implements Runnable {
-        private int id;
-
-        public Dequeuer(int id) {
-            this.id = id;
-        }
-
-        public void run() {
-            while (!Thread.interrupted()) {
-                try {
-                    String message = messageQueue.take();
-                    JSONObject jsonObject = new JSONObject(message);
-
-                    // ignore delete records for now
-                    if (jsonObject.has("delete")) {
-                        continue;
-                    }
-
-                    Status status = getStatus(jsonObject);
-
-                    EventWrapper ew = new EventWrapper(streamName, status, null);
-                    for (io.s4.listener.EventHandler handler : handlers) {
-                        try {
-                            handler.processEvent(ew);
-                        } catch (Exception e) {
-                            Logger.getLogger("s4")
-                                  .error("Exception in raw event handler", e);
-                        }
-                    }
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                } catch (Exception e) {
-                    Logger.getLogger("s4")
-                          .error("Exception processing message", e);
-                }
-            }
-        }
-
-        public Status getStatus(JSONObject jsonObject) {
-            try {
-                if (jsonObject == null || jsonObject.equals(JSONObject.NULL)) {
-                    return null;
-                }
-
-                Status status = new Status();
-
-                status.setUser(getUser((JSONObject) jsonObject.opt("user")));
-
-                Object value = jsonObject.opt("id");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setId(((Number) value).longValue());
-                }
-
-                value = jsonObject.opt("in_reply_to_status_id");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setInReplyToStatusId(((Number) value).longValue());
-                }
-
-                value = jsonObject.opt("text");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setText((String) value);
-                }
-
-                value = jsonObject.opt("truncated");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setTruncated((Boolean) value);
-                }
-
-                value = jsonObject.opt("source");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setSource((String) value);
-                }
-
-                value = jsonObject.opt("in_reply_to_screen_name");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setInReplyToScreenName((String) value);
-                }
-
-                value = jsonObject.opt("favorited");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setFavorited((Boolean) value);
-                }
-
-                value = jsonObject.opt("in_reply_to_user_id");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setInReplyToUserId(((Number) value).longValue());
-                }
-
-                value = jsonObject.opt("created_at");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    status.setCreatedAt((String) value);
-                }
-
-                return status;
-            } catch (Exception e) {
-                Logger.getLogger("s4").error(e);
-            }
-
-            return null;
-        }
-
-        public User getUser(JSONObject jsonObject) {
-            try {
-                if (jsonObject == null || jsonObject.equals(JSONObject.NULL)) {
-                    return null;
-                }
-
-                User user = new User();
-
-                Object value = jsonObject.opt("id");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setId(((Number) value).longValue());
-                }
-
-                value = jsonObject.opt("screen_name");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setScreenName((String) value);
-                }
-
-                value = jsonObject.opt("name");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setName((String) value);
-                }
-
-                value = jsonObject.opt("url");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setUrl((String) value);
-                }
-
-                value = jsonObject.opt("followers_count");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setFollowersCount(((Number) value).intValue());
-                }
-
-                value = jsonObject.opt("lang");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setLang((String) value);
-                }
-
-                value = jsonObject.opt("verified");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setVerified((Boolean) value);
-                }
-
-                value = jsonObject.opt("profile_image_url");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setProfileImageUrl((String) value);
-                }
-
-                value = jsonObject.opt("friends_count");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setFriendsCount(((Number) value).intValue());
-                }
-
-                value = jsonObject.opt("description");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setDescription((String) value);
-                }
-
-                value = jsonObject.opt("favourites_Count");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setFavouritesCount(((Number) value).intValue());
-                }
-
-                value = jsonObject.opt("geo_enabled");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setGeoEnabled((Boolean) value);
-                }
-
-                value = jsonObject.opt("listed_count");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setListedCount(((Number) value).intValue());
-                }
-
-                value = jsonObject.opt("profile_background_image_url");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setProfileBackgroundImageUrl((String) value);
-                }
-
-                value = jsonObject.opt("protected_user");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setProtectedUser((Boolean) value);
-                }
-
-                value = jsonObject.opt("location");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setLocation((String) value);
-                }
-
-                value = jsonObject.opt("statuses_count");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setStatusesCount(((Number) value).longValue());
-                }
-
-                value = jsonObject.opt("time_zone");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setTimeZone((String) value);
-                }
-
-                value = jsonObject.opt("contributors_enabled");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setContributorsEnabled((Boolean) value);
-                }
-
-                value = jsonObject.opt("utc_offset");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setUtcOffset(((Number) value).intValue());
-                }
-
-                value = jsonObject.opt("created_at");
-                if (value != null && !value.equals(JSONObject.NULL)) {
-                    user.setCreatedAt((String) value);
-                }
-
-                return user;
-            } catch (Exception e) {
-                Logger.getLogger("s4").error(e);
-            }
-
-            return null;
-        }
-    }
-
-    @Override
-    public void addHandler(EventHandler handler) {
-        handlers.add(handler);
-
-    }
-
-    @Override
-    public boolean removeHandler(EventHandler handler) {
-        return handlers.remove(handler);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedReader.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedReader.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedReader.java
deleted file mode 100644
index 27b8f1d..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/TwitterFeedReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package io.s4.example.twittertopiccount;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FilenameFilter;
-import java.io.InputStreamReader;
-import java.util.zip.GZIPInputStream;
-
-/**
- * 
- * Adapter for injecting twitter data from a twitter dump file, rather than from a live http stream.
- * 
- * The twitter dumps must be located in a directory in the file system
- *
- */
-public class TwitterFeedReader extends TwitterFeedListener {
-
-    String frequencyBySecond;
-    String twitterDumpsDir;
-    String twitterDumpsNamePattern = "\\A.+\\.gz\\z";
-
-    @Override
-    public void connectAndRead() throws Exception {
-        System.out.println("Reading files from dir " +  twitterDumpsDir + " matching: " + twitterDumpsNamePattern);
-        File[] dumps = new File(twitterDumpsDir)
-                .listFiles(new FilenameFilter() {
-                    @Override
-                    public boolean accept(File dir, String name) {
-                        return name.matches(twitterDumpsNamePattern);
-                    }
-                });
-        for (File dump : dumps) {
-            System.out.println("Reading file : " + dump.getAbsolutePath());
-            GZIPInputStream gzipIs = new GZIPInputStream(new FileInputStream(
-                    dump));
-            InputStreamReader isr = new InputStreamReader(gzipIs);
-            BufferedReader br = new BufferedReader(isr);
-            String line = null;
-            while ((line = br.readLine()) != null) {
-                // only consider lines with twitter json-encoded data
-                if (line.startsWith("{")) {
-//                    System.out.println("Adding line : " + line);
-                    messageQueue.add(line);
-                    Thread.sleep((1000 / Integer.valueOf(frequencyBySecond)));
-                }
-            }
-            br.close();
-        }
-        System.out.println("OK, read all dump files. Exiting normally.");
-        System.exit(0);
-    }
-
-    public void setFrequencyBySecond(String frequencyBySecond) {
-        this.frequencyBySecond = frequencyBySecond;
-    }
-
-    public void setTwitterDumpsDir(String twitterDumpsDir) {
-        this.twitterDumpsDir = twitterDumpsDir;
-    }
-
-    public void setTwitterDumpsNamePattern(String twitterDumpsNamePattern) {
-        this.twitterDumpsNamePattern = twitterDumpsNamePattern;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/User.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/User.java b/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/User.java
deleted file mode 100644
index ae6832a..0000000
--- a/s4-examples/twittertopiccount-ft/src/main/java/io/s4/example/twittertopiccount/User.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-public class User {
-    private long id;
-    private String screenName;
-    private String name;
-    private String url;
-    private int followersCount;
-    private String lang;
-    private boolean verified;
-    private String profileImageUrl;
-    private int friendsCount;
-    private String description;
-    private int favouritesCount;
-    private boolean geoEnabled;
-    private int listedCount;
-    private String profileBackgroundImageUrl;
-    private boolean protectedUser;
-    private String location;
-    private long statusesCount;
-    private String timeZone;
-    private boolean contributorsEnabled;
-    private int utcOffset;
-    private String createdAt;
-
-    public long getId() {
-        return id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public String getScreenName() {
-        return screenName;
-    }
-
-    public void setScreenName(String screenName) {
-        this.screenName = screenName;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getUrl() {
-        return url;
-    }
-
-    public void setUrl(String url) {
-        this.url = url;
-    }
-
-    public int getFollowersCount() {
-        return followersCount;
-    }
-
-    public void setFollowersCount(int followersCount) {
-        this.followersCount = followersCount;
-    }
-
-    public String getLang() {
-        return lang;
-    }
-
-    public void setLang(String lang) {
-        this.lang = lang;
-    }
-
-    public boolean isVerified() {
-        return verified;
-    }
-
-    public void setVerified(boolean verified) {
-        this.verified = verified;
-    }
-
-    public String getProfileImageUrl() {
-        return profileImageUrl;
-    }
-
-    public void setProfileImageUrl(String profileImageUrl) {
-        this.profileImageUrl = profileImageUrl;
-    }
-
-    public int getFriendsCount() {
-        return friendsCount;
-    }
-
-    public void setFriendsCount(int friendsCount) {
-        this.friendsCount = friendsCount;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public int getFavouritesCount() {
-        return favouritesCount;
-    }
-
-    public void setFavouritesCount(int favouritesCount) {
-        this.favouritesCount = favouritesCount;
-    }
-
-    public boolean isGeoEnabled() {
-        return geoEnabled;
-    }
-
-    public void setGeoEnabled(boolean geoEnabled) {
-        this.geoEnabled = geoEnabled;
-    }
-
-    public int getListedCount() {
-        return listedCount;
-    }
-
-    public void setListedCount(int listedCount) {
-        this.listedCount = listedCount;
-    }
-
-    public String getProfileBackgroundImageUrl() {
-        return profileBackgroundImageUrl;
-    }
-
-    public void setProfileBackgroundImageUrl(String profileBackgroundImageUrl) {
-        this.profileBackgroundImageUrl = profileBackgroundImageUrl;
-    }
-
-    public boolean isProtectedUser() {
-        return protectedUser;
-    }
-
-    public void setProtectedUser(boolean protectedUser) {
-        this.protectedUser = protectedUser;
-    }
-
-    public String getLocation() {
-        return location;
-    }
-
-    public void setLocation(String location) {
-        this.location = location;
-    }
-
-    public long getStatusesCount() {
-        return statusesCount;
-    }
-
-    public void setStatusesCount(long statusesCount) {
-        this.statusesCount = statusesCount;
-    }
-
-    public String getTimeZone() {
-        return timeZone;
-    }
-
-    public void setTimeZone(String timeZone) {
-        this.timeZone = timeZone;
-    }
-
-    public boolean isContributorsEnabled() {
-        return contributorsEnabled;
-    }
-
-    public void setContributorsEnabled(boolean contributorsEnabled) {
-        this.contributorsEnabled = contributorsEnabled;
-    }
-
-    public int getUtcOffset() {
-        return utcOffset;
-    }
-
-    public void setUtcOffset(int utcOffset) {
-        this.utcOffset = utcOffset;
-    }
-
-    public String getCreatedAt() {
-        return createdAt;
-    }
-
-    public void setCreatedAt(String createdAt) {
-        this.createdAt = createdAt;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("{")
-          .append("id:")
-          .append(id)
-          .append(",")
-          .append("screenName:")
-          .append(screenName)
-          .append(",")
-          .append("name:")
-          .append(name)
-          .append(",")
-          .append("url:")
-          .append(url)
-          .append(",")
-          .append("followersCount:")
-          .append(followersCount)
-          .append(",")
-          .append("lang:")
-          .append(lang)
-          .append(",")
-          .append("verified:")
-          .append(verified)
-          .append(",")
-          .append("profileImageUrl:")
-          .append(profileImageUrl)
-          .append(",")
-          .append("friendsCount:")
-          .append(friendsCount)
-          .append(",")
-          .append("description:")
-          .append(description)
-          .append(",")
-          .append("favouritesCount:")
-          .append(favouritesCount)
-          .append(",")
-          .append("geoEnabled:")
-          .append(geoEnabled)
-          .append(",")
-          .append("listedCount:")
-          .append(listedCount)
-          .append(",")
-          .append("profileBackgroundImageUrl:")
-          .append(profileBackgroundImageUrl)
-          .append(",")
-          .append("protectedUser:")
-          .append(protectedUser)
-          .append(",")
-          .append("location:")
-          .append(location)
-          .append(",")
-          .append("statusesCount:")
-          .append(statusesCount)
-          .append(",")
-          .append("timeZone:")
-          .append(timeZone)
-          .append(",")
-          .append("contributorsEnabled:")
-          .append(contributorsEnabled)
-          .append(",")
-          .append("utcOffset:")
-          .append(utcOffset)
-          .append(",")
-          .append("createdAt:")
-          .append(createdAt)
-          .append("}");
-
-        return sb.toString();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
new file mode 100644
index 0000000..262b451
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.s4.persist.Persister;
+
+public class DirectToFilePersister implements Persister {
+    private String outputFilename;
+    private int persistCount;
+
+    public void setOutputFilename(String outputFilename) {
+        this.outputFilename = outputFilename;
+    }
+
+    @Override
+    public int cleanOutGarbage() throws InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public Object get(String arg0) throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> getBulk(String[] arg0)
+            throws InterruptedException {
+        return new HashMap<String, Object>();
+    }
+
+    @Override
+    public Map<String, Object> getBulkObjects(String[] arg0)
+            throws InterruptedException {
+        return new HashMap<String, Object>();
+    }
+
+    @Override
+    public int getCacheEntryCount() {
+        return 1;
+    }
+
+    @Override
+    public Object getObject(String arg0) throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public int getPersistCount() {
+        return persistCount;
+    }
+
+    @Override
+    public int getQueueSize() {
+        return 0;
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return new HashSet<String>();
+    }
+
+    @Override
+    public void remove(String arg0) throws InterruptedException {
+
+    }
+
+    @Override
+    public void set(String key, Object value, int persistTime)
+            throws InterruptedException {
+
+        FileWriter fw = null;
+        try {
+            fw = new FileWriter(outputFilename);
+            fw.write(String.valueOf(value));
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            Logger.getLogger("s4").error(e);
+        } finally {
+            if (fw != null) {
+                try {
+                    fw.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    @Override
+    public void setAsynch(String key, Object value, int persistTime) {
+        try {
+            set(key, value, persistTime);
+        } catch (InterruptedException ie) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/Status.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/Status.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/Status.java
new file mode 100644
index 0000000..89c697a
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/Status.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+public class Status {
+    private long id;
+    private long inReplyToStatusId;
+    private String text;
+    private boolean truncated;
+    private String source;
+    private String inReplyToScreenName;
+    private boolean favorited;
+    private User user;
+    private long inReplyToUserId;
+    private String createdAt;
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getInReplyToStatusId() {
+        return inReplyToStatusId;
+    }
+
+    public void setInReplyToStatusId(long inReplyToStatusId) {
+        this.inReplyToStatusId = inReplyToStatusId;
+    }
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+    public boolean isTruncated() {
+        return truncated;
+    }
+
+    public void setTruncated(boolean truncated) {
+        this.truncated = truncated;
+    }
+
+    public String getSource() {
+        return source;
+    }
+
+    public void setSource(String source) {
+        this.source = source;
+    }
+
+    public String getInReplyToScreenName() {
+        return inReplyToScreenName;
+    }
+
+    public void setInReplyToScreenName(String inReplyToScreenName) {
+        this.inReplyToScreenName = inReplyToScreenName;
+    }
+
+    public boolean isFavorited() {
+        return favorited;
+    }
+
+    public void setFavorited(boolean favorited) {
+        this.favorited = favorited;
+    }
+
+    public User getUser() {
+        return user;
+    }
+
+    public void setUser(User user) {
+        this.user = user;
+    }
+
+    public long getInReplyToUserId() {
+        return inReplyToUserId;
+    }
+
+    public void setInReplyToUserId(long inReplyToUserId) {
+        this.inReplyToUserId = inReplyToUserId;
+    }
+
+    public String getCreatedAt() {
+        return createdAt;
+    }
+
+    public void setCreatedAt(String createdAt) {
+        this.createdAt = createdAt;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("{")
+          .append("id:")
+          .append(id)
+          .append(",")
+          .append("inReplyToStatusId:")
+          .append(inReplyToStatusId)
+          .append(",")
+          .append("text:")
+          .append(text)
+          .append(",")
+          .append("truncated:")
+          .append(truncated)
+          .append(",")
+          .append("source:")
+          .append(source)
+          .append(",")
+          .append("inReplyToScreenName:")
+          .append(inReplyToScreenName)
+          .append(",")
+          .append("favorited:")
+          .append(favorited)
+          .append(",")
+          .append("user:")
+          .append(user)
+          .append(",")
+          .append("inReplyToUserId:")
+          .append(inReplyToUserId)
+          .append(",")
+          .append("createdAt:")
+          .append(createdAt)
+          .append("}");
+
+        return sb.toString();
+    }
+
+    public Object clone() {
+        try {
+            return super.clone();
+        } catch (CloneNotSupportedException cnse) {
+            throw new RuntimeException(cnse);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopNTopicPE.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopNTopicPE.java
new file mode 100644
index 0000000..80c5c86
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopNTopicPE.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import org.apache.s4.persist.Persister;
+import org.apache.s4.processor.AbstractPE;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.gson.Gson;
+
+public class TopNTopicPE extends AbstractPE {
+    private String id;
+    private transient Persister persister;
+    private int entryCount = 10;
+    private Map<String, Integer> topicMap = new ConcurrentHashMap<String, Integer>();
+    private int persistTime;
+    private String persistKey = "myapp:topNTopics";
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Persister getPersister() {
+        return persister;
+    }
+
+    public void setPersister(Persister persister) {
+        this.persister = persister;
+    }
+
+    public int getEntryCount() {
+        return entryCount;
+    }
+
+    public void setEntryCount(int entryCount) {
+        this.entryCount = entryCount;
+    }
+
+    public int getPersistTime() {
+        return persistTime;
+    }
+
+    public void setPersistTime(int persistTime) {
+        this.persistTime = persistTime;
+    }
+
+    public String getPersistKey() {
+        return persistKey;
+    }
+
+    public void setPersistKey(String persistKey) {
+        this.persistKey = persistKey;
+    }
+
+    public void processEvent(TopicSeen topicSeen) {
+        topicMap.put(topicSeen.getTopic(), topicSeen.getCount());
+    }
+
+    public ArrayList<TopNEntry> getTopTopics() {
+        if (entryCount < 1) 
+            return null;
+
+        ArrayList<TopNEntry> sortedList = new ArrayList<TopNEntry>();
+
+        for (String key : topicMap.keySet()) {
+            sortedList.add(new TopNEntry(key, topicMap.get(key)));
+        }
+
+        Collections.sort(sortedList);
+
+        // truncate: Yuck!!
+        // unfortunately, Kryo cannot deserialize RandomAccessSubList
+        // if we use ArrayList.subList(...)
+        while (sortedList.size() > entryCount)
+            sortedList.remove(sortedList.size() - 1);
+
+        return sortedList;
+    }
+
+    @Override
+    public void output() {
+        List<TopNEntry> sortedList = new ArrayList<TopNEntry>();
+
+        for (String key : topicMap.keySet()) {
+            sortedList.add(new TopNEntry(key, topicMap.get(key)));
+        }
+
+        Collections.sort(sortedList);
+
+        try {
+            JSONObject message = new JSONObject();
+            JSONArray jsonTopN = new JSONArray();
+
+            for (int i = 0; i < entryCount; i++) {
+                if (i == sortedList.size()) {
+                    break;
+                }
+                TopNEntry tne = sortedList.get(i);
+                JSONObject jsonEntry = new JSONObject();
+                jsonEntry.put("topic", tne.getTopic());
+                jsonEntry.put("count", tne.getCount());
+                jsonTopN.put(jsonEntry);
+            }
+            message.put("topN", jsonTopN);
+            persister.set(persistKey, message.toString()+"\n", persistTime);
+        } catch (Exception e) {
+            Logger.getLogger("s4").error(e);
+        }
+    }
+
+    @Override
+    public String getId() {
+        return this.id;
+    }
+
+    public static class TopNEntry implements Comparable<TopNEntry> {
+        public TopNEntry(String topic, int count) {
+            this.topic = topic;
+            this.count = count;
+        }
+
+        public TopNEntry() {}
+
+        String topic = null;
+        int count = 0;
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        public void setCount(int count) {
+            this.count = count;
+        }
+
+        public int compareTo(TopNEntry topNEntry) {
+            if (topNEntry.getCount() < this.count) {
+                return -1;
+            } else if (topNEntry.getCount() > this.count) {
+                return 1;
+            }
+            return 0;
+        }
+
+        public String toString() {
+            return "topic:" + topic + " count:" + count;
+        }
+    }
+}


Mime
View raw message