incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [8/50] [abbrv] git commit: Remove troublesome carriage return
Date Tue, 03 Jan 2012 11:19:14 GMT
Remove troublesome carriage return


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/f4a2c594
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/f4a2c594
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/f4a2c594

Branch: refs/heads/dev
Commit: f4a2c594f1ab8140946a7f6e0087c7797b7dcc3c
Parents: 8eb4ad6
Author: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Authored: Mon Nov 28 10:37:46 2011 -0800
Committer: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Committed: Mon Nov 28 10:37:46 2011 -0800

----------------------------------------------------------------------
 .../org/apache/s4/comm/core/CommEventCallback.java |   48 +-
 .../apache/s4/comm/core/CommServiceFactory.java    |  148 ++--
 .../java/org/apache/s4/comm/core/Deserializer.java |   42 +-
 .../org/apache/s4/comm/core/GenericListener.java   |  192 +++---
 .../org/apache/s4/comm/core/GenericSender.java     |  378 +++++-----
 .../org/apache/s4/comm/core/GenericSerDeser.java   |   60 +-
 .../org/apache/s4/comm/core/ListenerProcess.java   |  182 +++---
 .../org/apache/s4/comm/core/MulticastSender.java   |  158 ++--
 .../org/apache/s4/comm/core/ProcessMonitor.java    |   66 +-
 .../java/org/apache/s4/comm/core/SendMode.java     |   40 +-
 .../org/apache/s4/comm/core/SenderProcess.java     |  270 ++++----
 .../java/org/apache/s4/comm/core/Serializer.java   |   42 +-
 .../java/org/apache/s4/comm/core/TaskManager.java  |   48 +-
 .../apache/s4/comm/file/StaticProcessMonitor.java  |  156 ++--
 .../apache/s4/comm/test/ProcessMonitorTest.java    |  158 ++--
 .../org/apache/s4/comm/test/TaskManagerTest.java   |  164 +++---
 .../org/apache/s4/comm/test/TestTaskSetupApp.java  |  244 ++++----
 .../org/apache/s4/comm/tools/TaskSetupApp.java     |  172 +++---
 .../java/org/apache/s4/comm/util/CommUtil.java     |   76 +-
 .../java/org/apache/s4/comm/util/SystemUtils.java  |   78 +-
 .../java/org/apache/s4/comm/zk/ThreadTest.java     |  128 ++--
 .../org/apache/s4/comm/zk/ZkProcessMonitor.java    |  294 ++++----
 .../java/org/apache/s4/comm/zk/ZkTaskSetup.java    |  564 +++++++-------
 .../main/java/org/apache/s4/comm/zk/ZkUtil.java    |  288 ++++----
 s4-comm/src/main/resources/log4j.xml               |   66 +-
 .../resources/sample_static_task_manager_test.xml  |   30 +-
 s4-comm/src/main/resources/sample_task_setup.xml   |   46 +-
 s4-comm/src/main/resources/taskManagerTest.xml     |   30 +-
 28 files changed, 2084 insertions(+), 2084 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java b/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
