incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [22/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/ConfigParser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/ConfigParser.java b/s4-comm/src/main/java/io/s4/comm/util/ConfigParser.java
deleted file mode 100644
index d742fa9..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/ConfigParser.java
+++ /dev/null
@@ -1,439 +0,0 @@
-package io.s4.comm.util;
-
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilderFactory;
-
-import org.apache.log4j.Logger;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-import org.xml.sax.SAXParseException;
-
-public class ConfigParser {
-	private static Logger logger =  Logger.getLogger(ConfigParser.class);
-	
-	public ConfigParser() {
-	}
-	
-	public Config parse(String configFilename) {
-		Config config = null;
-
-		Document document = createDocument(configFilename);
-		NodeList topLevelNodeList = document.getChildNodes();
-		for (int i = 0; i < topLevelNodeList.getLength(); i++) {
-			Node node = topLevelNodeList.item(i);
-			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("config")) {
-				config = processConfigElement(node);
-			}
-		}
-		verifyConfig(config);
-		return config;
-	}
-	
-	private void verifyConfig(Config config) {
-		if (config.getClusters().size() == 0) {
-			throw new VerifyError("No clusters specified");
-		}
-		
-		for (Cluster cluster : config.getClusters()) {
-			verifyCluster(cluster);
-			
-		}
-	}
-	
-	public void verifyCluster(Cluster cluster) {
-		if (cluster.getNodes().size() == 0) {
-			throw new VerifyError("No nodes in cluster " + cluster.getName());
-		}
-		
-		Set<String> taskSet = new HashSet<String>();
-		for (ClusterNode node : cluster.getNodes()) {
-			if (taskSet.contains(node.getTaskId())) {
-				throw new VerifyError("Duplicate task id " + node.getTaskId());
-			}
-			if (node.getTaskId() == null) {
-				throw new VerifyError("Missing task id");
-			}
-			taskSet.add(node.getTaskId());			
-		}
-		
-		if (cluster.getType().equals(ClusterType.S4)) {
-			verifyS4Cluster(cluster);
-		}
-		else {
-			verifyAdapterCluster(cluster);
-		}
-	}
-	
-	public void verifyS4Cluster(Cluster cluster) {
-		/*
-		 * rules:
-		 * 1)	if any node has a partition id,
-		 * 		a)	all must have partition ids
-		 * 		b)	the partition ids must be 0-n, where n is the number of nodes
-		 * 			minus 1
-		*/
-		int nodeCount = cluster.getNodes().size();
-		Set<Integer> idSet = new HashSet<Integer>();
-		for (ClusterNode node : cluster.getNodes()) {			
-			int partitionId = node.getPartition();
-			if (partitionId == -1) {
-				throw new VerifyError("No partition specified on node " + node.getTaskId());
-			}
-			if (partitionId < 0 || partitionId > (nodeCount-1)) {
-				throw new VerifyError("Bad partition specified " + partitionId);
-			}
-			if (idSet.contains(new Integer(partitionId))) {
-				throw new VerifyError("Duplicate partition in cluster: " + partitionId);
-			}
-			idSet.add(partitionId);
-			
-			if (node.getPort() == -1) {
-				throw new VerifyError("Missing port number on node " + node.getTaskId());
-			}
-		}
-		
-		if (idSet.size() != nodeCount && idSet.size() != 0) {
-			throw new VerifyError("Bad partition ids in cluster " + idSet);
-		}		
-	}
-	
-	public void verifyAdapterCluster(Cluster cluster) {
-		for (ClusterNode node : cluster.getNodes()) {			
-			if (node.getPartition() != -1) {
-				throw new VerifyError("Cannot specify partition for adapter node");
-			}
-		}
-	}
-	
-	public Config processConfigElement(Node configElement) {
-	    String version = ((Element)configElement).getAttribute("version");
-	    if (version == null ||     version.length() > 0) {
-	        version = "-1";
-	    }
-	    
-		NodeList nodeList = configElement.getChildNodes();
-		
-		Config config = new Config(version);
-		for (int i = 0; i < nodeList.getLength(); i++) {
-			Node node = nodeList.item(i);
-			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("cluster")) {
-				config.addCluster(processClusterElement(node));
-			}
-		}
-		
-		return config;
-	}
-	
-	public Cluster processClusterElement(Node clusterElement) {
-		Cluster cluster = new Cluster();
-		 
-		String mode = ((Element)clusterElement).getAttribute("mode");
-		if (mode != null) {
-			cluster.setMode(mode);
-		}
-		String name = ((Element)clusterElement).getAttribute("name");
-		if (name != null) {
-			cluster.setName(name);
-		}
-		String typeString = ((Element)clusterElement).getAttribute("type");
-		if (typeString != null) {
-			if (typeString.equals("adapter")) {
-				cluster.setType(ClusterType.ADAPTER);
-			}
-			else if (typeString.equals("s4")) {
-				cluster.setType(ClusterType.S4);
-			}			
-		}
-		
-		NodeList nodeList = clusterElement.getChildNodes();
-		for (int i = 0; i < nodeList.getLength(); i++) {
-			Node node = nodeList.item(i);
-			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("node")) {
-				cluster.addNode(processClusterNodeElement(node));
-			}
-		}
-		return cluster;
-	}
-
-	public ClusterNode processClusterNodeElement(Node clusterNodeElement) {
-		int partition = -1;
-		int port = 0;
-		String machineName = null;
-		String taskId = null;
-		
-		NodeList nodeList = clusterNodeElement.getChildNodes();
-		for (int i = 0; i < nodeList.getLength(); i++) {
-			Node node = nodeList.item(i);
-			
-			if (node.getNodeType() != Node.ELEMENT_NODE) {
-				continue;
-			}
-			
-			if (node.getNodeName().equals("partition")) {
-				try {
-					partition = Integer.parseInt(getElementContentText(node));
-					
-				}
-				catch (NumberFormatException nfe) {
-					throw new VerifyError("Bad partition specified " + getElementContentText(node));
-				}
-			}
-			else if (node.getNodeName().equals("port")) {
-				try {
-					port = Integer.parseInt(getElementContentText(node));
-					
-				}
-				catch (NumberFormatException nfe) {
-					throw new VerifyError("Bad port specified " + getElementContentText(node));
-				}
-			}
-			else if (node.getNodeName().equals("machine")) {
-				machineName = getElementContentText(node);
-			}
-			else if (node.getNodeName().equals("taskId")) {
-				taskId = getElementContentText(node);
-			}
-		}
-		
-		return new ClusterNode(partition, port, machineName, taskId);
-	}
-	
-	private static Document createDocument(String configFilename) {
-		try {
-			Document document;
-			// Get a JAXP parser factory object
-			javax.xml.parsers.DocumentBuilderFactory dbf = DocumentBuilderFactory
-					.newInstance();
-			// Tell the factory what kind of parser we want
-			dbf.setValidating(false);
-			dbf.setIgnoringComments(true);
-			dbf.setIgnoringElementContentWhitespace(true);
-			// Use the factory to get a JAXP parser object
-			javax.xml.parsers.DocumentBuilder parser = dbf.newDocumentBuilder();
-
-			// Tell the parser how to handle errors. Note that in the JAXP API,
-			// DOM parsers rely on the SAX API for error handling
-			parser.setErrorHandler(new org.xml.sax.ErrorHandler() {
-				public void warning(SAXParseException e) {
-					logger.warn("WARNING: " + e.getMessage(), e);
-				}
-
-				public void error(SAXParseException e) {
-					logger.error("ERROR: " + e.getMessage(),e);
-				}
-
-				public void fatalError(SAXParseException e) throws SAXException {
-					logger.error("FATAL ERROR: " + e.getMessage(), e);
-					throw e; // re-throw the error
-				}
-			});
-
-			// Finally, use the JAXP parser to parse the file. This call returns
-			// A Document object. Now that we have this object, the rest of this
-			// class uses the DOM API to work with it; JAXP is no longer
-			// required.
-			InputStream is = getResourceStream(configFilename);
-			if(is == null){
-				throw new RuntimeException("Unable to find config file:"+ configFilename);
-			}
-			document = parser.parse(is);
-			return document;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-	
-	public String getElementContentText(Node node) {
-		if (node.getNodeType() != Node.ELEMENT_NODE) {
-			return "";
-		}
-		NodeList children = node.getChildNodes();
-		for (int i = 0; i < children.getLength(); i++) {
-			Node child = children.item(i);
-			if (child.getNodeType() == Node.TEXT_NODE) {
-				return child.getNodeValue();
-			}
-		}
-		
-		return "";
-	}
-	
-	public static void main(String[] args) {
-		ConfigParser parser = new ConfigParser();
-		Config config = parser.parse(args[0]);
-		System.out.println(config);
-	}
-
-	private static InputStream getResourceStream(String configfile) {
-		try {
-			File f = new File(configfile);
-			if (f.exists()) {
-				if (f.isFile()) {
-					return new FileInputStream(configfile);
-				} else {
-					throw new RuntimeException("configFile " + configfile
-							+ "  is not a regular file:");
-				}
-			}
-			InputStream is = Thread.currentThread().getContextClassLoader()
-					.getResourceAsStream(configfile);
-			if (is != null) {
-				return is;
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		return null;
-	}
-	
-	static public class Config {
-		List<Cluster> clusters = new ArrayList<Cluster>();
-		private String version = "-1";
-		
-		public String getVersion() {
-            return version;
-        }
-
-        public void setVersion(String version) {
-            this.version = version;
-        }
-
-        public Config(String version) {
-		    this.version = version;
-		}
-		
-		public Config() {
-		}
-		
-		public void addCluster(Cluster cluster) {
-			clusters.add(cluster);
-		}
-
-		public List<Cluster> getClusters() {
-			return Collections.unmodifiableList(clusters);
-		}
-		
-		public String toString() {
-			return "{version="+version+",clusters="+clusters+"}";
-		}
-	}
-	
-	static public class Cluster {
-		public enum ClusterType {
-			S4("s4"),
-			ADAPTER("adapter");
-			
-			private final String clusterTypeString;
-			
-			private ClusterType(String eventShortName){
-				this.clusterTypeString = eventShortName;
-			}
-			
-		    public String toString() {
-		        return clusterTypeString;
-		    }			
-		}
-		
-		List<ConfigParser.ClusterNode> nodes = new ArrayList<ConfigParser.ClusterNode>();
-		String mode = "unicast";
-		String name = "unknown";
-		ClusterType type = ClusterType.S4;
-		
-		public void addNode(ConfigParser.ClusterNode node) {
-			nodes.add(node);
-		}
-		
-		public List<ConfigParser.ClusterNode> getNodes() {
-			return Collections.unmodifiableList(nodes);
-		}
-
-		public String getMode() {
-			return mode;
-		}
-
-		public void setMode(String mode) {
-			this.mode = mode;
-		}
-		
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-
-		public ClusterType getType() {
-			return type;
-		}
-
-		public void setType(ClusterType type) {
-			this.type = type;
-		}
-
-		public String toString() {
-			StringBuffer sb = new StringBuffer();
-			sb.append("{name=").append(name).
-				append(",mode=").append(mode).
-				append(",type=").append(type).
-				append(",nodes=").append(nodes).append("}");
-			return sb.toString();
-		}
-		
-	}
-	
-	static public class ClusterNode {
-		private int partition;
-		private int port;
-		private String machineName;
-		private String taskId;
-		
-		public ClusterNode(int partition, int port, String machineName, String taskId) {
-			this.partition = partition;
-			this.port = port;
-			this.machineName = machineName;
-			this.taskId = taskId;
-		}
-		
-		public int getPartition() {
-			return partition;
-		}
-		public int getPort() {
-			return port;
-		}
-		public String getMachineName() {
-			return machineName;
-		}
-		public String getTaskId() {
-			return taskId;
-		}
-		
-		public String toString() {
-			StringBuffer sb = new StringBuffer();
-			sb.append("{").append("partition=").append(partition).
-				append(",port=").append(port).
-				append(",machineName=").append(machineName).
-				append(",taskId=").append(taskId).append("}");
-			return sb.toString();
-		}
-	}
-	
-	public class VerifyError extends RuntimeException {
-		public VerifyError(String message) {
-			super(message);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/ConfigUtils.java b/s4-comm/src/main/java/io/s4/comm/util/ConfigUtils.java
deleted file mode 100644
index 1ca813f..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/ConfigUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.s4.comm.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.s4.comm.util.ConfigParser.Cluster;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.util.ConfigParser.ClusterNode;
-
-public class ConfigUtils {
-    public static List<Map<String, String>> readConfig(String configFilename,
-            String clusterName, ClusterType clusterType, boolean isStatic) {
-        ConfigParser parser = new ConfigParser();
-        ConfigParser.Config config = parser.parse(configFilename);
-
-        // find the requested cluster
-        Cluster cluster = null;
-        for (Cluster checkCluster : config.getClusters()) {
-            if (checkCluster.getName().equals(clusterName)
-                    && checkCluster.getType().equals(clusterType)) {
-                cluster = checkCluster;
-                break;
-            }
-        }
-        if (cluster == null) {
-            throw new RuntimeException("Cluster " + clusterName + " of type "
-                    + clusterType + " not configured");
-        }
-        return readConfig(cluster, clusterName, clusterType, isStatic);
-    }
-
-    public static List<Map<String, String>> readConfig(Cluster cluster,
-            String clusterName, ClusterType clusterType, boolean isStatic) {
-
-        List<Map<String, String>> processSet = new ArrayList<Map<String, String>>();
-        for (ClusterNode node : cluster.getNodes()) {
-            Map<String, String> nodeInfo = new HashMap<String, String>();
-            if (node.getPartition() != -1) {
-                nodeInfo.put("partition", String.valueOf(node.getPartition()));
-            }
-            if (node.getPort() != -1) {
-                nodeInfo.put("port", String.valueOf(node.getPort()));
-            }
-            nodeInfo.put("cluster.type", String.valueOf(clusterType));
-            nodeInfo.put("cluster.name", clusterName);
-            if (isStatic) {
-                nodeInfo.put("address", node.getMachineName());
-                nodeInfo.put("process.host", node.getMachineName());
-            }
-            nodeInfo.put("mode", cluster.getMode());
-            nodeInfo.put("ID", node.getTaskId());
-            processSet.add(nodeInfo);
-        }
-        return processSet;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/IOUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/IOUtil.java b/s4-comm/src/main/java/io/s4/comm/util/IOUtil.java
deleted file mode 100644
index 86a19ab..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/IOUtil.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class IOUtil {
-
-    public static void save(Object obj, String path) throws Exception {
-        File f = new File(path);
-        FileOutputStream fos = new FileOutputStream(f);
-        ObjectOutputStream objectOutputStream = new ObjectOutputStream(fos);
-        objectOutputStream.writeObject(obj);
-        objectOutputStream.close();
-    }
-
-    public static Object read(String path) throws Exception {
-        File f = new File(path);
-        FileInputStream fos = new FileInputStream(f);
-        ObjectInputStream objectInputStream = new ObjectInputStream(fos);
-        Object readObject = objectInputStream.readObject();
-        objectInputStream.close();
-        return readObject;
-    }
-
-    public static byte[] serializeToBytes(Object obj) {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos);
-            objectOutputStream.writeObject(obj);
-            objectOutputStream.close();
-            return bos.toByteArray();
-        } catch (Exception e) {
-            throw new RuntimeException("Exception trying to serialize to bytes, obj="
-                                               + obj,
-                                       e);
-        }
-    }
-
-    public static Object deserializeToObject(byte[] bytes) {
-        try {
-            ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
-            ObjectInputStream objectInputStream = new ObjectInputStream(bos);
-            Object readObject = objectInputStream.readObject();
-            objectInputStream.close();
-            return readObject;
-        } catch (Exception e) {
-            throw new RuntimeException("Exception trying to deserialize bytes to obj, bytes="
-                                               + new String(bytes),
-                                       e);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/JSONUtil.java b/s4-comm/src/main/java/io/s4/comm/util/JSONUtil.java
deleted file mode 100644
index b33d2eb..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/JSONUtil.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.util;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.List;
-import java.util.Iterator;
-import java.util.ArrayList;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class JSONUtil {
-    static Set<Class> knownTypes = new HashSet<Class>();
-    static {
-        knownTypes.add(String.class);
-        knownTypes.add(Double.class);
-        knownTypes.add(Integer.class);
-        knownTypes.add(Float.class);
-        knownTypes.add(Long.class);
-        knownTypes.add(Boolean.class);
-    }
-
-    public static String toJsonString(Object obj) {
-        Map<String, Object> map = getMap(obj);
-        JSONObject jsonObject = toJSONObject(map);
-        return jsonObject.toString();
-    }
-
-    public static Map<String, Object> getMapFromJson(String str) {
-        return getRawRecord(fromJsonString(str));
-    }
-
-    public static Map<String, Object> getRawRecord(JSONObject jsonRecord) {
-        Map<String, Object> record = new HashMap<String, Object>();
-        for (Iterator it = jsonRecord.keys(); it.hasNext();) {
-            try {
-                String key = (String) it.next();
-                Object value = jsonRecord.get(key);
-                record.put(key, fixValue(value));
-            } catch (Exception e) {
-                continue;
-            }
-        }
-        return record;
-    }
-
-    public static List<Map<String, Object>> getRawList(JSONArray jsonList) {
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        int length = jsonList.length();
-        for (int i = 0; i < length; i++) {
-            try {
-                Object value = jsonList.get(i);
-                value = fixValue(value);
-                if (!(value instanceof Map)) {
-                    Map<String, Object> mapValue = new HashMap<String, Object>();
-                    mapValue.put("value", value);
-                    value = mapValue;
-                }
-                list.add((Map<String, Object>) value);
-            } catch (Exception e) {
-                continue;
-            }
-        }
-        return list;
-    }
-
-    public static Object fixValue(Object originalValue) {
-        Object value = null;
-        if (originalValue instanceof Float) {
-            value = new Double((Float) originalValue);
-        } else if (originalValue instanceof Integer) {
-            value = new Long((Integer) originalValue);
-        } else if (originalValue instanceof JSONArray) {
-            value = getRawList((JSONArray) originalValue);
-        } else if (originalValue instanceof JSONObject) {
-            value = getRawRecord((JSONObject) originalValue);
-        } else {
-            value = originalValue;
-        }
-        return value;
-    }
-
-    public static JSONObject fromJsonString(String str) {
-        JSONObject object;
-        try {
-            object = new JSONObject(str);
-        } catch (JSONException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        }
-        return object;
-    }
-
-    public static JSONObject toJSONObject(Map<String, Object> map) {
-        JSONObject jsonObject = new JSONObject();
-        try {
-            for (String key : map.keySet()) {
-                Object val = map.get(key);
-                if (val instanceof Map) {
-                    jsonObject.put(key, toJSONObject((Map<String, Object>) val));
-                } else if (val instanceof List) {
-                    jsonObject.put(key, toJSONList((List) val));
-                } else {
-                    jsonObject.put(key, val);
-                }
-            }
-        } catch (JSONException je) {
-            je.printStackTrace();
-            return null;
-        }
-        return jsonObject;
-    }
-
-    private static JSONArray toJSONList(List list) {
-        JSONArray arr = new JSONArray();
-        for (Object val : list) {
-            if (val instanceof Map) {
-                arr.put(toJSONObject((Map<String, Object>) val));
-            } else if (val instanceof List) {
-                arr.put(toJSONList((List) val));
-            } else {
-                arr.put(val);
-            }
-        }
-        return arr;
-
-    }
-
-    public static Map<String, Object> getMap(Object obj) {
-        Map<String, Object> map = new HashMap<String, Object>();
-        if (obj != null) {
-            if (Map.class.isAssignableFrom(obj.getClass())) {
-                return (Map) obj;
-            } else {
-
-                Field[] fields = obj.getClass().getDeclaredFields();
-                for (int i = 0; i < fields.length; i++) {
-                    if (!fields[i].isAccessible()) {
-                        fields[i].setAccessible(true);
-                    }
-                    try {
-                        String name = fields[i].getName();
-                        Object val = fields[i].get(obj);
-                        if (!Modifier.isStatic(fields[i].getModifiers())
-                                && !Modifier.isTransient(fields[i].getModifiers())) {
-                            if (fields[i].getType().isPrimitive()
-                                    || knownTypes.contains(fields[i].getType())) {
-                                map.put(name, val);
-                            } else if (fields[i].getType().isArray()) {
-                                int length = Array.getLength(val);
-                                Object vals[] = new Object[length];
-                                for (int j = 0; j < length; j++) {
-                                    Object arrVal = Array.get(val, j);
-                                    if (arrVal.getClass().isPrimitive()
-                                            || knownTypes.contains(arrVal.getClass())) {
-                                        vals[j] = arrVal;
-                                    } else {
-                                        vals[j] = getMap(arrVal);
-                                    }
-                                }
-                                map.put(name, vals);
-                            } else {
-                                map.put(name, getMap(val));
-                            }
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException("Exception while getting value of "
-                                                           + fields[i],
-                                                   e);
-                    }
-                }
-            }
-        }
-        return map;
-    }
-
-    public static void main(String[] args) {
-        Map<String, Object> outerMap = new HashMap<String, Object>();
-
-        outerMap.put("doubleValue", 0.3456d);
-        outerMap.put("integerValue", 175647);
-        outerMap.put("longValue", 0x0000005000067000l);
-        outerMap.put("stringValue", "Hello there");
-
-        Map<String, Object> innerMap = null;
-        List<Map<String, Object>> innerList1 = new ArrayList<Map<String, Object>>();
-
-        innerMap = new HashMap<String, Object>();
-        innerMap.put("name", "kishore");
-        innerMap.put("count", 1787265);
-        innerList1.add(innerMap);
-        innerMap = new HashMap<String, Object>();
-        innerMap.put("name", "fred");
-        innerMap.put("count", 11);
-        innerList1.add(innerMap);
-
-        outerMap.put("innerList1", innerList1);
-
-        List<Integer> innerList2 = new ArrayList<Integer>();
-        innerList2.add(65);
-        innerList2.add(2387894);
-        innerList2.add(456);
-
-        outerMap.put("innerList2", innerList2);
-
-        JSONObject jsonObject = toJSONObject(outerMap);
-
-        String flatJSONString = null;
-        try {
-            System.out.println(jsonObject.toString(3));
-            flatJSONString = jsonObject.toString();
-            Object o = jsonObject.get("innerList1");
-            if (!(o instanceof JSONArray)) {
-                System.out.println("Unexpected type of list "
-                        + o.getClass().getName());
-            } else {
-                JSONArray jsonArray = (JSONArray) o;
-                o = jsonArray.get(0);
-                if (!(o instanceof JSONObject)) {
-                    System.out.println("Unexpected type of map "
-                            + o.getClass().getName());
-                } else {
-                    JSONObject innerJSONObject = (JSONObject) o;
-                    System.out.println(innerJSONObject.get("name"));
-                }
-            }
-        } catch (JSONException je) {
-            je.printStackTrace();
-        }
-
-        if (!flatJSONString.equals(toJsonString(outerMap))) {
-            System.out.println("JSON strings don't match!!");
-        }
-
-        Map<String, Object> map = getMapFromJson(flatJSONString);
-
-        Object o = map.get("doubleValue");
-        if (!(o instanceof Double)) {
-            System.out.println("Expected type Double, got "
-                    + o.getClass().getName());
-            Double doubleValue = (Double) o;
-            if (doubleValue != 0.3456d) {
-                System.out.println("Expected 0.3456, got " + doubleValue);
-            }
-        }
-
-        o = map.get("innerList1");
-        if (!(o instanceof List)) {
-            System.out.println("Expected implementation of List, got "
-                    + o.getClass().getName());
-        } else {
-            List innerList = (List) o;
-            o = innerList.get(0);
-            if (!(o instanceof Map)) {
-                System.out.println("Expected implementation of Map, got "
-                        + o.getClass().getName());
-            } else {
-                innerMap = (Map) o;
-                System.out.println(innerMap.get("name"));
-            }
-        }
-        System.out.println(map);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/SystemUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/io/s4/comm/util/SystemUtils.java
deleted file mode 100644
index 0d29a28..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/SystemUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.util;
-
-public class SystemUtils {
-
-    private SystemUtils() {
-    }
-
-    public static long getPID() {
-        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
-                                                                   .getName();
-        return Long.parseLong(processName.split("@")[0]);
-    }
-
-    public static void main(String[] args) {
-        String msg = "My PID is " + SystemUtils.getPID();
-
-        javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
-                                                  msg,
-                                                  "SystemUtils",
-                                                  javax.swing.JOptionPane.DEFAULT_OPTION);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ThreadTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/io/s4/comm/zk/ThreadTest.java
deleted file mode 100644
index 4395e38..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ThreadTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-public class ThreadTest {
-    static Object lock = new Object();
-
-    public static void main(String[] args) {
-
-        Thread t1 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
-                    synchronized (lock) {
-                        System.out.println("In thread T1");
-                        try {
-                            System.out.println("Going to wait");
-                            long start = System.currentTimeMillis();
-                            lock.wait();
-                            long end = System.currentTimeMillis();
-                            System.out.println("Woke up T1 after :"
-                                    + (end - start) / 1000 + "secs");
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
-        });
-        t1.start();
-        Thread t2 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
-                    synchronized (lock) {
-                        try {
-                            Thread.sleep(10000);
-                        } catch (InterruptedException e) {
-                            // TODO Auto-generated catch block
-                            e.printStackTrace();
-                        }
-                        System.out.println("In thread T2");
-                        lock.notify();
-                        break;
-                    }
-                }
-            }
-        });
-        t2.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ZkProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/io/s4/comm/zk/ZkProcessMonitor.java
deleted file mode 100644
index d082def..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ZkProcessMonitor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.DefaultWatcher;
-import io.s4.comm.core.ProcessMonitor;
-import io.s4.comm.util.JSONUtil;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
-        ProcessMonitor {
-    static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
-    private List<Object> destinationList;
-    private Map<Integer, Object> destinationMap;
-    private String processZNode;
-    private Object lock = new Object();
-    private volatile boolean updateMode = false;
-    private String taskZNode;
-    private int taskCount;
-
-    public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
-        this(address, clusterName, clusterType, null);
-    }
-
-    public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        super(address, callbackHandler);
-        String root = "/" + ClusterName + "/" + clusterType.toString();
-        this.taskZNode = root + "/task";
-        this.processZNode = root + "/process";
-        destinationMap = new HashMap<Integer, Object>();
-        destinationList = new ArrayList<Object>();
-    }
-
-    public void monitor() {
-        synchronized (mutex) {
-            readConfig();
-        }
-        new Thread(this).start();
-    }
-
-    private void readConfig() {
-        try {
-            synchronized (lock) {
-                Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
-                List<Object> tempDestinationList = new ArrayList<Object>();
-                updateMode = true;
-                List<String> tasks = zk.getChildren(taskZNode, false);
-                this.taskCount = tasks.size();
-                List<String> children = zk.getChildren(processZNode, false);
-                for (String name : children) {
-                    Stat stat = zk.exists(processZNode + "/" + name, false);
-                    if (stat != null) {
-                        byte[] data = zk.getData(processZNode + "/" + name,
-                                                 false,
-                                                 stat);
-                        Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(data));
-                        String key = (String) map.get("partition");
-                        if (key != null) {
-                            tempDestinationMap.put(Integer.parseInt(key), map);
-                        }
-                        tempDestinationList.add(map);
-                    }
-                }
-                destinationList.clear();
-                destinationMap.clear();
-                destinationList.addAll(tempDestinationList);
-                destinationMap.putAll(tempDestinationMap);
-                logger.info("Updated Destination List to" + destinationList);
-                logger.info("Updated Destination Map to" + destinationMap);
-            }
-        } catch (KeeperException e) {
-            logger.warn("Ignorable exception if it happens once in a while", e);
-        } catch (InterruptedException e) {
-            logger.error("Interrupted exception cause while reading process znode",
-                         e);
-        } finally {
-            updateMode = false;
-        }
-    }
-
-    public void run() {
-        try {
-            while (true) {
-                synchronized (mutex) {
-                    // set watch
-                    logger.info("Setting watch on " + processZNode);
-                    zk.getChildren(processZNode, true);
-                    readConfig();
-                    mutex.wait();
-                }
-            }
-        } catch (KeeperException e) {
-            logger.warn("KeeperException in ProcessMonitor.run", e);
-        } catch (InterruptedException e) {
-            logger.warn("InterruptedException in ProcessMonitor.run", e);
-        }
-    }
-
-    public List<Object> getDestinationList() {
-        if (updateMode) {
-            synchronized (lock) {
-                return destinationList;
-            }
-        } else {
-            return destinationList;
-        }
-    }
-
-    public Map<Integer, Object> getDestinationMap() {
-        if (updateMode) {
-            synchronized (lock) {
-                return destinationMap;
-            }
-        } else {
-            return destinationMap;
-        }
-    }
-
-    @Override
-    public int getTaskCount() {
-        return taskCount;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java b/s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java
deleted file mode 100644
index 229a968..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-import io.s4.comm.core.DefaultWatcher;
-import io.s4.comm.util.IOUtil;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkQueue extends DefaultWatcher {
-    /**
-     * Constructor of producer-consumer queue
-     * 
-     * @param address
-     * @param name
-     */
-    public ZkQueue(String address, String name) {
-        super(address);
-        this.root = name;
-        // Create ZK node name
-        if (zk != null) {
-            try {
-                Stat s = zk.exists(root, false);
-                if (s == null) {
-                    zk.create(root,
-                              new byte[0],
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-                }
-            } catch (KeeperException e) {
-                System.out.println("Keeper exception when instantiating queue: "
-                        + e.toString());
-            } catch (InterruptedException e) {
-                System.out.println("Interrupted exception");
-            }
-        }
-    }
-
-    /**
-     * Add element to the queue.
-     * 
-     * @param obj element to add
-     * @return true if add successful, false otherwise
-     */
-
-    public boolean produce(Object obj) throws KeeperException,
-            InterruptedException {
-        byte[] value = IOUtil.serializeToBytes(obj);
-        zk.create(root + "/element",
-                  value,
-                  Ids.OPEN_ACL_UNSAFE,
-                  CreateMode.PERSISTENT_SEQUENTIAL);
-        return true;
-    }
-
-    /**
-     * Remove first element from the queue.
-     * 
-     * @return first element from the queue
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public Object consume() throws KeeperException, InterruptedException {
-        Object retvalue = -1;
-        Stat stat = null;
-
-        // Get the first element available
-        while (true) {
-            synchronized (mutex) {
-                List<String> list = zk.getChildren(root, true);
-                if (list.size() == 0) {
-                    System.out.println("Going to wait");
-                    mutex.wait();
-                } else {
-                    Integer min = new Integer(list.get(0).substring(7));
-                    String name = list.get(0);
-                    for (String s : list) {
-                        Integer tempValue = new Integer(s.substring(7));
-                        // System.out.println("Temporary value: " + s);
-                        if (tempValue < min) {
-                            min = tempValue;
-                            name = s;
-                        }
-                    }
-                    String zNode = root + "/" + name;
-                    System.out.println("Temporary value: " + zNode);
-                    byte[] b = zk.getData(zNode, false, stat);
-                    zk.delete(zNode, 0);
-                    retvalue = IOUtil.deserializeToObject(b);
-                    return retvalue;
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskManager.java b/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskManager.java
deleted file mode 100644
index 8c0c120..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskManager.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.DefaultWatcher;
-import io.s4.comm.core.TaskManager;
-import io.s4.comm.util.JSONUtil;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkTaskManager extends DefaultWatcher implements TaskManager {
-    static Logger logger = Logger.getLogger(ZkTaskManager.class);
-    String tasksListRoot;
-    String processListRoot;
-
-    public ZkTaskManager(String address, String ClusterName, ClusterType clusterType) {
-        this(address, ClusterName, clusterType, null);
-    }
-
-    /**
-     * Constructor of TaskManager
-     * 
-     * @param address
-     * @param ClusterName
-     */
-    public ZkTaskManager(String address, String ClusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        super(address, callbackHandler);
-        this.root = "/" + ClusterName + "/" + clusterType.toString();
-        this.tasksListRoot = root + "/task";
-        this.processListRoot = root + "/process";
-    }
-
-    /**
-     * This will block the process thread from starting the task, when it is
-     * unblocked it will return the data stored in the task node. This data can
-     * be used by the This call assumes that the tasks are already set up
-     * 
-     * @return Object containing data related to the task
-     */
-    @Override
-    public Object acquireTask(Map<String, String> customTaskData) {
-        while (true) {
-            synchronized (mutex) {
-                try {
-                    Stat tExists = zk.exists(tasksListRoot, false);
-                    if (tExists == null) {
-                        logger.error("Tasks znode:" + tasksListRoot
-                                + " not setup.Going to wait");
-                        tExists = zk.exists(tasksListRoot, true);
-                        if (tExists == null) {
-                            mutex.wait();
-                        }
-                        continue;
-                    }
-                    Stat pExists = zk.exists(processListRoot, false);
-                    if (pExists == null) {
-                        logger.error("Process root znode:" + processListRoot
-                                + " not setup.Going to wait");
-                        pExists = zk.exists(processListRoot, true);
-                        if (pExists == null) {
-                            mutex.wait();
-                        }
-                        continue;
-                    }
-                    // setting watch true to tasks node will trigger call back
-                    // if there is any change to task node,
-                    // this is useful to add additional tasks
-                    List<String> tasks = zk.getChildren(tasksListRoot, true);
-                    List<String> processes = zk.getChildren(processListRoot,
-                                                            true);
-                    if (processes.size() < tasks.size()) {
-                        ArrayList<String> tasksAvailable = new ArrayList<String>();
-                        for (int i = 0; i < tasks.size(); i++) {
-                            tasksAvailable.add("" + i);
-                        }
-                        if (processes != null) {
-                            for (String s : processes) {
-                                String taskId = s.split("-")[1];
-                                tasksAvailable.remove(taskId);
-                            }
-                        }
-                        // try pick up a random task
-                        Random random = new Random();
-                        int id = Integer.parseInt(tasksAvailable.get(random.nextInt(tasksAvailable.size())));
-                        String pNode = processListRoot + "/" + "task-" + id;
-                        String tNode = tasksListRoot + "/" + "task-" + id;
-                        Stat pNodeStat = zk.exists(pNode, false);
-                        if (pNodeStat == null) {
-                            Stat tNodeStat = zk.exists(tNode, false);
-                            byte[] bytes = zk.getData(tNode, false, tNodeStat);
-                            Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(bytes));
-                            // if(!map.containsKey("address")){
-                            // map.put("address",
-                            // InetAddress.getLocalHost().getHostName());
-                            // }
-                            if (customTaskData != null) {
-                                for (String key : customTaskData.keySet()) {
-                                    if (!map.containsKey(key)) {
-                                        map.put(key, customTaskData.get(key));
-                                    }
-                                }
-
-                            }
-                            map.put("taskSize", "" + tasks.size());
-                            map.put("tasksRootNode", tasksListRoot);
-                            map.put("processRootNode", processListRoot);
-                            String create = zk.create(pNode,
-                                                      JSONUtil.toJsonString(map)
-                                                              .getBytes(),
-                                                      Ids.OPEN_ACL_UNSAFE,
-                                                      CreateMode.EPHEMERAL);
-                            logger.info("Created process Node:" + pNode + " :"
-                                    + create);
-                            return map;
-                        }
-                    } else {
-                        // all the tasks are taken up, will wait for the
-                        logger.info("No task available to take up. Going to wait");
-                        mutex.wait();
-                    }
-                } catch (KeeperException e) {
-                    logger.info("Warn:mostly ignorable " + e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    logger.info("Warn:mostly ignorable " + e.getMessage(), e);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskSetup.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskSetup.java
deleted file mode 100644
index 90e6a12..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ZkTaskSetup.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.DefaultWatcher;
-import io.s4.comm.util.CommUtil;
-import io.s4.comm.util.JSONUtil;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkTaskSetup extends DefaultWatcher {
-    static Logger logger = Logger.getLogger(ZkTaskSetup.class);
-    String tasksListRoot;
-    String processListRoot;
-
-    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
-        this(address, clusterName, clusterType, null);
-    }
-
-    /**
-     * Constructor of ZkTaskSetup
-     * 
-     * @param address
-     * @param clusterName
-     */
-    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        super(address, callbackHandler);
-        
-        this.root = "/" + clusterName + "/" + clusterType.toString();
-        this.tasksListRoot = root + "/task";
-        this.processListRoot = root + "/process";
-    }
-
-    public void setUpTasks(Object[] data) {
-        setUpTasks("-1", data);
-    }
-
-    /**
-     * Creates task nodes.
-     * 
-     * @param version
-     * @param data
-     */
-    public void setUpTasks(String version, Object[] data) {
-        try {
-            logger.info("Trying to set up configuration with new version:"
-                    + version);
-            if (!version.equals("-1")) {
-                if (!isConfigVersionNewer(version)) {
-                    logger.info("Config version not newer than current version");
-                    return;
-                } else {
-                    cleanUp();
-                }
-            } else {
-                logger.info("Not checking version number since it is set to -1");
-            }
-
-            // check if config data newer
-            if (!isConfigDataNewer(data)) {
-                logger.info("Config data not newer than current version");
-                return;
-            } else {
-                logger.info("Found newer Config data. Cleaning old data");
-                cleanUp();
-            }
-
-            // Create ZK node name
-            if (zk != null) {
-                Stat s;
-                s = zk.exists(root, false);
-                if (s == null) {
-                    String parent = new File(root).getParent()
-                                                  .replace(File.separatorChar,
-                                                           '/');
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("parent:" + parent);
-                    }
-                    Stat exists = zk.exists(parent, false);
-                    if (exists == null) {
-                        zk.create(parent,
-                                  new byte[0],
-                                  Ids.OPEN_ACL_UNSAFE,
-                                  CreateMode.PERSISTENT);
-                    }
-                    zk.create(root,
-                              new byte[0],
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-                }
-            }
-            Stat s;
-            s = zk.exists(tasksListRoot, false);
-            if (s == null) {
-                Map<String, String> map = new HashMap<String, String>();
-                map.put("config.version", version);
-                String jsonString = JSONUtil.toJsonString(map);
-                zk.create(tasksListRoot,
-                          jsonString.getBytes(),
-                          Ids.OPEN_ACL_UNSAFE,
-                          CreateMode.PERSISTENT);
-            }
-            s = zk.exists(processListRoot, false);
-            if (s == null) {
-                zk.create(processListRoot,
-                          new byte[0],
-                          Ids.OPEN_ACL_UNSAFE,
-                          CreateMode.PERSISTENT);
-
-            }
-
-            for (int i = 0; i < data.length; i++) {
-                String nodeName = tasksListRoot + "/" + "task" + "-" + i;
-                Stat sTask = zk.exists(nodeName, false);
-                if (sTask == null) {
-                    logger.info("Creating taskNode: " + nodeName);
-                    byte[] byteBuffer = JSONUtil.toJsonString(data[i])
-                                                .getBytes();
-                    zk.create(nodeName,
-                              byteBuffer,
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-                } else {
-                    logger.warn("TaskNode already exisits: " + nodeName);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Keeper exception when creating task nodes: "
-                                 + e.toString(),
-                         e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private boolean isConfigDataNewer(Object[] data) {
-        try {
-            Stat s;
-            s = zk.exists(tasksListRoot, false);
-            if (s != null) {
-                List<String> children = zk.getChildren(tasksListRoot, false);
-                if (children.size() != data.length) {
-                    return true;
-                }
-                boolean[] matched = new boolean[data.length];
-                for (String child : children) {
-                    String childPath = tasksListRoot + "/" + child;
-                    Stat sTemp = zk.exists(childPath, false);
-                    byte[] tempData = zk.getData(tasksListRoot + "/" + child,
-                                                 false,
-                                                 sTemp);
-                    Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
-
-                    // check if it matches any of the data
-                    for (int i = 0; i < data.length; i++) {
-                        Map<String, Object> newData = (Map<String, Object>) data[i];
-                        if (!matched[i] && CommUtil.compareMaps(newData, map)) {
-                            matched[i] = true;
-                            break;
-                        }
-                    }
-                }
-                for (int i = 0; i < matched.length; i++) {
-                    if (!matched[i]) {
-                        return true;
-                    }
-                }
-            } else {
-                return true;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(" Exception in isConfigDataNewer method ",
-                                       e);
-        }
-        return false;
-    }
-
-    private boolean isConfigVersionNewer(String version) throws Exception {
-        Stat s;
-        s = zk.exists(tasksListRoot, false);
-        if (s != null) {
-            byte[] data = zk.getData(tasksListRoot, false, s);
-            if (data != null && data.length > 0) {
-                String jsonString = new String(data);
-                if (jsonString != null) {
-                    Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
-                    if (map.containsKey("config.version")) {
-                        boolean update = false;
-                        String currentVersion = map.get("config.version")
-                                                   .toString();
-                        logger.info("Current config version:" + currentVersion);
-                        String[] curV = currentVersion.split("\\.");
-                        String[] newV = version.split("\\.");
-                        for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
-                            if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
-                                update = true;
-                                break;
-                            }
-                        }
-                        if (!update) {
-                            logger.info("Current config version is newer. Config will not be updated");
-                        }
-                        return update;
-                    }
-                } else {
-                    logger.info("No data at znode " + tasksListRoot
-                            + " so version checking will not be done");
-                }
-            } else {
-                logger.info("No data at znode " + tasksListRoot
-                        + " so version checking will not be done");
-            }
-        } else {
-            logger.info("znode " + tasksListRoot
-                    + " does not exist, so creating new one is fine");
-        }
-        return true;
-    }
-
-    /**
-     * Will clean up taskList Node and process List Node
-     */
-    public boolean cleanUp() {
-        try {
-            logger.info("Cleaning :" + tasksListRoot);
-            Stat exists = zk.exists(tasksListRoot, false);
-            if (exists != null) {
-                List<String> children = zk.getChildren(tasksListRoot, false);
-                if (children.size() > 0) {
-                    for (String child : children) {
-                        logger.info("Cleaning :" + tasksListRoot + "/" + child);
-                        zk.delete(tasksListRoot + "/" + child, 0);
-                    }
-                }
-                zk.delete(tasksListRoot, 0);
-            }
-
-            exists = zk.exists(processListRoot, false);
-            if (exists != null) {
-                List<String> children = zk.getChildren(processListRoot, false);
-                if (children.size() > 0) {
-                    logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
-                    for (String child : children) {
-                        logger.info("Cleaning :" + processListRoot + "/"
-                                + child);
-                        zk.delete(processListRoot + "/" + child, 0);
-                    }
-                }
-                logger.info("Finished cleaning :" + processListRoot);
-                zk.delete(processListRoot, 0);
-            }
-            return true;
-        } catch (Exception e) {
-            logger.error("Exception while cleaning up: " + e.getMessage(), e);
-            return false;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java
deleted file mode 100644
index 49a63a7..0000000
--- a/s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.zk;
-
-import io.s4.comm.core.DefaultWatcher;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-public class ZkUtil extends DefaultWatcher {
-
-    public ZkUtil(String address) {
-        super(address);
-
-    }
-
-    public int getChildCount(String path) {
-        try {
-            List<String> children = zk.getChildren(path, false);
-            return children.size();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> getChildren(String path) {
-        try {
-            List<String> children = zk.getChildren(path, false);
-            return children;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public byte[] getData(String path) {
-        try {
-            byte[] data = zk.getData(path, false, null);
-            return data;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public void create(String path) {
-        create(path, "");
-    }
-
-    public void create(String path, String data) {
-        try {
-            zk.create(path,
-                      data.getBytes(),
-                      Ids.OPEN_ACL_UNSAFE,
-                      CreateMode.PERSISTENT);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public void deleteRecursive(String path) {
-        List<String> children = getChildren(path);
-        for (String child : children) {
-            deleteRecursive(path + "/" + child);
-        }
-        delete(path);
-    }
-
-    public void delete(String path) {
-        try {
-            zk.delete(path, -1);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        if (args.length == 0) {
-            printUsage();
-
-        }
-        String address = args[0];
-        String methodName = args[1];
-
-        String[] methodArgs = new String[args.length - 2];
-        for (int i = 2; i < args.length; i++) {
-            methodArgs[i - 2] = args[i];
-        }
-        Method[] methods = ZkUtil.class.getMethods();
-        Method method = null;
-        for (Method met : methods) {
-            if (met.getName().equals(methodName)
-                    && met.getParameterTypes().length == methodArgs.length) {
-                method = met;
-                break;
-            }
-        }
-
-        if (method != null) {
-            ZkUtil zkUtil = new ZkUtil(address);
-            Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
-            if (ret != null) {
-                System.out.println("**********");
-                System.out.println(ret);
-                System.out.println("**********");
-            }
-        } else {
-            printUsage();
-        }
-        // zkUtil.deleteRecursive("/s4/listener/process/task-0");
-        // zkUtil.create("/s4_apps_test/sender/process");
-    }
-
-    private static void printUsage() {
-        System.out.println("USAGE");
-        System.out.println("java <zkadress> methodName arguments");
-        Method[] methods = ZkUtil.class.getMethods();
-        for (Method met : methods) {
-            System.out.println(met.getName() + ":"
-                    + Arrays.toString(met.getParameterTypes()));
-        }
-        System.exit(1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java b/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
new file mode 100644
index 0000000..29df9cb
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/CommEventCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.Map;
+
+public interface CommEventCallback {
+
+    public void handleCallback(Map<String, Object> eventData);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/CommLayerState.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/CommLayerState.java b/s4-comm/src/main/java/org/apache/s4/comm/core/CommLayerState.java
new file mode 100644
index 0000000..820b0c2
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/CommLayerState.java
@@ -0,0 +1,5 @@
+package org.apache.s4.comm.core;
+
+public enum CommLayerState {
+    INITIALIZED, BROKEN
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java b/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
new file mode 100644
index 0000000..c82658f
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/CommServiceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.file.StaticProcessMonitor;
+import org.apache.s4.comm.file.StaticTaskManager;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkProcessMonitor;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Common Factory class to provide appropriate implementations
+ * 
+ * @author kishoreg
+ * 
+ */
+public class CommServiceFactory {
+    private static Logger logger = Logger.getLogger(CommServiceFactory.class);
+
+    public static TaskManager getTaskManager(String zkaddress,
+            String clusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        String mode = System.getProperty("commlayer.mode");
+        TaskManager taskManager = null;
+        if (mode != null && mode.equalsIgnoreCase("static")) {
+            logger.info("Comm layer mode is set to static");
+            taskManager = new StaticTaskManager(zkaddress,
+                                                clusterName,
+                                                clusterType,
+                                                callbackHandler);
+        } else {
+            taskManager = new ZkTaskManager(zkaddress,
+                                            clusterName,
+                                            clusterType,
+                                            callbackHandler);
+        }
+
+        return taskManager;
+    }
+
+    public static ProcessMonitor getProcessMonitor(String zkaddress,
+            String clusterName, CommEventCallback callbackHandler) {
+        ProcessMonitor processMonitor = null;
+        String mode = System.getProperty("commlayer.mode");
+        if (mode != null && mode.equalsIgnoreCase("static")) {
+            logger.info("Comm layer mode is set to static");
+            processMonitor = new StaticProcessMonitor(zkaddress,
+                                                      clusterName,
+                                                      ClusterType.S4);
+        } else {
+            processMonitor = new ZkProcessMonitor(zkaddress,
+                                                  clusterName,
+                                                  ClusterType.S4,
+                                                  callbackHandler);
+        }
+        return processMonitor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/DefaultWatcher.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/DefaultWatcher.java b/s4-comm/src/main/java/org/apache/s4/comm/core/DefaultWatcher.java
new file mode 100644
index 0000000..5201fa5
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/DefaultWatcher.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class DefaultWatcher implements Watcher {
+
+    public static List<KeeperState> interestingStates = new ArrayList<KeeperState>();
+    static {
+        interestingStates.add(KeeperState.Expired);
+        interestingStates.add(KeeperState.SyncConnected);
+    }
+    public static Logger logger = Logger.getLogger(DefaultWatcher.class);
+    protected ZooKeeper zk = null;
+    protected Integer mutex;
+    protected String root;
+    protected WatchedEvent currentEvent;
+    protected CommEventCallback callbackHandler;
+    private String zkAddress;
+    volatile boolean connected = false;
+
+    protected DefaultWatcher(String address) {
+        this(address, null);
+    }
+
+    protected DefaultWatcher(String address, CommEventCallback callbackHandler) {
+        this.zkAddress = address;
+        this.callbackHandler = callbackHandler;
+        if (zk == null) {
+            try {
+                logger.info("Connecting to  zookeeper server:" + address);
+                String sTimeout = System.getProperty("zk.session.timeout");
+                System.out.println("sTimeout=" + sTimeout);
+                int timeout = 30000;
+                if (sTimeout != null) {
+                    try {
+                        timeout = Integer.parseInt(sTimeout);
+                    } catch (Exception e) {
+                        // ignore will use default
+                    }
+                }
+                mutex = new Integer(-1);
+                synchronized (mutex) {
+                    zk = new ZooKeeper(address, timeout, this);
+                    while (!connected) {
+                        logger.info("Waiting for connection to be established ");
+                        mutex.wait();
+                    }
+                }
+                logger.info("Connected to zookeeper with sessionid: "
+                        + zk.getSessionId() + " and session timeout(ms): "
+                        + timeout);
+
+            } catch (Exception e) {
+                logger.error("Failed to connect to zookeeper:" + e.getMessage(),
+                             e);
+                zk = null;
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    synchronized public void process(WatchedEvent event) {
+        logger.info("Received zk event:" + event);
+        synchronized (mutex) {
+            currentEvent = event;
+            if (event.getState() == KeeperState.SyncConnected) {
+                connected = true;
+            }
+            if (callbackHandler != null
+                    && interestingStates.contains(event.getState())) {
+                Map<String, Object> eventData = new HashMap<String, Object>();
+                if (event.getState() == KeeperState.SyncConnected) {
+                    eventData.put("state", CommLayerState.INITIALIZED);
+                } else if (event.getState() == KeeperState.Expired) {
+                    eventData.put("state", CommLayerState.BROKEN);
+                }
+                eventData.put("source", event);
+                callbackHandler.handleCallback(eventData);
+            }
+            mutex.notify();
+        }
+    }
+
+    public String getZkAddress() {
+        return zkAddress;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java b/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
new file mode 100644
index 0000000..d8af0f8
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/Deserializer.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+public interface Deserializer {
+
+    public Object deserialize(byte[] buffer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
new file mode 100644
index 0000000..2bfb613
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericListener.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class GenericListener {
+    private static Logger logger = Logger.getLogger(GenericListener.class);
+    private String zkAddress;
+    private DatagramSocket socket;
+    int BUFFER_LENGTH = 65507;
+    private DatagramPacket dgram;
+    private byte[] bs;
+    final Deserializer deserializer;
+
+    public GenericListener(String zkaddress, String appName,
+            Object listenerConfig) {
+        this(zkaddress, appName, listenerConfig, new GenericSerDeser());
+    }
+
+    public GenericListener(String zkaddress, String appName,
+            Object listenerConfig, Deserializer deserializer) {
+        this.zkAddress = zkAddress;
+        this.deserializer = deserializer;
+        try {
+            Map<String, String> map = (Map<String, String>) listenerConfig;
+            String mode = map.get("mode");
+            int port = Integer.parseInt(map.get("port"));
+            if (mode.equals("multicast")) {
+                InetAddress inetAddress = InetAddress.getByName(map.get("channel"));
+                socket = new MulticastSocket(port);
+                ((MulticastSocket) socket).joinGroup(inetAddress);
+            }
+            if (mode.equals("unicast")) {
+                socket = new DatagramSocket(port);
+            }
+            String udpBufferSize = System.getProperty("udp.buffer.size");
+            if (udpBufferSize == null) {
+                udpBufferSize = "4194302";
+            }
+            socket.setReceiveBufferSize(Integer.parseInt(udpBufferSize));
+            bs = new byte[BUFFER_LENGTH];
+            dgram = new DatagramPacket(bs, bs.length);
+        } catch (IOException e) {
+            logger.error("error creating listener", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public Object receive() {
+        try {
+            socket.receive(dgram);
+            byte[] data = new byte[dgram.getLength()];
+            System.arraycopy(dgram.getData(),
+                             dgram.getOffset(),
+                             data,
+                             0,
+                             data.length);
+            Object object = deserializer.deserialize(data);
+            dgram.setLength(BUFFER_LENGTH);
+            return object;
+        } catch (IOException e) {
+            logger.error("error receiving message", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * There is nothing much to do for multicast and unicast
+     */
+    public void start() {
+
+    }
+
+}


Mime
View raw message