incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [19/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicCountAndReportPE.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicCountAndReportPE.java
new file mode 100644
index 0000000..e96b21c
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicCountAndReportPE.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.processor.AbstractPE;
+
+public class TopicCountAndReportPE extends AbstractPE {
+    private String id;
+    private transient EventDispatcher dispatcher;
+    private String outputStreamName;
+    private int threshold;
+    private int count;
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public EventDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getOutputStreamName() {
+        return outputStreamName;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    public int getThreshold() {
+        return threshold;
+    }
+
+    public void setThreshold(int threshold) {
+        this.threshold = threshold;
+    }
+
+    public void processEvent(TopicSeen topicSeen) {
+        count += topicSeen.getCount();
+    }
+
+    @Override
+    public void output() {
+        if (count < threshold) {
+            return;
+        }
+        TopicSeen topicSeen = new TopicSeen((String) this.getKeyValue().get(0),
+                                            count);
+        topicSeen.setReportKey("1");
+        dispatcher.dispatchEvent(outputStreamName, topicSeen);
+    }
+
+    @Override
+    public String getId() {
+        return this.id;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicExtractorPE.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicExtractorPE.java
new file mode 100644
index 0000000..02a5600
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicExtractorPE.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.processor.AbstractPE;
+
+public class TopicExtractorPE extends AbstractPE {
+    private String id;
+    private transient EventDispatcher dispatcher;
+    private String outputStreamName;
+
+    public EventDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getOutputStreamName() {
+        return outputStreamName;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Emits one lower-cased TopicSeen (count 1) for every "#tag" made of letters and digits in the status text.
+     *
+     * @param status the tweet to scan; nothing is emitted when its text is null
+     */
+    public void processEvent(Status status) {
+        String text = status.getText();
+        if (text == null) {
+            return;
+        }
+
+        int textLength = text.length();
+        int index = 0;
+        int prevIndex = 0;
+        while ((index = text.indexOf('#', prevIndex)) != -1) {
+            // locate the end of the run of letters/digits that follows the hash
+            int end = index + 1;
+            while (end < textLength && Character.isLetterOrDigit(text.charAt(end))) {
+                end++;
+            }
+            prevIndex = end; // resume after the tag: the skipped characters cannot contain another '#'
+            if (end == index + 1) {
+                continue; // lone hash, including a hash that is the last character
+            }
+
+            // fixed locale so topic keys do not depend on the JVM default (e.g. dotless i under a Turkish locale)
+            String topic = text.substring(index + 1, end).toLowerCase(java.util.Locale.ENGLISH);
+            TopicSeen topicSeen = new TopicSeen(topic, 1);
+            dispatcher.dispatchEvent(outputStreamName, topicSeen);
+        }
+    }
+
+    @Override
+    public void output() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public String getId() {
+        return this.id;
+    }
+
+    static class DummyDispatcher extends Dispatcher {
+        public void dispatchEvent(String streamName, Object event) {
+            System.out.println(event);
+        }
+    }
+
+    public static void main(String args[]) {
+        TopicExtractorPE te = new TopicExtractorPE();
+        te.setDispatcher(new DummyDispatcher());
+        te.setOutputStreamName("test");
+
+        Status status = new Status();
+        status.setText("Hey this is a test");
+        te.processEvent(status);
+
+        status.setText("This is an edge test #");
+        te.processEvent(status);
+
+        status.setText("#GLOB this is a test");
+        te.processEvent(status);
+
+        status.setText("Hey there #FLOB, this is a test #GLOB");
+        te.processEvent(status);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicSeen.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicSeen.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicSeen.java
new file mode 100644
index 0000000..c5d585f
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TopicSeen.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+public class TopicSeen implements Cloneable { // Cloneable is required for clone() below to succeed
+    private String topic;
+    private int count;
+    private String reportKey;
+
+    public TopicSeen() {
+        // no-arg constructor: fields keep their defaults until the setters are called
+    }
+
+    public TopicSeen(String topic, int count) {
+        this.topic = topic;
+        this.count = count;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public String getReportKey() {
+        return reportKey;
+    }
+
+    public void setReportKey(String reportKey) {
+        this.reportKey = reportKey;
+    }
+
+    public String toString() {
+        return "{topic:" + topic + "}"; // only the topic is rendered; count and reportKey are omitted
+    }
+
+    public Object clone() {
+        try {
+            return super.clone(); // field-by-field copy of topic, count and reportKey
+        } catch (CloneNotSupportedException cnse) {
+            throw new RuntimeException(cnse); // not expected, since this class implements Cloneable
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedListener.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedListener.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedListener.java
new file mode 100644
index 0000000..ffdff73
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedListener.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.httpclient.util.EncodingUtil;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.listener.EventProducer;
+
+public class TwitterFeedListener implements EventProducer, Runnable {
+    private String userid;
+    private String password;
+    private String urlString;
+    private long maxBackoffTime = 30 * 1000; // cap for the reconnect backoff, in milliseconds (30 seconds)
+    private long messageCount = 0;
+    private long blankCount = 0;
+    private String streamName;
+
+    protected LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
+    private Set<org.apache.s4.listener.EventHandler> handlers = new HashSet<org.apache.s4.listener.EventHandler>();
+
+    public void setUserid(String userid) {
+        this.userid = userid;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public void setUrlString(String urlString) {
+        this.urlString = urlString;
+    }
+
+    public void setMaxBackoffTime(long maxBackoffTime) {
+        this.maxBackoffTime = maxBackoffTime;
+    }
+
+    public void setStreamName(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public void init() {
+        for (int i = 0; i < 12; i++) {
+            Dequeuer dequeuer = new Dequeuer(i);
+            Thread t = new Thread(dequeuer);
+            t.start();
+        }
+        (new Thread(this)).start();
+    }
+
+    public void run() {
+        long backoffTime = 1000;
+        while (!Thread.interrupted()) {
+            try {
+                connectAndRead();
+            } catch (Exception e) {
+                Logger.getLogger("s4").error("Exception reading feed", e);
+                try {
+                    Thread.sleep(backoffTime);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+                backoffTime = backoffTime * 2;
+                if (backoffTime > maxBackoffTime) {
+                    backoffTime = maxBackoffTime;
+                }
+            }
+        }
+    }
+
+    /** Opens the feed URL with HTTP Basic credentials and queues each non-blank line until end of stream. */
+    public void connectAndRead() throws Exception {
+        URLConnection connection = new URL(urlString).openConnection();
+        String userPassword = userid + ":" + password;
+        String encoded = EncodingUtil.getAsciiString(Base64.encodeBase64(EncodingUtil.getAsciiBytes(userPassword)));
+        connection.setRequestProperty("Authorization", "Basic " + encoded);
+        connection.connect();
+        // decode explicitly as UTF-8 (the JSON feed's encoding) rather than with the platform default charset
+        BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
+        try {
+            String inputLine = null;
+            while ((inputLine = br.readLine()) != null) {
+                if (inputLine.trim().length() == 0) {
+                    blankCount++;
+                    continue;
+                }
+                messageCount++;
+                messageQueue.add(inputLine);
+            }
+        } finally {
+            br.close(); // always release the stream; run() calls this method again after any failure
+        }
+    }
+
+    class Dequeuer implements Runnable {
+        private int id;
+
+        public Dequeuer(int id) {
+            this.id = id;
+        }
+
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    String message = messageQueue.take();
+                    JSONObject jsonObject = new JSONObject(message);
+
+                    // ignore delete records for now
+                    if (jsonObject.has("delete")) {
+                        continue;
+                    }
+
+                    Status status = getStatus(jsonObject);
+
+                    EventWrapper ew = new EventWrapper(streamName, status, null);
+                    for (org.apache.s4.listener.EventHandler handler : handlers) {
+                        try {
+                            handler.processEvent(ew);
+                        } catch (Exception e) {
+                            Logger.getLogger("s4")
+                                  .error("Exception in raw event handler", e);
+                        }
+                    }
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    Logger.getLogger("s4")
+                          .error("Exception processing message", e);
+                }
+            }
+        }
+
+        public Status getStatus(JSONObject jsonObject) {
+            try {
+                if (jsonObject == null || jsonObject.equals(JSONObject.NULL)) {
+                    return null;
+                }
+
+                Status status = new Status();
+
+                status.setUser(getUser((JSONObject) jsonObject.opt("user")));
+
+                Object value = jsonObject.opt("id");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setId(((Number) value).longValue());
+                }
+
+                value = jsonObject.opt("in_reply_to_status_id");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setInReplyToStatusId(((Number) value).longValue());
+                }
+
+                value = jsonObject.opt("text");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setText((String) value);
+                }
+
+                value = jsonObject.opt("truncated");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setTruncated((Boolean) value);
+                }
+
+                value = jsonObject.opt("source");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setSource((String) value);
+                }
+
+                value = jsonObject.opt("in_reply_to_screen_name");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setInReplyToScreenName((String) value);
+                }
+
+                value = jsonObject.opt("favorited");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setFavorited((Boolean) value);
+                }
+
+                value = jsonObject.opt("in_reply_to_user_id");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setInReplyToUserId(((Number) value).longValue());
+                }
+
+                value = jsonObject.opt("created_at");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    status.setCreatedAt((String) value);
+                }
+
+                return status;
+            } catch (Exception e) {
+                Logger.getLogger("s4").error(e);
+            }
+
+            return null;
+        }
+
+        /** Maps a Twitter user JSON object onto a User; returns null when the object is absent or copying a field throws. */
+        public User getUser(JSONObject jsonObject) {
+            try {
+                if (jsonObject == null || jsonObject.equals(JSONObject.NULL)) {
+                    return null;
+                }
+
+                User user = new User();
+
+                Object value = jsonObject.opt("id");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setId(((Number) value).longValue());
+                }
+
+                value = jsonObject.opt("screen_name");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setScreenName((String) value);
+                }
+
+                value = jsonObject.opt("name");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setName((String) value);
+                }
+
+                value = jsonObject.opt("url");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setUrl((String) value);
+                }
+
+                value = jsonObject.opt("followers_count");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setFollowersCount(((Number) value).intValue());
+                }
+
+                value = jsonObject.opt("lang");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setLang((String) value);
+                }
+
+                value = jsonObject.opt("verified");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setVerified((Boolean) value);
+                }
+
+                value = jsonObject.opt("profile_image_url");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setProfileImageUrl((String) value);
+                }
+
+                value = jsonObject.opt("friends_count");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setFriendsCount(((Number) value).intValue());
+                }
+
+                value = jsonObject.opt("description");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setDescription((String) value);
+                }
+
+                value = jsonObject.opt("favourites_count"); // keys are case-sensitive; Twitter sends it all lower-case
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setFavouritesCount(((Number) value).intValue());
+                }
+
+                value = jsonObject.opt("geo_enabled");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setGeoEnabled((Boolean) value);
+                }
+
+                value = jsonObject.opt("listed_count");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setListedCount(((Number) value).intValue());
+                }
+
+                value = jsonObject.opt("profile_background_image_url");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setProfileBackgroundImageUrl((String) value);
+                }
+
+                value = jsonObject.opt("protected"); // JSON key is "protected"; only the Java property is named protectedUser
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setProtectedUser((Boolean) value);
+                }
+
+                value = jsonObject.opt("location");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setLocation((String) value);
+                }
+
+                value = jsonObject.opt("statuses_count");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setStatusesCount(((Number) value).longValue());
+                }
+
+                value = jsonObject.opt("time_zone");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setTimeZone((String) value);
+                }
+
+                value = jsonObject.opt("contributors_enabled");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setContributorsEnabled((Boolean) value);
+                }
+
+                value = jsonObject.opt("utc_offset");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setUtcOffset(((Number) value).intValue());
+                }
+
+                value = jsonObject.opt("created_at");
+                if (value != null && !value.equals(JSONObject.NULL)) {
+                    user.setCreatedAt((String) value);
+                }
+
+                return user;
+            } catch (Exception e) {
+                Logger.getLogger("s4").error(e);
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public void addHandler(EventHandler handler) {
+        handlers.add(handler);
+
+    }
+
+    @Override
+    public boolean removeHandler(EventHandler handler) {
+        return handlers.remove(handler);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedReader.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedReader.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedReader.java
new file mode 100644
index 0000000..31d4b4d
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/TwitterFeedReader.java
@@ -0,0 +1,66 @@
+package org.apache.s4.example.twittertopiccount;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * 
+ * Adapter for injecting twitter data from a twitter dump file, rather than from a live http stream.
+ * 
+ * The twitter dumps must be located in a directory in the file system
+ *
+ */
+public class TwitterFeedReader extends TwitterFeedListener {
+
+    String frequencyBySecond;
+    String twitterDumpsDir;
+    String twitterDumpsNamePattern = "\\A.+\\.gz\\z";
+
+    /** Replays every matching gzip dump line by line into the message queue, then terminates the JVM. */
+    @Override
+    public void connectAndRead() throws Exception {
+        System.out.println("Reading files from dir " +  twitterDumpsDir + " matching: " + twitterDumpsNamePattern);
+        File[] dumps = new File(twitterDumpsDir).listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.matches(twitterDumpsNamePattern);
+            }
+        });
+        if (dumps == null) { // listFiles() returns null when the path is not a directory or cannot be read
+            throw new IllegalStateException("Cannot list twitter dumps directory: " + twitterDumpsDir);
+        }
+        for (File dump : dumps) {
+            System.out.println("Reading file : " + dump.getAbsolutePath());
+            BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(dump))));
+            try {
+                for (String line = br.readLine(); line != null; line = br.readLine()) {
+                    if (line.startsWith("{")) { // only consider lines with twitter json-encoded data
+                        messageQueue.add(line);
+                        Thread.sleep((1000 / Integer.valueOf(frequencyBySecond)));
+                    }
+                }
+            } finally {
+                br.close(); // release the file handle even if reading or the sleep throws
+            }
+        }
+        System.out.println("OK, read all dump files. Exiting normally.");
+        System.exit(0);
+    }
+
+    public void setFrequencyBySecond(String frequencyBySecond) {
+        this.frequencyBySecond = frequencyBySecond;
+    }
+
+    public void setTwitterDumpsDir(String twitterDumpsDir) {
+        this.twitterDumpsDir = twitterDumpsDir;
+    }
+
+    public void setTwitterDumpsNamePattern(String twitterDumpsNamePattern) {
+        this.twitterDumpsNamePattern = twitterDumpsNamePattern;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/User.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/User.java b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/User.java
new file mode 100644
index 0000000..08b21c3
--- /dev/null
+++ b/s4-examples/twittertopiccount-ft/src/main/java/org/apache/s4/example/twittertopiccount/User.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+public class User {
+    private long id;
+    private String screenName;
+    private String name;
+    private String url;
+    private int followersCount;
+    private String lang;
+    private boolean verified;
+    private String profileImageUrl;
+    private int friendsCount;
+    private String description;
+    private int favouritesCount;
+    private boolean geoEnabled;
+    private int listedCount;
+    private String profileBackgroundImageUrl;
+    private boolean protectedUser;
+    private String location;
+    private long statusesCount;
+    private String timeZone;
+    private boolean contributorsEnabled;
+    private int utcOffset;
+    private String createdAt;
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getScreenName() {
+        return screenName;
+    }
+
+    public void setScreenName(String screenName) {
+        this.screenName = screenName;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public int getFollowersCount() {
+        return followersCount;
+    }
+
+    public void setFollowersCount(int followersCount) {
+        this.followersCount = followersCount;
+    }
+
+    public String getLang() {
+        return lang;
+    }
+
+    public void setLang(String lang) {
+        this.lang = lang;
+    }
+
+    public boolean isVerified() {
+        return verified;
+    }
+
+    public void setVerified(boolean verified) {
+        this.verified = verified;
+    }
+
+    public String getProfileImageUrl() {
+        return profileImageUrl;
+    }
+
+    public void setProfileImageUrl(String profileImageUrl) {
+        this.profileImageUrl = profileImageUrl;
+    }
+
+    public int getFriendsCount() {
+        return friendsCount;
+    }
+
+    public void setFriendsCount(int friendsCount) {
+        this.friendsCount = friendsCount;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public int getFavouritesCount() {
+        return favouritesCount;
+    }
+
+    public void setFavouritesCount(int favouritesCount) {
+        this.favouritesCount = favouritesCount;
+    }
+
+    public boolean isGeoEnabled() {
+        return geoEnabled;
+    }
+
+    public void setGeoEnabled(boolean geoEnabled) {
+        this.geoEnabled = geoEnabled;
+    }
+
+    public int getListedCount() {
+        return listedCount;
+    }
+
+    public void setListedCount(int listedCount) {
+        this.listedCount = listedCount;
+    }
+
+    public String getProfileBackgroundImageUrl() {
+        return profileBackgroundImageUrl;
+    }
+
+    public void setProfileBackgroundImageUrl(String profileBackgroundImageUrl) {
+        this.profileBackgroundImageUrl = profileBackgroundImageUrl;
+    }
+
+    public boolean isProtectedUser() {
+        return protectedUser;
+    }
+
+    public void setProtectedUser(boolean protectedUser) {
+        this.protectedUser = protectedUser;
+    }
+
+    public String getLocation() {
+        return location;
+    }
+
+    public void setLocation(String location) {
+        this.location = location;
+    }
+
+    public long getStatusesCount() {
+        return statusesCount;
+    }
+
+    public void setStatusesCount(long statusesCount) {
+        this.statusesCount = statusesCount;
+    }
+
+    public String getTimeZone() {
+        return timeZone;
+    }
+
+    public void setTimeZone(String timeZone) {
+        this.timeZone = timeZone;
+    }
+
+    public boolean isContributorsEnabled() {
+        return contributorsEnabled;
+    }
+
+    public void setContributorsEnabled(boolean contributorsEnabled) {
+        this.contributorsEnabled = contributorsEnabled;
+    }
+
+    public int getUtcOffset() {
+        return utcOffset;
+    }
+
+    public void setUtcOffset(int utcOffset) {
+        this.utcOffset = utcOffset;
+    }
+
+    public String getCreatedAt() {
+        return createdAt;
+    }
+
+    public void setCreatedAt(String createdAt) {
+        this.createdAt = createdAt;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("{")
+          .append("id:")
+          .append(id)
+          .append(",")
+          .append("screenName:")
+          .append(screenName)
+          .append(",")
+          .append("name:")
+          .append(name)
+          .append(",")
+          .append("url:")
+          .append(url)
+          .append(",")
+          .append("followersCount:")
+          .append(followersCount)
+          .append(",")
+          .append("lang:")
+          .append(lang)
+          .append(",")
+          .append("verified:")
+          .append(verified)
+          .append(",")
+          .append("profileImageUrl:")
+          .append(profileImageUrl)
+          .append(",")
+          .append("friendsCount:")
+          .append(friendsCount)
+          .append(",")
+          .append("description:")
+          .append(description)
+          .append(",")
+          .append("favouritesCount:")
+          .append(favouritesCount)
+          .append(",")
+          .append("geoEnabled:")
+          .append(geoEnabled)
+          .append(",")
+          .append("listedCount:")
+          .append(listedCount)
+          .append(",")
+          .append("profileBackgroundImageUrl:")
+          .append(profileBackgroundImageUrl)
+          .append(",")
+          .append("protectedUser:")
+          .append(protectedUser)
+          .append(",")
+          .append("location:")
+          .append(location)
+          .append(",")
+          .append("statusesCount:")
+          .append(statusesCount)
+          .append(",")
+          .append("timeZone:")
+          .append(timeZone)
+          .append(",")
+          .append("contributorsEnabled:")
+          .append(contributorsEnabled)
+          .append(",")
+          .append("utcOffset:")
+          .append(utcOffset)
+          .append(",")
+          .append("createdAt:")
+          .append(createdAt)
+          .append("}");
+
+        return sb.toString();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml b/s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml
index 2c1a0c6..d45b49e 100644
--- a/s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml
+++ b/s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml
@@ -13,22 +13,22 @@
     <property name="ignoreUnresolvablePlaceholders" value="true"/>
   </bean> 
   
-  <bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher"/>
+  <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher"/>
   
-  <bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter" init-method="init">
+  <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter" init-method="init">
     <property name="serDeser" ref="serDeser"/>
     <property name="listener" ref="rawListener"/>
     <property name="listenerAppName" value="${adapter_app_name}"/>
     <property name="monitor" ref="monitor"/>
   </bean>
 
-  <bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter" init-method="init">
+  <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter" init-method="init">
     <property name="serDeser" ref="serDeser"/>
     <property name="listener" ref="rawListener"/>
     <property name="monitor" ref="monitor"/>
   </bean>
 
-  <bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+  <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
     <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}"/>
     <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}"/>
   </bean>
@@ -39,7 +39,7 @@
        then the event is sent to the adapter
        (via ctrlDispatcherAdapter). Else it is sent to the
        S4 cluster itself (via ctrlDispatcherS4) -->
-  <bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+  <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
     <property name="dispatchers">
       <list>
         <ref bean="ctrlDispatcherFilteredS4"/>
@@ -48,7 +48,7 @@
     </property>
   </bean>
 
-  <bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+  <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
     <property name="dispatcher" ref="ctrlDispatcherAdapter"/>
     <property name="streams">
       <list>
@@ -57,7 +57,7 @@
     </property>
   </bean>
 
-  <bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+  <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
     <property name="dispatcher" ref="ctrlDispatcherS4"/>
     <property name="streams">
       <list>
@@ -66,12 +66,12 @@
     </property>
   </bean>
 
-  <bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="hasher" ref="hasher"/>
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="genericPartitioner"/>
@@ -81,7 +81,7 @@
     <property name="loggerName" value="s4"/>
   </bean>
 
-  <bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="genericPartitioner"/>
@@ -93,11 +93,11 @@
   <!-- END: Dispatchers for control events -->
 
   <!-- Control Events handler -->
-  <bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+  <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
     <property name="dispatcher" ref="ctrlDispatcher"/>
   </bean>
 
-  <bean id="peContainer" class="io.s4.processor.PEContainer" init-method="init" lazy-init="true">
+  <bean id="peContainer" class="org.apache.s4.processor.PEContainer" init-method="init" lazy-init="true">
     <property name="maxQueueSize" value="${pe_container_max_queue_size}"/>
     <property name="monitor" ref="monitor"/>
     <property name="trackByKey" value="true"/>
@@ -106,7 +106,7 @@
     <property name="safeKeeper" ref="safeKeeper"/>
   </bean>
 
-  <bean id="rawListener" class="io.s4.listener.CommLayerListener" init-method="init">
+  <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener" init-method="init">
     <property name="serDeser" ref="serDeser"/>
     <property name="clusterManagerAddress" value="${zk_address}"/>
     <property name="appName" value="${s4_app_name}"/>
@@ -114,18 +114,18 @@
     <property name="monitor" ref="monitor"/>
   </bean>
 
-  <bean id="eventListener" class="io.s4.collector.EventListener" init-method="init">
+  <bean id="eventListener" class="org.apache.s4.collector.EventListener" init-method="init">
     <property name="rawListener" ref="rawListener"/>
     <property name="peContainer" ref="peContainer"/>
     <property name="monitor" ref="monitor"/>
   </bean>
 
-  <bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true" init-method="init">
+  <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true" init-method="init">
     <property name="flushInterval" value="30"/>
     <property name="loggerName" value="monitor"/>
   </bean>
 
-  <bean id="watcher" class="io.s4.util.Watcher" init-method="init" lazy-init="true">
+  <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init" lazy-init="true">
     <property name="monitor" ref="monitor"/>
     <property name="peContainer" ref="peContainer"/>
     <property name="minimumMemory" value="52428800"/>
@@ -137,7 +137,7 @@
   <!-- Some useful beans related to client-adapter for apps -->
 
   <!-- Dispatcher to send to all adapter nodes. -->
-  <bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="broadcastPartitioner"/>
@@ -148,9 +148,9 @@
   </bean>
 
   <!-- Partitioner to achieve broadcast -->
-  <bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner"/>
+  <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner"/>
   
-  <bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+  <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
         init-method="init">
         <property name="partitioners">
             <list>
@@ -161,11 +161,11 @@
         <property name="loggerName" value="s4" />
     </bean>
 
-    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+    <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
         <property name="eventEmitter" ref="commLayerEmitter"/>
     </bean>
 
-    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+    <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
         <property name="stateStorage" ref="fsStateStorage" />
         <property name="loopbackDispatcher" ref="loopbackDispatcher" />
         <property name="serializer" ref="serDeser"/>
@@ -173,11 +173,11 @@
         <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
     </bean>
     
-    <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+    <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
 
 
 
-    <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+    <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
         <!-- if not specified, default is <current_dir>/tmp/storage 
         <property name="storageRootPath" value="${storage_root_path}" /> -->
     </bean>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-ft/src/main/resources/s4-example-twittertopiccount-ft-conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-ft/src/main/resources/s4-example-twittertopiccount-ft-conf.xml b/s4-examples/twittertopiccount-ft/src/main/resources/s4-example-twittertopiccount-ft-conf.xml
index 556739d..3fcd095 100644
--- a/s4-examples/twittertopiccount-ft/src/main/resources/s4-example-twittertopiccount-ft-conf.xml
+++ b/s4-examples/twittertopiccount-ft/src/main/resources/s4-example-twittertopiccount-ft-conf.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
-  <!-- <bean id="printEventPE" class="io.s4.processor.PrintEventPE">
+  <!-- <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
     <property name="id" value="printEventPE"/>
     <property name="keys">
       <list>
@@ -10,7 +10,7 @@
     </property>
   </bean> -->
 
-  <bean id="topicExtractorPE" class="io.s4.example.twittertopiccount.TopicExtractorPE">
+  <bean id="topicExtractorPE" class="org.apache.s4.example.twittertopiccount.TopicExtractorPE">
     <property name="id" value="topicSeenPE"/>
     <property name="keys">
       <list>
@@ -21,7 +21,7 @@
     <property name="outputStreamName" value="TopicSeen"/>
   </bean>
 
-  <bean id="topicCountAndReportPE" class="io.s4.example.twittertopiccount.TopicCountAndReportPE">
+  <bean id="topicCountAndReportPE" class="org.apache.s4.example.twittertopiccount.TopicCountAndReportPE">
     <property name="id" value="topicCountAndReportPE"/>
     <property name="keys">
       <list>
@@ -35,11 +35,11 @@
     <property name="ttl" value="36000"/>
   </bean>
 
-  <bean id="dtfPersister" class="io.s4.example.twittertopiccount.DirectToFilePersister">
+  <bean id="dtfPersister" class="org.apache.s4.example.twittertopiccount.DirectToFilePersister">
     <property name="outputFilename" value="/tmp/top_n_hashtags"/>
   </bean>
 
-  <bean id="top10TopicPE" class="io.s4.example.twittertopiccount.TopNTopicPE">
+  <bean id="top10TopicPE" class="org.apache.s4.example.twittertopiccount.TopNTopicPE">
     <property name="id" value="top10TopicPE"/>
     <property name="keys">
       <list>
@@ -54,7 +54,7 @@
     <property name="ttl" value="36000"/>
   </bean>
 
-  <bean id="topicSeenPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="topicSeenPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>TopicSeen</value>
@@ -69,7 +69,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="aggregatedTopicSeenPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="aggregatedTopicSeenPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>AggregatedTopicSeen</value>
@@ -84,7 +84,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="twitDispatcher" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="twitDispatcher" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="topicSeenPartitioner"/>
@@ -96,7 +96,7 @@
   </bean>
 
   <!-- dispatcher to S4 and client adapter -->
-  <bean id="forkdispatcher" class="io.s4.dispatcher.MultiDispatcher">
+  <bean id="forkdispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
     <property name="dispatchers">
       <list>
 
@@ -105,7 +105,7 @@
 
 
         <!-- send some streams to client adapters -->
-        <bean id="selectiveDispatchToAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+        <bean id="selectiveDispatchToAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
           <property name="dispatcher" ref="dispatcherToClientAdapters"/>
           <property name="streams">
             <list>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java b/s4-examples/twittertopiccount-scala/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
deleted file mode 100644
index 02dc809..0000000
--- a/s4-examples/twittertopiccount-scala/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.example.twittertopiccount;
-
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import io.s4.persist.Persister;
-
-public class DirectToFilePersister implements Persister {
-    private String outputFilename;
-    private int persistCount;
-
-    public void setOutputFilename(String outputFilename) {
-        this.outputFilename = outputFilename;
-    }
-
-    @Override
-    public int cleanOutGarbage() throws InterruptedException {
-        return 0;
-    }
-
-    @Override
-    public Object get(String arg0) throws InterruptedException {
-        return null;
-    }
-
-    @Override
-    public Map<String, Object> getBulk(String[] arg0)
-            throws InterruptedException {
-        return new HashMap<String, Object>();
-    }
-
-    @Override
-    public Map<String, Object> getBulkObjects(String[] arg0)
-            throws InterruptedException {
-        return new HashMap<String, Object>();
-    }
-
-    @Override
-    public int getCacheEntryCount() {
-        return 1;
-    }
-
-    @Override
-    public Object getObject(String arg0) throws InterruptedException {
-        return null;
-    }
-
-    @Override
-    public int getPersistCount() {
-        return persistCount;
-    }
-
-    @Override
-    public int getQueueSize() {
-        return 0;
-    }
-
-    @Override
-    public Set<String> keySet() {
-        return new HashSet<String>();
-    }
-
-    @Override
-    public void remove(String arg0) throws InterruptedException {
-
-    }
-
-    @Override
-    public void set(String key, Object value, int persistTime)
-            throws InterruptedException {
-
-        FileWriter fw = null;
-        try {
-            fw = new FileWriter(outputFilename);
-            fw.write(String.valueOf(value));
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            Logger.getLogger("s4").error(e);
-        } finally {
-            if (fw != null) {
-                try {
-                    fw.close();
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    @Override
-    public void setAsynch(String key, Object value, int persistTime) {
-        try {
-            set(key, value, persistTime);
-        } catch (InterruptedException ie) {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java b/s4-examples/twittertopiccount-scala/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
new file mode 100644
index 0000000..262b451
--- /dev/null
+++ b/s4-examples/twittertopiccount-scala/src/main/java/org/apache/s4/example/twittertopiccount/DirectToFilePersister.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.twittertopiccount;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.s4.persist.Persister;
+
+public class DirectToFilePersister implements Persister {
+    private String outputFilename;
+    private int persistCount;
+
+    public void setOutputFilename(String outputFilename) {
+        this.outputFilename = outputFilename;
+    }
+
+    @Override
+    public int cleanOutGarbage() throws InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public Object get(String arg0) throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> getBulk(String[] arg0)
+            throws InterruptedException {
+        return new HashMap<String, Object>();
+    }
+
+    @Override
+    public Map<String, Object> getBulkObjects(String[] arg0)
+            throws InterruptedException {
+        return new HashMap<String, Object>();
+    }
+
+    @Override
+    public int getCacheEntryCount() {
+        return 1;
+    }
+
+    @Override
+    public Object getObject(String arg0) throws InterruptedException {
+        return null;
+    }
+
+    @Override
+    public int getPersistCount() {
+        return persistCount;
+    }
+
+    @Override
+    public int getQueueSize() {
+        return 0;
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return new HashSet<String>();
+    }
+
+    @Override
+    public void remove(String arg0) throws InterruptedException {
+
+    }
+
+    @Override
+    public void set(String key, Object value, int persistTime)
+            throws InterruptedException {
+
+        FileWriter fw = null;
+        try {
+            fw = new FileWriter(outputFilename);
+            fw.write(String.valueOf(value));
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            Logger.getLogger("s4").error(e);
+        } finally {
+            if (fw != null) {
+                try {
+                    fw.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    @Override
+    public void setAsynch(String key, Object value, int persistTime) {
+        try {
+            set(key, value, persistTime);
+        } catch (InterruptedException ie) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/resources/adapter_conf.xml
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/resources/adapter_conf.xml b/s4-examples/twittertopiccount-scala/src/main/resources/adapter_conf.xml
index 2ea17bb..9581959 100644
--- a/s4-examples/twittertopiccount-scala/src/main/resources/adapter_conf.xml
+++ b/s4-examples/twittertopiccount-scala/src/main/resources/adapter_conf.xml
@@ -7,13 +7,13 @@
 
   <bean id="tweetQueue" class="java.util.concurrent.LinkedBlockingQueue" />
 
-  <bean id="twitterStreamListener" class="io.s4.example.twittertopiccount.listener.TwitterStreamListener"
+  <bean id="twitterStreamListener" class="org.apache.s4.example.twittertopiccount.listener.TwitterStreamListener"
         init-method="init">
     <property name="queue" ref="tweetQueue"/>
     <property name="streamName" value="RawStatus"/>
   </bean>
 
-  <bean id="twitterStreamClient" class="io.s4.example.twittertopiccount.util.TwitterStreamClient"
+  <bean id="twitterStreamClient" class="org.apache.s4.example.twittertopiccount.util.TwitterStreamClient"
         init-method="init">
     <property name="user" value="<USER ID GOES HERE>"/>
     <property name="pass" value="<PASSWORD GOES HERE>"/>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/resources/s4-example-twittertopiccount-scala-conf.xml.xml
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/resources/s4-example-twittertopiccount-scala-conf.xml.xml b/s4-examples/twittertopiccount-scala/src/main/resources/s4-example-twittertopiccount-scala-conf.xml.xml
index 19ec264..6808770 100644
--- a/s4-examples/twittertopiccount-scala/src/main/resources/s4-example-twittertopiccount-scala-conf.xml.xml
+++ b/s4-examples/twittertopiccount-scala/src/main/resources/s4-example-twittertopiccount-scala-conf.xml.xml
@@ -4,7 +4,7 @@
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
 			   http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
-  <bean id="printEventPE" class="io.s4.processor.PrintEventPE">
+  <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
     <property name="id" value="printEventPE"/>
     <property name="keys">
       <list>
@@ -13,7 +13,7 @@
     </property>
   </bean>
 
-  <bean id="topicExtractorPE" class="io.s4.example.twittertopiccount.processor.TopicExtractorPE">
+  <bean id="topicExtractorPE" class="org.apache.s4.example.twittertopiccount.processor.TopicExtractorPE">
     <property name="id" value="topicSeenPE"/>
     <property name="keys">
       <list>
@@ -24,7 +24,7 @@
     <property name="outputStreamName" value="TopicSeen"/>
   </bean>
 
-  <bean id="topicCountAndReportPE" class="io.s4.example.twittertopiccount.processor.TopicCountAndReportPE">
+  <bean id="topicCountAndReportPE" class="org.apache.s4.example.twittertopiccount.processor.TopicCountAndReportPE">
     <property name="id" value="topicCountAndReportPE"/>
     <property name="keys">
       <list>
@@ -38,11 +38,11 @@
     <property name="ttl" value="36000"/>
   </bean>
 
-  <bean id="dtfPersister" class="io.s4.example.twittertopiccount.DirectToFilePersister">
+  <bean id="dtfPersister" class="org.apache.s4.example.twittertopiccount.DirectToFilePersister">
     <property name="outputFilename" value="/tmp/top_n_hashtags"/>
   </bean>
 
-  <bean id="top10TopicPE" class="io.s4.example.twittertopiccount.processor.TopNTopicPE">
+  <bean id="top10TopicPE" class="org.apache.s4.example.twittertopiccount.processor.TopNTopicPE">
     <property name="id" value="top10TopicPE"/>
     <property name="keys">
       <list>
@@ -57,7 +57,7 @@
     <property name="ttl" value="36000"/>
   </bean>
 
-  <bean id="topicSeenPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="topicSeenPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>TopicSeen</value>
@@ -72,7 +72,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="aggregatedTopicSeenPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+  <bean id="aggregatedTopicSeenPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
     <property name="streamNames">
       <list>
         <value>AggregatedTopicSeen</value>
@@ -87,7 +87,7 @@
     <property name="debug" value="false"/>
   </bean>
 
-  <bean id="twitDispatcher" class="io.s4.dispatcher.Dispatcher" init-method="init">
+  <bean id="twitDispatcher" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
     <property name="partitioners">
       <list>
         <ref bean="topicSeenPartitioner"/>
@@ -99,14 +99,14 @@
   </bean>
 
 <!-- dispatcher to S4 and client adapter -->
-  <bean id="forkDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+  <bean id="forkDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
     <property name="dispatchers">
       <list>
         <!-- send everything through the S4 dispatcher -->
         <ref bean="twitDispatcher"/>
 
         <!-- send some streams to client adapters -->
-        <bean id="selectiveDispatchToAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+        <bean id="selectiveDispatchToAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
           <property name="dispatcher" ref="dispatcherToClientAdapters"/>
           <property name="streams">
             <list>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/events/Events.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/events/Events.scala b/s4-examples/twittertopiccount-scala/src/main/scala/events/Events.scala
index 0e432e8..58c043d 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/events/Events.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/events/Events.scala
@@ -14,7 +14,7 @@
  * License. See accompanying LICENSE file. 
  */
 
-package io.s4.example.twittertopiccount.event
+package org.apache.s4.example.twittertopiccount.event
 
 import scala.reflect.BeanProperty
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/listener/TwitterStreamListener.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/listener/TwitterStreamListener.scala b/s4-examples/twittertopiccount-scala/src/main/scala/listener/TwitterStreamListener.scala
index e0683da..8ca69e9 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/listener/TwitterStreamListener.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/listener/TwitterStreamListener.scala
@@ -14,7 +14,7 @@
  * License. See accompanying LICENSE file. 
  */
 
-package io.s4.example.twittertopiccount.listener
+package org.apache.s4.example.twittertopiccount.listener
 
 import scala.reflect.BeanProperty
 import scala.concurrent.ops._
@@ -25,14 +25,14 @@ import java.util.concurrent._
 import java.util.HashSet
 import java.util.concurrent.LinkedBlockingQueue
 
-import io.s4.collector.EventWrapper
-import io.s4.listener.EventHandler
-import io.s4.listener.EventProducer
+import org.apache.s4.collector.EventWrapper
+import org.apache.s4.listener.EventHandler
+import org.apache.s4.listener.EventProducer
 
 import net.liftweb.json._
 import net.liftweb.json.Extraction._
 
-import io.s4.example.twittertopiccount.event._
+import org.apache.s4.example.twittertopiccount.event._
 
 class TwitterStreamListener extends EventProducer {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
index 8c75d07..eceabcb 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
@@ -13,7 +13,7 @@
  * language governing permissions and limitations under the
  * License. See accompanying LICENSE file. 
  */
-package io.s4.example.twittertopiccount.processor
+package org.apache.s4.example.twittertopiccount.processor
 
 import scala.math._
 import scala.reflect.BeanProperty
@@ -26,10 +26,10 @@ import net.liftweb.json._
 import net.liftweb.json.JsonDSL._
 import net.liftweb.json.JsonAST._
 
-import io.s4.persist.Persister
-import io.s4.processor.AbstractPE
+import org.apache.s4.persist.Persister
+import org.apache.s4.processor.AbstractPE
 
-import io.s4.example.twittertopiccount.event._
+import org.apache.s4.example.twittertopiccount.event._
 
 class TopNTopicPE extends AbstractPE {
   @BeanProperty var persistKey = "myapp:topNTopics"

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
index 8f70511..db454c7 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
@@ -13,15 +13,15 @@
  * language governing permissions and limitations under the
  * License. See accompanying LICENSE file. 
  */
-package io.s4.example.twittertopiccount.processor
+package org.apache.s4.example.twittertopiccount.processor
 
 import org.apache.log4j.Logger
 import scala.reflect.BeanProperty
 
-import io.s4.dispatcher.EventDispatcher
-import io.s4.processor.AbstractPE
+import org.apache.s4.dispatcher.EventDispatcher
+import org.apache.s4.processor.AbstractPE
 
-import io.s4.example.twittertopiccount.event._
+import org.apache.s4.example.twittertopiccount.event._
 
 class TopicCountAndReportPE extends AbstractPE {
   @BeanProperty var dispatcher: EventDispatcher = _ 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
index 8da09a8..5e5d862 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
@@ -13,15 +13,15 @@
  * language governing permissions and limitations under the
  * License. See accompanying LICENSE file. 
  */
-package io.s4.example.twittertopiccount.processor
+package org.apache.s4.example.twittertopiccount.processor
 
 import org.apache.log4j.Logger
 import scala.reflect.BeanProperty
 
-import io.s4.dispatcher.EventDispatcher
-import io.s4.processor.AbstractPE
+import org.apache.s4.dispatcher.EventDispatcher
+import org.apache.s4.processor.AbstractPE
 
-import io.s4.example.twittertopiccount.event._
+import org.apache.s4.example.twittertopiccount.event._
 
 class TopicExtractorPE extends AbstractPE {
   @BeanProperty var dispatcher: EventDispatcher = _ 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/169653cd/s4-examples/twittertopiccount-scala/src/main/scala/util/TwitterStreamClient.scala
----------------------------------------------------------------------
diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/util/TwitterStreamClient.scala b/s4-examples/twittertopiccount-scala/src/main/scala/util/TwitterStreamClient.scala
index 798991f..88186f2 100644
--- a/s4-examples/twittertopiccount-scala/src/main/scala/util/TwitterStreamClient.scala
+++ b/s4-examples/twittertopiccount-scala/src/main/scala/util/TwitterStreamClient.scala
@@ -14,7 +14,7 @@
  * License. See accompanying LICENSE file. 
  */
 
-package io.s4.example.twittertopiccount.util
+package org.apache.s4.example.twittertopiccount.util
 
 import java.io.InputStream
 import java.io.InputStreamReader


Mime
View raw message