index 29df9cb..340b547 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
@@ -1,24 +1,24 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import java.util.Map;
-
-public interface CommEventCallback {
-
-    public void handleCallback(Map<String, Object> eventData);
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.Map;
+
+public interface CommEventCallback {
+
+    public void handleCallback(Map<String, Object> eventData);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java b/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
index c82658f..da5a94b 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
@@ -1,74 +1,74 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import org.apache.s4.comm.file.StaticProcessMonitor;
-import org.apache.s4.comm.file.StaticTaskManager;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkProcessMonitor;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import org.apache.log4j.Logger;
-
-/**
- * Common Factory class to provide appropriate implementations
- * 
- * @author kishoreg
- * 
- */
-public class CommServiceFactory {
-    private static Logger logger = Logger.getLogger(CommServiceFactory.class);
-
-    public static TaskManager getTaskManager(String zkaddress,
-            String clusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        String mode = System.getProperty("commlayer.mode");
-        TaskManager taskManager = null;
-        if (mode != null && mode.equalsIgnoreCase("static")) {
-            logger.info("Comm layer mode is set to static");
-            taskManager = new StaticTaskManager(zkaddress,
-                                                clusterName,
-                                                clusterType,
-                                                callbackHandler);
-        } else {
-            taskManager = new ZkTaskManager(zkaddress,
-                                            clusterName,
-                                            clusterType,
-                                            callbackHandler);
-        }
-
-        return taskManager;
-    }
-
-    public static ProcessMonitor getProcessMonitor(String zkaddress,
-            String clusterName, CommEventCallback callbackHandler) {
-        ProcessMonitor processMonitor = null;
-        String mode = System.getProperty("commlayer.mode");
-        if (mode != null && mode.equalsIgnoreCase("static")) {
-            logger.info("Comm layer mode is set to static");
-            processMonitor = new StaticProcessMonitor(zkaddress,
-                                                      clusterName,
-                                                      ClusterType.S4);
-        } else {
-            processMonitor = new ZkProcessMonitor(zkaddress,
-                                                  clusterName,
-                                                  ClusterType.S4,
-                                                  callbackHandler);
-        }
-        return processMonitor;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.file.StaticProcessMonitor;
+import org.apache.s4.comm.file.StaticTaskManager;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkProcessMonitor;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Common Factory class to provide appropriate implementations
+ * 
+ * @author kishoreg
+ * 
+ */
+public class CommServiceFactory {
+    private static Logger logger = Logger.getLogger(CommServiceFactory.class);
+
+    public static TaskManager getTaskManager(String zkaddress,
+            String clusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        String mode = System.getProperty("commlayer.mode");
+        TaskManager taskManager = null;
+        if (mode != null && mode.equalsIgnoreCase("static")) {
+            logger.info("Comm layer mode is set to static");
+            taskManager = new StaticTaskManager(zkaddress,
+                                                clusterName,
+                                                clusterType,
+                                                callbackHandler);
+        } else {
+            taskManager = new ZkTaskManager(zkaddress,
+                                            clusterName,
+                                            clusterType,
+                                            callbackHandler);
+        }
+
+        return taskManager;
+    }
+
+    public static ProcessMonitor getProcessMonitor(String zkaddress,
+            String clusterName, CommEventCallback callbackHandler) {
+        ProcessMonitor processMonitor = null;
+        String mode = System.getProperty("commlayer.mode");
+        if (mode != null && mode.equalsIgnoreCase("static")) {
+            logger.info("Comm layer mode is set to static");
+            processMonitor = new StaticProcessMonitor(zkaddress,
+                                                      clusterName,
+                                                      ClusterType.S4);
+        } else {
+            processMonitor = new ZkProcessMonitor(zkaddress,
+                                                  clusterName,
+                                                  ClusterType.S4,
+                                                  callbackHandler);
+        }
+        return processMonitor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java b/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
index d8af0f8..0e6bde5 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
@@ -1,21 +1,21 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-public interface Deserializer {
-
-    public Object deserialize(byte[] buffer);
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+public interface Deserializer {
+
+    public Object deserialize(byte[] buffer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
index 2bfb613..ec562a0 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
@@ -1,96 +1,96 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class GenericListener {
-    private static Logger logger = Logger.getLogger(GenericListener.class);
-    private String zkAddress;
-    private DatagramSocket socket;
-    int BUFFER_LENGTH = 65507;
-    private DatagramPacket dgram;
-    private byte[] bs;
-    final Deserializer deserializer;
-
-    public GenericListener(String zkaddress, String appName,
-            Object listenerConfig) {
-        this(zkaddress, appName, listenerConfig, new GenericSerDeser());
-    }
-
-    public GenericListener(String zkaddress, String appName,
-            Object listenerConfig, Deserializer deserializer) {
-        this.zkAddress = zkAddress;
-        this.deserializer = deserializer;
-        try {
-            Map<String, String> map = (Map<String, String>) listenerConfig;
-            String mode = map.get("mode");
-            int port = Integer.parseInt(map.get("port"));
-            if (mode.equals("multicast")) {
-                InetAddress inetAddress = InetAddress.getByName(map.get("channel"));
-                socket = new MulticastSocket(port);
-                ((MulticastSocket) socket).joinGroup(inetAddress);
-            }
-            if (mode.equals("unicast")) {
-                socket = new DatagramSocket(port);
-            }
-            String udpBufferSize = System.getProperty("udp.buffer.size");
-            if (udpBufferSize == null) {
-                udpBufferSize = "4194302";
-            }
-            socket.setReceiveBufferSize(Integer.parseInt(udpBufferSize));
-            bs = new byte[BUFFER_LENGTH];
-            dgram = new DatagramPacket(bs, bs.length);
-        } catch (IOException e) {
-            logger.error("error creating listener", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public Object receive() {
-        try {
-            socket.receive(dgram);
-            byte[] data = new byte[dgram.getLength()];
-            System.arraycopy(dgram.getData(),
-                             dgram.getOffset(),
-                             data,
-                             0,
-                             data.length);
-            Object object = deserializer.deserialize(data);
-            dgram.setLength(BUFFER_LENGTH);
-            return object;
-        } catch (IOException e) {
-            logger.error("error receiving message", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /*
-     * There is nothing much to do for multicast and unicast
-     */
-    public void start() {
-
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class GenericListener {
+    private static Logger logger = Logger.getLogger(GenericListener.class);
+    private String zkAddress;
+    private DatagramSocket socket;
+    int BUFFER_LENGTH = 65507;
+    private DatagramPacket dgram;
+    private byte[] bs;
+    final Deserializer deserializer;
+
+    public GenericListener(String zkaddress, String appName,
+            Object listenerConfig) {
+        this(zkaddress, appName, listenerConfig, new GenericSerDeser());
+    }
+
+    public GenericListener(String zkaddress, String appName,
+            Object listenerConfig, Deserializer deserializer) {
+        this.zkAddress = zkAddress;
+        this.deserializer = deserializer;
+        try {
+            Map<String, String> map = (Map<String, String>) listenerConfig;
+            String mode = map.get("mode");
+            int port = Integer.parseInt(map.get("port"));
+            if (mode.equals("multicast")) {
+                InetAddress inetAddress = InetAddress.getByName(map.get("channel"));
+                socket = new MulticastSocket(port);
+                ((MulticastSocket) socket).joinGroup(inetAddress);
+            }
+            if (mode.equals("unicast")) {
+                socket = new DatagramSocket(port);
+            }
+            String udpBufferSize = System.getProperty("udp.buffer.size");
+            if (udpBufferSize == null) {
+                udpBufferSize = "4194302";
+            }
+            socket.setReceiveBufferSize(Integer.parseInt(udpBufferSize));
+            bs = new byte[BUFFER_LENGTH];
+            dgram = new DatagramPacket(bs, bs.length);
+        } catch (IOException e) {
+            logger.error("error creating listener", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public Object receive() {
+        try {
+            socket.receive(dgram);
+            byte[] data = new byte[dgram.getLength()];
+            System.arraycopy(dgram.getData(),
+                             dgram.getOffset(),
+                             data,
+                             0,
+                             data.length);
+            Object object = deserializer.deserialize(data);
+            dgram.setLength(BUFFER_LENGTH);
+            return object;
+        } catch (IOException e) {
+            logger.error("error receiving message", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * There is nothing much to do for multicast and unicast
+     */
+    public void start() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
index 0fff992..711f261 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
@@ -1,189 +1,189 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.List;
-import java.util.Map;
-import org.apache.log4j.Logger;
-
-public class GenericSender {
-    static Logger logger = Logger.getLogger(GenericSender.class);
-    Map<String, String> map;
-    private DatagramSocket socket;
-    private final String zkAddress;
-    ProcessMonitor listenerMonitor;
-    int rotationCounter = 0;
-    private final Serializer serializer;
-    private final int listenerTaskCount;
-    private CommEventCallback callbackHandler;
-    private String mode;
-
-    public GenericSender(String zkAddress, String appName,
-            Object senderConfigData) {
-        this(zkAddress, appName, appName, senderConfigData);
-    }
-
-    @SuppressWarnings("unchecked")
-    public GenericSender(String zkAddress, String adapterClusterName,
-            String s4ClusterName, Object senderConfigData, Serializer serializer) {
-        this.zkAddress = zkAddress;
-        this.serializer = serializer;
-        try {
-            map = (Map<String, String>) senderConfigData;
-            mode = map.get("mode");
-            if (mode.equals("multicast")) {
-                socket = new MulticastSocket();
-            }
-            if (mode.equals("unicast")) {
-                socket = new DatagramSocket();
-            }
-            listenerMonitor = CommServiceFactory.getProcessMonitor(this.zkAddress,
-                                                                   s4ClusterName,
-                                                                   callbackHandler);
-            if (callbackHandler != null) {
-                // listenerMonitor.setCallbackHandler(callbackHandler);
-            }
-            listenerMonitor.monitor();
-            this.listenerTaskCount = listenerMonitor.getTaskCount();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public GenericSender(String zkAddress, String senderAppName,
-            String listenerAppName, Object senderConfigData) {
-        this(zkAddress,
-             senderAppName,
-             listenerAppName,
-             senderConfigData,
-             new GenericSerDeser());
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    @SuppressWarnings("unchecked")
-    public boolean send(Object data) {
-        try {
-            List<Object> destinationList = listenerMonitor.getDestinationList();
-            if (destinationList == null || destinationList.size() == 0) {
-                logger.error("Failed to send message: No destination available"
-                        + data);
-                return false;
-            }
-            byte[] byteBuffer = serializer.serialize(data);
-            rotationCounter = rotationCounter + 1;
-
-            int index = rotationCounter % destinationList.size();
-            Map<String, String> dest = (Map<String, String>) destinationList.get(Math.abs(index));
-            InetAddress inetAddress;
-            int port;
-            if (mode.equals("unicast")) {
-                inetAddress = InetAddress.getByName(dest.get("address"));
-                port = Integer.parseInt((dest.get("port")));
-            } else if (mode.equals("multicast")) {
-                inetAddress = InetAddress.getByName(dest.get("channel"));
-                port = Integer.parseInt((dest.get("port")));
-            } else {
-                logger.error("Failed to send message unknown mode: " + mode);
-                return false;
-            }
-            DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                   byteBuffer.length,
-                                                   inetAddress,
-                                                   port);
-            socket.send(dp);
-        } catch (IOException e) {
-            // add retry
-            logger.error("Failed to send message: " + data, e);
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver/partition
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    @SuppressWarnings("unchecked")
-    public boolean sendToPartition(int partition, Object data) {
-        try {
-            byte[] byteBuffer = serializer.serialize(data);
-            Map<Integer, Object> destinationMap = listenerMonitor.getDestinationMap();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Destination Map:" + destinationMap);
-            }
-            Map<String, String> dest = (Map<String, String>) destinationMap.get(partition);
-            if (dest != null) {
-                InetAddress inetAddress = InetAddress.getByName(dest.get("address"));
-                int port = Integer.parseInt((dest.get("port")));
-                DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                       byteBuffer.length,
-                                                       inetAddress,
-                                                       port);
-                socket.send(dp);
-            } else {
-                logger.warn("Destination not available for partition:"
-                        + partition + " Skipping message:" + data);
-                return false;
-            }
-        } catch (IOException e) {
-            // add retry
-            logger.error("Failed to send message: " + data, e);
-            return false;
-        }
-
-        return true;
-
-    }
-
-    /**
-     * compute partition using hashcode and send to appropriate partition
-     * 
-     * @param hashcode
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean sendUsingHashCode(int hashcode, Object data) {
-        int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
-        return sendToPartition(partition, data);
-
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-    public int getListenerTaskCount() {
-        return listenerTaskCount;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+public class GenericSender {
+    static Logger logger = Logger.getLogger(GenericSender.class);
+    Map<String, String> map;
+    private DatagramSocket socket;
+    private final String zkAddress;
+    ProcessMonitor listenerMonitor;
+    int rotationCounter = 0;
+    private final Serializer serializer;
+    private final int listenerTaskCount;
+    private CommEventCallback callbackHandler;
+    private String mode;
+
+    public GenericSender(String zkAddress, String appName,
+            Object senderConfigData) {
+        this(zkAddress, appName, appName, senderConfigData);
+    }
+
+    @SuppressWarnings("unchecked")
+    public GenericSender(String zkAddress, String adapterClusterName,
+            String s4ClusterName, Object senderConfigData, Serializer serializer) {
+        this.zkAddress = zkAddress;
+        this.serializer = serializer;
+        try {
+            map = (Map<String, String>) senderConfigData;
+            mode = map.get("mode");
+            if (mode.equals("multicast")) {
+                socket = new MulticastSocket();
+            }
+            if (mode.equals("unicast")) {
+                socket = new DatagramSocket();
+            }
+            listenerMonitor = CommServiceFactory.getProcessMonitor(this.zkAddress,
+                                                                   s4ClusterName,
+                                                                   callbackHandler);
+            if (callbackHandler != null) {
+                // listenerMonitor.setCallbackHandler(callbackHandler);
+            }
+            listenerMonitor.monitor();
+            this.listenerTaskCount = listenerMonitor.getTaskCount();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public GenericSender(String zkAddress, String senderAppName,
+            String listenerAppName, Object senderConfigData) {
+        this(zkAddress,
+             senderAppName,
+             listenerAppName,
+             senderConfigData,
+             new GenericSerDeser());
+    }
+
+    /**
+     * This method will send the data to receivers in a round robin fashion
+     * 
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    @SuppressWarnings("unchecked")
+    public boolean send(Object data) {
+        try {
+            List<Object> destinationList = listenerMonitor.getDestinationList();
+            if (destinationList == null || destinationList.size() == 0) {
+                logger.error("Failed to send message: No destination available"
+                        + data);
+                return false;
+            }
+            byte[] byteBuffer = serializer.serialize(data);
+            rotationCounter = rotationCounter + 1;
+
+            int index = rotationCounter % destinationList.size();
+            Map<String, String> dest = (Map<String, String>) destinationList.get(Math.abs(index));
+            InetAddress inetAddress;
+            int port;
+            if (mode.equals("unicast")) {
+                inetAddress = InetAddress.getByName(dest.get("address"));
+                port = Integer.parseInt((dest.get("port")));
+            } else if (mode.equals("multicast")) {
+                inetAddress = InetAddress.getByName(dest.get("channel"));
+                port = Integer.parseInt((dest.get("port")));
+            } else {
+                logger.error("Failed to send message unknown mode: " + mode);
+                return false;
+            }
+            DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                   byteBuffer.length,
+                                                   inetAddress,
+                                                   port);
+            socket.send(dp);
+        } catch (IOException e) {
+            // add retry
+            logger.error("Failed to send message: " + data, e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver/partition
+     * 
+     * @param partition
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    @SuppressWarnings("unchecked")
+    public boolean sendToPartition(int partition, Object data) {
+        try {
+            byte[] byteBuffer = serializer.serialize(data);
+            Map<Integer, Object> destinationMap = listenerMonitor.getDestinationMap();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Destination Map:" + destinationMap);
+            }
+            Map<String, String> dest = (Map<String, String>) destinationMap.get(partition);
+            if (dest != null) {
+                InetAddress inetAddress = InetAddress.getByName(dest.get("address"));
+                int port = Integer.parseInt((dest.get("port")));
+                DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                       byteBuffer.length,
+                                                       inetAddress,
+                                                       port);
+                socket.send(dp);
+            } else {
+                logger.warn("Destination not available for partition:"
+                        + partition + " Skipping message:" + data);
+                return false;
+            }
+        } catch (IOException e) {
+            // add retry
+            logger.error("Failed to send message: " + data, e);
+            return false;
+        }
+
+        return true;
+
+    }
+
+    /**
+     * compute partition using hashcode and send to appropriate partition
+     * 
+     * @param hashcode
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean sendUsingHashCode(int hashcode, Object data) {
+        int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
+        return sendToPartition(partition, data);
+
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+    public int getListenerTaskCount() {
+        return listenerTaskCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
index eb9af81..bb8512e 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
@@ -1,30 +1,30 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import org.apache.s4.comm.util.IOUtil;
-
-public class GenericSerDeser implements Serializer, Deserializer {
-
-    public byte[] serialize(Object obj) {
-        return IOUtil.serializeToBytes(obj);
-    }
-
-    public Object deserialize(byte[] buffer) {
-        return IOUtil.deserializeToObject(buffer);
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.IOUtil;
+
+public class GenericSerDeser implements Serializer, Deserializer {
+
+    public byte[] serialize(Object obj) {
+        return IOUtil.serializeToBytes(obj);
+    }
+
+    public Object deserialize(byte[] buffer) {
+        return IOUtil.deserializeToObject(buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java b/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
index 65f8c91..cab4f8b 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
@@ -1,91 +1,91 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ListenerProcess {
-    static Logger logger = Logger.getLogger(ListenerProcess.class);
-    private final String zkaddress;
-    private final String clusterName;
-    private String listenerRoot;
-    private GenericListener genericListener;
-    private Deserializer deserializer;
-    private CommEventCallback callbackHandler;
-
-    public ListenerProcess(String zkaddress, String clusterName) {
-        this.zkaddress = zkaddress;
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * This will be a blocking call and will wait until it gets a task
-     * 
-     * @return listener configuration
-     */
-    public Object acquireTaskAndCreateListener(Map<String, String> map) {
-        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
-                                                                clusterName,
-                                                                ClusterType.S4,
-                                                                callbackHandler);
-        logger.info("Waiting for task");
-        Object listenerConfig = manager.acquireTask(map);
-        createListenerFromConfig(listenerConfig);
-        return listenerConfig;
-    }
-
-    public void createListenerFromConfig(Object listenerConfig) {
-        logger.info("Starting listener with config: " + listenerConfig);
-        if (deserializer != null) {
-            genericListener = new GenericListener(zkaddress,
-                                                  clusterName,
-                                                  listenerConfig,
-                                                  deserializer);
-        } else {
-            genericListener = new GenericListener(zkaddress,
-                                                  clusterName,
-                                                  listenerConfig);
-        }
-        genericListener.start();
-
-    }
-
-    public Deserializer getDeserializer() {
-        return deserializer;
-    }
-
-    public void setDeserializer(Deserializer deserializer) {
-        this.deserializer = deserializer;
-    }
-
-    public Object listen() {
-        return genericListener.receive();
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class ListenerProcess {
+    static Logger logger = Logger.getLogger(ListenerProcess.class);
+    private final String zkaddress;
+    private final String clusterName;
+    private String listenerRoot;
+    private GenericListener genericListener;
+    private Deserializer deserializer;
+    private CommEventCallback callbackHandler;
+
+    public ListenerProcess(String zkaddress, String clusterName) {
+        this.zkaddress = zkaddress;
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * This will be a blocking call and will wait until it gets a task
+     * 
+     * @return listener configuration
+     */
+    public Object acquireTaskAndCreateListener(Map<String, String> map) {
+        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
+                                                                clusterName,
+                                                                ClusterType.S4,
+                                                                callbackHandler);
+        logger.info("Waiting for task");
+        Object listenerConfig = manager.acquireTask(map);
+        createListenerFromConfig(listenerConfig);
+        return listenerConfig;
+    }
+
+    public void createListenerFromConfig(Object listenerConfig) {
+        logger.info("Starting listener with config: " + listenerConfig);
+        if (deserializer != null) {
+            genericListener = new GenericListener(zkaddress,
+                                                  clusterName,
+                                                  listenerConfig,
+                                                  deserializer);
+        } else {
+            genericListener = new GenericListener(zkaddress,
+                                                  clusterName,
+                                                  listenerConfig);
+        }
+        genericListener.start();
+
+    }
+
+    public Deserializer getDeserializer() {
+        return deserializer;
+    }
+
+    public void setDeserializer(Deserializer deserializer) {
+        this.deserializer = deserializer;
+    }
+
+    public Object listen() {
+        return genericListener.receive();
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java b/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
index 3b2a72c..4d36e63 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
@@ -1,79 +1,79 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import org.apache.s4.comm.util.IOUtil;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-public class MulticastSender {
-
-    Map<String, String> map;
-    private MulticastSocket ms;
-    private InetAddress inetAddress;
-    private int port;
-
-    @SuppressWarnings("unchecked")
-    public MulticastSender(Object senderConfigData) {
-        try {
-            map = (Map<String, String>) senderConfigData;
-            ms = new MulticastSocket();
-            inetAddress = InetAddress.getByName(map.get("multicast.address"));
-            ms.joinGroup(inetAddress);
-            this.port = Integer.parseInt(map.get("multicast.port"));
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(Object data) {
-        try {
-            byte[] byteBuffer = IOUtil.serializeToBytes(data);
-            DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                   byteBuffer.length,
-                                                   inetAddress,
-                                                   port);
-            ms.send(dp);
-        } catch (IOException e) {
-            // add retry
-            e.printStackTrace();
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(int partition, Object data) {
-        return true;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.IOUtil;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.Map;
+
+public class MulticastSender {
+
+    Map<String, String> map;
+    private MulticastSocket ms;
+    private InetAddress inetAddress;
+    private int port;
+
+    @SuppressWarnings("unchecked")
+    public MulticastSender(Object senderConfigData) {
+        try {
+            map = (Map<String, String>) senderConfigData;
+            ms = new MulticastSocket();
+            inetAddress = InetAddress.getByName(map.get("multicast.address"));
+            ms.joinGroup(inetAddress);
+            this.port = Integer.parseInt(map.get("multicast.port"));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * This method will send the data to receivers in a round robin fashion
+     * 
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean send(Object data) {
+        try {
+            byte[] byteBuffer = IOUtil.serializeToBytes(data);
+            DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                   byteBuffer.length,
+                                                   inetAddress,
+                                                   port);
+            ms.send(dp);
+        } catch (IOException e) {
+            // add retry
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver
+     * 
+     * @param partition
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean send(int partition, Object data) {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
index 8331e5b..53962ca 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
@@ -1,33 +1,33 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import java.util.List;
-import java.util.Map;
-
-public interface ProcessMonitor {
-
-    // void setCallbackHandler(CommEventCallback callbackHandler);
-
-    void monitor();
-
-    List<Object> getDestinationList();
-
-    Map<Integer, Object> getDestinationMap();
-
-    int getTaskCount();
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ProcessMonitor {
+
+    // void setCallbackHandler(CommEventCallback callbackHandler);
+
+    void monitor();
+
+    List<Object> getDestinationList();
+
+    Map<Integer, Object> getDestinationMap();
+
+    int getTaskCount();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java b/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
index 0e186a1..6358bc7 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
@@ -1,20 +1,20 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-public enum SendMode {
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+public enum SendMode {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java b/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
index 0334203..a345f77 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
@@ -1,135 +1,135 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.Map;
-
-public class SenderProcess {
-    protected final String zkaddress;
-    protected final String adapterClusterName;
-    protected final String s4ClusterName;
-    protected Serializer serializer;
-    protected CommEventCallback callbackHandler;
-
-    public SenderProcess(String zkaddress, String clusterName) {
-        this(zkaddress, clusterName, clusterName);
-    }
-
-    public SenderProcess(String zkaddress, String adapterClusterName,
-            String s4ClusterName) {
-        this.zkaddress = zkaddress;
-        this.adapterClusterName = adapterClusterName;
-        this.s4ClusterName = s4ClusterName;
-    }
-
-    public void setSerializer(Serializer serializer) {
-        this.serializer = serializer;
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-    protected GenericSender genericSender;
-
-    /**
-     * This will be a blocking call and will wait until it gets a task
-     * 
-     * @return senderConfig object, currently its map
-     */
-
-    public Object acquireTaskAndCreateSender(Map<String, String> map) {
-        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
-                                                                adapterClusterName,
-                                                                ClusterType.ADAPTER,
-                                                                callbackHandler);
-        if (callbackHandler != null) {
-            // manager.setCallbackHandler(callbackHandler);
-        }
-        Object senderConfig = manager.acquireTask(map);
-        createSenderFromConfig(senderConfig);
-        return senderConfig;
-    }
-
-    public void createSenderFromConfig(Object senderConfig) {
-        if (serializer != null) {
-            this.genericSender = new GenericSender(zkaddress,
-                                                   adapterClusterName,
-                                                   s4ClusterName,
-                                                   senderConfig,
-                                                   serializer);
-        } else {
-            this.genericSender = new GenericSender(zkaddress,
-                                                   adapterClusterName,
-                                                   s4ClusterName,
-                                                   senderConfig);
-        }
-        if (callbackHandler != null) {
-            this.genericSender.setCallbackHandler(callbackHandler);
-        }
-
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(Object data) {
-        return genericSender.send(data);
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver/partition
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean sendToPartition(int partition, Object data) {
-        return genericSender.sendToPartition(partition, data);
-    }
-
-    /**
-     * compute partition using hashcode and send to appropriate partition
-     * 
-     * @param hashcode
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-
-    public boolean sendUsingHashCode(int hashcode, Object data) {
-        return genericSender.sendUsingHashCode(hashcode, data);
-    }
-
-    /**
-     * Returns the number of partitions on the receiver app side TODO: Currently
-     * it returns the number of tasks on the listener side. It works for now
-     * since numofPartitions=taskCount
-     */
-
-    public int getNumOfPartitions() {
-        return genericSender.getListenerTaskCount();
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.Map;
+
+public class SenderProcess {
+    protected final String zkaddress;
+    protected final String adapterClusterName;
+    protected final String s4ClusterName;
+    protected Serializer serializer;
+    protected CommEventCallback callbackHandler;
+
+    public SenderProcess(String zkaddress, String clusterName) {
+        this(zkaddress, clusterName, clusterName);
+    }
+
+    public SenderProcess(String zkaddress, String adapterClusterName,
+            String s4ClusterName) {
+        this.zkaddress = zkaddress;
+        this.adapterClusterName = adapterClusterName;
+        this.s4ClusterName = s4ClusterName;
+    }
+
+    public void setSerializer(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+    protected GenericSender genericSender;
+
+    /**
+     * This will be a blocking call and will wait until it gets a task
+     * 
+     * @return senderConfig object, currently its map
+     */
+
+    public Object acquireTaskAndCreateSender(Map<String, String> map) {
+        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
+                                                                adapterClusterName,
+                                                                ClusterType.ADAPTER,
+                                                                callbackHandler);
+        if (callbackHandler != null) {
+            // manager.setCallbackHandler(callbackHandler);
+        }
+        Object senderConfig = manager.acquireTask(map);
+        createSenderFromConfig(senderConfig);
+        return senderConfig;
+    }
+
+    public void createSenderFromConfig(Object senderConfig) {
+        if (serializer != null) {
+            this.genericSender = new GenericSender(zkaddress,
+                                                   adapterClusterName,
+                                                   s4ClusterName,
+                                                   senderConfig,
+                                                   serializer);
+        } else {
+            this.genericSender = new GenericSender(zkaddress,
+                                                   adapterClusterName,
+                                                   s4ClusterName,
+                                                   senderConfig);
+        }
+        if (callbackHandler != null) {
+            this.genericSender.setCallbackHandler(callbackHandler);
+        }
+
+    }
+
+    /**
+     * This method will send the data to receivers in a round robin fashion
+     * 
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean send(Object data) {
+        return genericSender.send(data);
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver/partition
+     * 
+     * @param partition
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean sendToPartition(int partition, Object data) {
+        return genericSender.sendToPartition(partition, data);
+    }
+
+    /**
+     * compute partition using hashcode and send to appropriate partition
+     * 
+     * @param hashcode
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+
+    public boolean sendUsingHashCode(int hashcode, Object data) {
+        return genericSender.sendUsingHashCode(hashcode, data);
+    }
+
+    /**
+     * Returns the number of partitions on the receiver app side TODO: Currently
+     * it returns the number of tasks on the listener side. It works for now
+     * since numofPartitions=taskCount
+     */
+
+    public int getNumOfPartitions() {
+        return genericSender.getListenerTaskCount();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java b/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
index 1e337e2..d3eb134 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
@@ -1,21 +1,21 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-public interface Serializer {
-
-    public byte[] serialize(Object obj);
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+public interface Serializer {
+
+    public byte[] serialize(Object obj);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java b/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
index 705e150..d3dcad0 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
@@ -1,24 +1,24 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.core;
-
-import java.util.Map;
-
-public interface TaskManager {
-
-    Object acquireTask(Map<String, String> customTaskData);
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.Map;
+
+public interface TaskManager {
+
+    Object acquireTask(Map<String, String> customTaskData);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
index 5852f63..6ea5577 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
@@ -1,78 +1,78 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.file;
-
-import org.apache.s4.comm.core.ProcessMonitor;
-import org.apache.s4.comm.util.ConfigUtils;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class StaticProcessMonitor implements ProcessMonitor {
-    static Logger logger = Logger.getLogger(StaticProcessMonitor.class);
-    private List<Object> destinationList = new ArrayList<Object>();
-    private Map<Integer, Object> destinationMap = new HashMap<Integer, Object>();
-    private int taskCount;
-    private final String clusterName;
-    private final ClusterType clusterType;
-
-    public StaticProcessMonitor(String address, String clusterName,
-            ClusterType clusterType) {
-        this.clusterName = clusterName;
-        this.clusterType = clusterType;
-    }
-
-    public void monitor() {
-        readConfig();
-    }
-
-    private void readConfig() {
-        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
-                                                                             clusterName,
-                                                                             clusterType,
-                                                                             true);
-        for (Map<String, String> processMap : processList) {
-            destinationList.add(processMap);
-            String key = (String) processMap.get("partition");
-            if (key != null) {
-                destinationMap.put(Integer.parseInt(key), processMap);
-            }
-        }
-        taskCount = destinationList.size();
-        logger.info("Destination List: " + destinationList);
-        logger.info("Destination Map: " + destinationMap);
-        logger.info("TaskCount: " + taskCount);
-    }
-
-    public List<Object> getDestinationList() {
-        return destinationList;
-    }
-
-    public Map<Integer, Object> getDestinationMap() {
-        return destinationMap;
-    }
-
-    @Override
-    public int getTaskCount() {
-        return taskCount;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.file;
+
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class StaticProcessMonitor implements ProcessMonitor {
+    static Logger logger = Logger.getLogger(StaticProcessMonitor.class);
+    private List<Object> destinationList = new ArrayList<Object>();
+    private Map<Integer, Object> destinationMap = new HashMap<Integer, Object>();
+    private int taskCount;
+    private final String clusterName;
+    private final ClusterType clusterType;
+
+    public StaticProcessMonitor(String address, String clusterName,
+            ClusterType clusterType) {
+        this.clusterName = clusterName;
+        this.clusterType = clusterType;
+    }
+
+    public void monitor() {
+        readConfig();
+    }
+
+    private void readConfig() {
+        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
+                                                                             clusterName,
+                                                                             clusterType,
+                                                                             true);
+        for (Map<String, String> processMap : processList) {
+            destinationList.add(processMap);
+            String key = (String) processMap.get("partition");
+            if (key != null) {
+                destinationMap.put(Integer.parseInt(key), processMap);
+            }
+        }
+        taskCount = destinationList.size();
+        logger.info("Destination List: " + destinationList);
+        logger.info("Destination Map: " + destinationMap);
+        logger.info("TaskCount: " + taskCount);
+    }
+
+    public List<Object> getDestinationList() {
+        return destinationList;
+    }
+
+    public Map<Integer, Object> getDestinationMap() {
+        return destinationMap;
+    }
+
+    @Override
+    public int getTaskCount() {
+        return taskCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
index 7780a3e..c30936f 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
@@ -1,79 +1,79 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.test;
-
-import org.apache.s4.comm.file.StaticProcessMonitor;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProcessMonitorTest {
-    public static void main(String[] args) throws Exception {
-        // testZkProcessMonitor(args);
-        testStaticProcessMonitor(args);
-        Thread.sleep(10000);
-    }
-
-    private static void testStaticProcessMonitor(String[] args) {
-        String address = null;
-        address = "localhost:2181";
-        StaticProcessMonitor monitor = new StaticProcessMonitor(address,
-                                                                "taskmanagerTest",
-                                                                ClusterType.S4);
-        monitor.monitor();
-        System.out.println(monitor.getDestinationList());
-        System.out.println(monitor.getDestinationMap());
-    }
-
-    private static void testZkProcessMonitor(String[] args) {
-        System.out.println("Hereh");
-        // "effortfell.greatamerica.corp.yahoo.com:2181"
-        String address = args[0];
-        address = "localhost:2181";
-        String processName = args[1];
-        ZkTaskSetup zkTaskSetup = new ZkTaskSetup(address,
-                                                        "/taskmanagerTest",
-                                                        ClusterType.S4);
-        zkTaskSetup.cleanUp();
-        zkTaskSetup.setUpTasks("1.0.0.", new String[] { "task0", "task1" });
-        Object obj;
-        System.out.println(processName + " Going to Wait for a task");
-        HashMap<String, String> map = new HashMap<String, String>();
-        ZkTaskManager taskManager = new ZkTaskManager(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        obj = taskManager.acquireTask(map);
-        System.out.println(processName + "taking up task: " + obj);
-        File f = new File("c:/" + obj + ".file");
-        f.delete();
-        while (true) {
-            if (f.exists()) {
-                break;
-            }
-            System.out.println(processName + " processing task: " + obj);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        System.out.println("Exiting task:" + obj);
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.file.StaticProcessMonitor;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProcessMonitorTest {
+    public static void main(String[] args) throws Exception {
+        // testZkProcessMonitor(args);
+        testStaticProcessMonitor(args);
+        Thread.sleep(10000);
+    }
+
+    private static void testStaticProcessMonitor(String[] args) {
+        String address = null;
+        address = "localhost:2181";
+        StaticProcessMonitor monitor = new StaticProcessMonitor(address,
+                                                                "taskmanagerTest",
+                                                                ClusterType.S4);
+        monitor.monitor();
+        System.out.println(monitor.getDestinationList());
+        System.out.println(monitor.getDestinationMap());
+    }
+
+    private static void testZkProcessMonitor(String[] args) {
+        System.out.println("Hereh");
+        // "effortfell.greatamerica.corp.yahoo.com:2181"
+        String address = args[0];
+        address = "localhost:2181";
+        String processName = args[1];
+        ZkTaskSetup zkTaskSetup = new ZkTaskSetup(address,
+                                                        "/taskmanagerTest",
+                                                        ClusterType.S4);
+        zkTaskSetup.cleanUp();
+        zkTaskSetup.setUpTasks("1.0.0.", new String[] { "task0", "task1" });
+        Object obj;
+        System.out.println(processName + " Going to Wait for a task");
+        HashMap<String, String> map = new HashMap<String, String>();
+        ZkTaskManager taskManager = new ZkTaskManager(address,
+                                                      "/taskmanagerTest",
+                                                      ClusterType.S4);
+        obj = taskManager.acquireTask(map);
+        System.out.println(processName + "taking up task: " + obj);
+        File f = new File("c:/" + obj + ".file");
+        f.delete();
+        while (true) {
+            if (f.exists()) {
+                break;
+            }
+            System.out.println(processName + " processing task: " + obj);
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        System.out.println("Exiting task:" + obj);
+    }
+}


Mime
View raw message