incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [2/2] git commit: S4-7 Guaranteed, ordered, robust TCP protocol - send() queues the messages - an asynchronous thread pool sends it using Netty - on success, the delivered message is notified - added close() method to Emitter and Listener - added tests t
Date Wed, 01 Feb 2012 18:31:19 GMT
S4-7 Guaranteed, ordered, robust TCP protocol - send() queues the messages - an asynchronous thread pool sends it using Netty - on success, the delivered message is notified - added close() method to Emitter and Listener - added tests to check the above-mentioned properties


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/8e2427b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/8e2427b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/8e2427b0

Branch: refs/heads/S4-7
Commit: 8e2427b02a09460891bc6d01f9fc41b42304fb58
Parents: 5e38aa2
Author: Karthik Kambatla <kkambatl@cs.purdue.edu>
Authored: Wed Jan 25 19:00:00 2012 -0500
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Tue Jan 31 18:55:17 2012 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Emitter.java  |   18 +-
 .../src/main/java/org/apache/s4/base/Listener.java |   20 +-
 .../java/org/apache/s4/comm/QueueingEmitter.java   |  177 ++++----
 .../java/org/apache/s4/comm/QueueingListener.java  |   16 +-
 .../apache/s4/comm/loopback/LoopBackEmitter.java   |   10 +-
 .../apache/s4/comm/loopback/LoopBackListener.java  |   17 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |  329 +++++++++------
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   56 ++-
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |   13 +-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |    8 +-
 .../java/org/apache/s4/comm/udp/UDPListener.java   |   14 +-
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |  143 -------
 .../s4/comm/tcp/MultiPartitionDeliveryTest.java    |   28 ++
 .../org/apache/s4/comm/tcp/NetworkGlitchTest.java  |   32 ++
 .../org/apache/s4/comm/tcp/SimpleDeliveryTest.java |   23 +
 .../java/org/apache/s4/comm/tcp/TCPBasedTest.java  |   33 ++
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   69 ---
 .../s4/comm/topology/TopologyFromZKTest.java       |    1 +
 .../s4/comm/udp/MultiPartitionDeliveryTest.java    |   30 ++
 .../org/apache/s4/comm/udp/SimpleDeliveryTest.java |   30 ++
 .../java/org/apache/s4/comm/udp/UDPBasedTest.java  |   32 ++
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   65 ---
 .../org/apache/s4/comm/util/PartitionInfo.java     |  139 ++++++
 .../org/apache/s4/comm/util/ProtocolTestUtil.java  |   76 ++++
 .../FileBasedClusterManagementTestModule.java      |   11 +-
 .../ZkBasedClusterManagementTestModule.java        |   21 +-
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   36 +-
 27 files changed, 865 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index 0570c14..ad511c9 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -1,13 +1,17 @@
 package org.apache.s4.base;
 
 public interface Emitter {
-	
-	/*
-	 * @param partitionId - destination partition
-	 * @param message - message payload that needs to be sent
-	 * @return - true - if message is sent across successfully
-	 *         - false - if send fails
-	 */
+
+    /*
+     * @param partitionId - destination partition
+     * 
+     * @param message - message payload that needs to be sent
+     * 
+     * @return - true - if message is sent across successfully - false - if send fails
+     */
     boolean send(int partitionId, byte[] message);
+
     int getPartitionCount();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
index c0b63ef..be60597 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
@@ -7,15 +7,15 @@ package org.apache.s4.base;
  */
 public interface Listener {
 
-	/*
-	 * Perform blocking receive on the appropriate communication channel
-	 * 
-	 * @return
-	 * <ul><li> byte[] message returned by the channel </li>
-	 * <li> null if the associated blocking thread is interrupted </li>
-	 * </ul>
-	 */
-	byte[] recv();
+    /*
+     * Perform blocking receive on the appropriate communication channel
+     * 
+     * @return <ul><li> byte[] message returned by the channel </li> <li> null if the associated blocking thread is
+     * interrupted </li> </ul>
+     */
+    byte[] recv();
 
-	public int getPartitionId();
+    public int getPartitionId();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index 089503d..7218afe 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@ -9,92 +9,95 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 public class QueueingEmitter implements Emitter, Runnable {
-	private Emitter emitter;
-	private BlockingQueue<MessageHolder> queue;
-	private long dropCount = 0;
-	private volatile Thread thread;
-
-	@Inject
-	public QueueingEmitter(@Named("ll") Emitter emitter,
-			@Named("comm.queue_emmiter_size") int queueSize) {
-		this.emitter = emitter;
-		queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
-	}
-
-	public long getDropCount() {
-		return dropCount;
-	}
-
-	public void start() {
-		if (thread != null) {
-			throw new IllegalStateException(
-					"QueueingEmitter is already started");
-		}
-		thread = new Thread(this, "QueueingEmitter");
-		thread.start();
-	}
-
-	public void stop() {
-		if (thread == null) {
-			throw new IllegalStateException(
-					"QueueingEmitter is already stopped");
-		}
-		thread.interrupt();
-		thread = null;
-	}
-
-	@Override
-	public boolean send(int partitionId, byte[] message) {
-		MessageHolder mh = new MessageHolder(partitionId, message);
-		if (!queue.offer(mh)) {
-			dropCount++;
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	public void run() {
-		while (!Thread.interrupted()) {
-			try {
-				MessageHolder mh = queue.take();
-				// System.out.println("QueueingEmitter: Sending message on low-level emitter");
-				emitter.send(mh.getPartitionId(), mh.getMessage());
-			} catch (InterruptedException ie) {
-				Thread.currentThread().interrupt();
-			}
-		}
-	}
-
-	public int getPartitionCount() {
-		return emitter.getPartitionCount();
-	}
-
-	class MessageHolder {
-		private int partitionId;
-		private byte[] message;
-
-		public int getPartitionId() {
-			return partitionId;
-		}
-
-		public void setPartitionId(int partitionId) {
-			this.partitionId = partitionId;
-		}
-
-		public byte[] getMessage() {
-			return message;
-		}
-
-		public void setMessage(byte[] message) {
-			this.message = message;
-		}
-
-		public MessageHolder(int partitionId, byte[] message) {
-			super();
-			this.partitionId = partitionId;
-			this.message = message;
-		}
-	}
+    private Emitter emitter;
+    private BlockingQueue<MessageHolder> queue;
+    private long dropCount = 0;
+    private volatile Thread thread;
+
+    @Inject
+    public QueueingEmitter(@Named("ll") Emitter emitter, @Named("comm.queue_emmiter_size") int queueSize) {
+        this.emitter = emitter;
+        queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
+    }
+
+    public long getDropCount() {
+        return dropCount;
+    }
+
+    public void start() {
+        if (thread != null) {
+            throw new IllegalStateException("QueueingEmitter is already started");
+        }
+        thread = new Thread(this, "QueueingEmitter");
+        thread.start();
+    }
+
+    public void stop() {
+        if (thread == null) {
+            throw new IllegalStateException("QueueingEmitter is already stopped");
+        }
+        thread.interrupt();
+        thread = null;
+    }
+
+    @Override
+    public boolean send(int partitionId, byte[] message) {
+        MessageHolder mh = new MessageHolder(partitionId, message);
+        if (!queue.offer(mh)) {
+            dropCount++;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public void run() {
+        while (!Thread.interrupted()) {
+            try {
+                MessageHolder mh = queue.take();
+                // System.out.println("QueueingEmitter: Sending message on low-level emitter");
+                emitter.send(mh.getPartitionId(), mh.getMessage());
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public int getPartitionCount() {
+        return emitter.getPartitionCount();
+    }
+
+    class MessageHolder {
+        private int partitionId;
+        private byte[] message;
+
+        public int getPartitionId() {
+            return partitionId;
+        }
+
+        public void setPartitionId(int partitionId) {
+            this.partitionId = partitionId;
+        }
+
+        public byte[] getMessage() {
+            return message;
+        }
+
+        public void setMessage(byte[] message) {
+            this.message = message;
+        }
+
+        public MessageHolder(int partitionId, byte[] message) {
+            super();
+            this.partitionId = partitionId;
+            this.message = message;
+        }
+    }
+
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 8d57a9e..46f0a8e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -1,6 +1,5 @@
 package org.apache.s4.comm;
 
-
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -16,8 +15,7 @@ public class QueueingListener implements Listener, Runnable {
     private volatile Thread thread;
 
     @Inject
-    public QueueingListener(@Named("ll") Listener listener,
-            @Named("comm.queue_listener_size") int queueSize) {
+    public QueueingListener(@Named("ll") Listener listener, @Named("comm.queue_listener_size") int queueSize) {
         this.listener = listener;
         queue = new LinkedBlockingQueue<byte[]>(queueSize);
     }
@@ -28,8 +26,7 @@ public class QueueingListener implements Listener, Runnable {
 
     public void start() {
         if (thread != null) {
-            throw new IllegalStateException(
-                    "QueueingListener is already started");
+            throw new IllegalStateException("QueueingListener is already started");
         }
         thread = new Thread(this, "QueueingListener");
         thread.start();
@@ -37,8 +34,7 @@ public class QueueingListener implements Listener, Runnable {
 
     public void stop() {
         if (thread == null) {
-            throw new IllegalStateException(
-                    "QueueingListener is already stopped");
+            throw new IllegalStateException("QueueingListener is already stopped");
         }
         thread.interrupt();
         thread = null;
@@ -69,4 +65,10 @@ public class QueueingListener implements Listener, Runnable {
             }
         }
     }
+
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
index d59eabe..74b7345 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
@@ -4,11 +4,11 @@ import org.apache.s4.base.Emitter;
 
 public class LoopBackEmitter implements Emitter {
     private LoopBackListener listener;
-    
+
     public LoopBackEmitter(LoopBackListener listener) {
         this.listener = listener;
     }
-    
+
     @Override
     public boolean send(int partitionId, byte[] message) {
         listener.put(message);
@@ -20,4 +20,10 @@ public class LoopBackEmitter implements Emitter {
         return 1;
     }
 
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
index b312485..8077231 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
@@ -1,19 +1,18 @@
 package org.apache.s4.comm.loopback;
 
-
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.s4.base.Listener;
 
 public class LoopBackListener implements Listener {
-    
+
     private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
 
     @Override
     public byte[] recv() {
         try {
-            //System.out.println("LoopBackListener: Taking message from handoff queue");
+            // System.out.println("LoopBackListener: Taking message from handoff queue");
             return handoffQueue.take();
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
@@ -24,14 +23,20 @@ public class LoopBackListener implements Listener {
     public int getPartitionId() {
         return 0;
     }
-    
+
     public void put(byte[] message) {
         try {
-            //System.out.println("LoopBackListener: putting message into handoffqueue");
+            // System.out.println("LoopBackListener: putting message into handoffqueue");
             handoffQueue.put(message);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
-        }        
+        }
+    }
+
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5960fc1..ba48a28 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -2,9 +2,10 @@ package org.apache.s4.comm.tcp;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Hashtable;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.s4.base.Emitter;
@@ -25,6 +26,8 @@ import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
@@ -32,76 +35,57 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
 
-public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
-    private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
-    private static final int BUFFER_SIZE = 10;
-    private static final int NUM_RETRIES = 10;
+/*
+ * TCPEmitter - Uses TCP to send messages across to other partitions.
+ *            - Message ordering between partitions is preserved.
+ *            - For efficiency, NettyEmitter.send() queues the messages partition-wise, 
+ *              a threadPool sends the messages asynchronously; message dequeued only on success.
+ *            - Tolerates topology changes, partition re-mapping, and network glitches.
+ */
 
+public class TCPEmitter implements Emitter, TopologyChangeListener {
+    private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+    private final int numRetries = 10;
+    private final int bufferSize;
     private Topology topology;
     private final ClientBootstrap bootstrap;
 
-    static class MessageQueuesPerPartition {
-        private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
-        private boolean bounded;
-
-        MessageQueuesPerPartition(boolean bounded) {
-            this.bounded = bounded;
-        }
-
-        private boolean add(int partitionId, byte[] message) {
-            Queue<byte[]> messages = queues.get(partitionId);
-
-            if (messages == null) {
-                messages = new ArrayDeque<byte[]>();
-                queues.put(partitionId, messages);
-            }
-
-            if (bounded && messages.size() >= BUFFER_SIZE) {
-                // Too many messages already queued
-                return false;
-            }
-
-            messages.offer(message);
-            return true;
-        }
-
-        private byte[] peek(int partitionId) {
-            Queue<byte[]> messages = queues.get(partitionId);
-
-            try {
-                return messages.peek();
-            } catch (NullPointerException npe) {
-                return null;
-            }
-        }
+    /*
+     * Channel used to send messages to each partition
+     */
+    private HashBiMap<Integer, Channel> partitionChannelMap;
 
-        private void remove(int partitionId) {
-            Queue<byte[]> messages = queues.get(partitionId);
+    /*
+     * Node hosting each partition
+     */
+    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
 
-            if (messages.isEmpty()) {
-                logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
-                return;
-            }
+    /*
+     * Messages to be sent, stored per partition
+     */
+    private Hashtable<Integer, SendQueue> sendQueues;
 
-            if (messages != null)
-                messages.remove();
-        }
-    }
-
-    private HashBiMap<Integer, Channel> partitionChannelMap;
-    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
-    private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+    /*
+     * Thread pool to actually send messages
+     */
+    private ExecutorService sendService = Executors.newCachedThreadPool();
 
     @Inject
-    public TCPEmitter(Topology topology) throws InterruptedException {
+    public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize) throws InterruptedException {
+        this.bufferSize = bufferSize;
         this.topology = topology;
         topology.addListener(this);
         int clusterSize = this.topology.getTopology().getNodes().size();
 
         partitionChannelMap = HashBiMap.create(clusterSize);
         partitionNodeMap = HashBiMap.create(clusterSize);
+        sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
 
+        /*
+         * Initialize netty related structures
+         */
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool());
 
@@ -112,13 +96,126 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
                 p.addLast("1", new LengthFieldPrepender(4));
-                p.addLast("2", new TestHandler());
+                p.addLast("2", new NotifyChannelInterestChange());
                 return p;
             }
         });
 
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("keepAlive", true);
+        bootstrap.setOption("reuseAddress", true);
+        bootstrap.setOption("connectTimeoutMillis", 100);
+    }
+
+    private static class Message implements ChannelFutureListener {
+        private final SendQueue sendQ;
+        private final byte[] message;
+        private boolean sendInProcess;
+
+        Message(SendQueue sendQ, byte[] message) {
+            this.sendQ = sendQ;
+            this.message = message;
+            this.sendInProcess = false;
+        }
+
+        private void sendMessage() {
+            if (sendInProcess)
+                return;
+
+            sendQ.emitter.sendMessage(sendQ.partitionId, this);
+            sendInProcess = true;
+        }
+
+        @Override
+        public synchronized void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+                synchronized (sendQ.messages) {
+                    sendQ.messages.remove(this);
+                }
+                return;
+            }
+
+            if (future.isCancelled()) {
+                logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
+            }
+
+            // failed operation
+            sendInProcess = false;
+            future.getChannel().close().awaitUninterruptibly();
+            sendQ.spawnSendThread();
+        }
+    }
+
+    private static class SendQueue {
+        private final TCPEmitter emitter;
+        private final int partitionId;
+        private final int bufferSize;
+        private Queue<Message> messages;
+
+        private boolean sendThreadRecheck = false;
+        private Boolean sendThreadActive = false;
+
+        SendQueue(TCPEmitter emitter, int partitionId, int bufferSize) {
+            this.emitter = emitter;
+            this.partitionId = partitionId;
+            this.bufferSize = bufferSize;
+            this.messages = new ArrayBlockingQueue<Message>(this.bufferSize);
+        }
+
+        private void spawnSendThread() {
+            synchronized (sendThreadActive) {
+                if (sendThreadActive) {
+                    sendThreadRecheck = true;
+                } else {
+                    sendThreadActive = true;
+                    emitter.sendService.execute(new SendThread(this));
+                }
+            }
+        }
+
+        private boolean offer(byte[] message) {
+            Message m = new Message(this, message);
+            synchronized (messages) {
+                if (messages.offer(m)) {
+                    spawnSendThread();
+                    return true;
+                } else
+                    return false;
+            }
+        }
+
+        private void sendMessagesInQueue() {
+            synchronized (messages) {
+                for (Message message : messages) {
+                    message.sendMessage();
+                }
+            }
+        }
+    }
+
+    private static class SendThread extends Thread {
+        private final SendQueue sendQ;
+
+        SendThread(SendQueue sendQ) {
+            this.sendQ = sendQ;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                sendQ.sendMessagesInQueue();
+
+                synchronized (sendQ.sendThreadActive) {
+                    if (sendQ.sendThreadRecheck) {
+                        sendQ.sendThreadRecheck = false;
+                        continue;
+                    } else {
+                        sendQ.sendThreadActive = false;
+                        return;
+                    }
+                }
+            }
+        }
     }
 
     private boolean connectTo(Integer partitionId) {
@@ -134,7 +231,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
             return false;
         }
 
-        for (int retries = 0; retries < NUM_RETRIES; retries++) {
+        for (int retries = 0; retries < numRetries; retries++) {
             ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
                     .getPort()));
             f.awaitUninterruptibly();
@@ -153,90 +250,77 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         return false;
     }
 
-    private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
-        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-        buffer.writeBytes(message);
-        ChannelFuture f = channel.write(buffer);
-        f.addListener(this);
-    }
+    private void sendMessage(int partitionId, Message m) {
+        ChannelBuffer buffer = ChannelBuffers.buffer(m.message.length);
+        buffer.writeBytes(m.message);
 
-    private final Object sendLock = new Object();
-
-    @Override
-    public boolean send(int partitionId, byte[] message) {
-        Channel channel = partitionChannelMap.get(partitionId);
-        if (channel == null) {
-            if (connectTo(partitionId)) {
-                channel = partitionChannelMap.get(partitionId);
-            } else {
-                // could not connect, queue to the partitionBuffer
-                return queuedMessages.add(partitionId, message);
+        while (true) {
+            if (!partitionChannelMap.containsKey(partitionId)) {
+                connectTo(partitionId);
+                continue;
             }
-        }
 
-        /*
-         * Try limiting the size of the send queue inside Netty
-         */
-        if (!channel.isWritable()) {
-            synchronized (sendLock) {
-                // check again now that we have the lock
-                while (!channel.isWritable()) {
+            SendQueue sendQ = sendQueues.get(partitionId);
+            synchronized (sendQ) {
+                if (!partitionChannelMap.get(partitionId).isWritable()) {
                     try {
-                        sendLock.wait();
-                    } catch (InterruptedException ie) {
-                        return false;
+                        logger.debug("Waiting for channel to partition {} to become writable", partitionId);
+                        sendQ.wait();
+                    } catch (InterruptedException e) {
+                        continue;
                     }
                 }
             }
+
+            Channel c = partitionChannelMap.get(partitionId);
+            if (c != null && c.isWritable()) {
+                c.write(buffer).addListener(m);
+                break;
+            }
         }
+    }
 
-        /*
-         * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
-         * buffered messages, and (3) the Current Message
-         * 
-         * Once the channel returns success delete from the messagesOnTheWire
-         */
-        byte[] messageBeingSent = null;
-        // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
-        // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
-        // }
-
-        while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
-            writeMessageToChannel(channel, partitionId, messageBeingSent);
-            queuedMessages.remove(partitionId);
+    @Override
+    public boolean send(int partitionId, byte[] message) {
+        if (!sendQueues.containsKey(partitionId)) {
+            SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferSize);
+            sendQueues.put(partitionId, sendQueue);
         }
 
-        writeMessageToChannel(channel, partitionId, message);
-        return true;
+        SendQueue sendQueue = sendQueues.get(partitionId);
+        return sendQueue.offer(message);
     }
 
-    @Override
-    public void operationComplete(ChannelFuture f) {
-        int partitionId = partitionChannelMap.inverse().get(f.getChannel());
-        if (f.isSuccess()) {
-            // messagesOnTheWire.remove(partitionId);
+    public void close() {
+        ChannelGroup cg = new DefaultChannelGroup();
+        synchronized (partitionChannelMap) {
+            cg.addAll(partitionChannelMap.values());
+            partitionChannelMap.clear();
         }
+        cg.close().awaitUninterruptibly();
+        bootstrap.releaseExternalResources();
+    }
 
-        if (f.isCancelled()) {
-            logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
-        } else if (!f.isSuccess()) {
-            logger.error("Exception on I/O operation", f.getCause());
-            logger.error(String.format("I/O on partition %d failed!", partitionId));
-            partitionChannelMap.remove(partitionId);
+    protected void closeChannel(int partition) {
+        Channel c = partitionChannelMap.remove(partition);
+        if (c != null) {
+            c.close().awaitUninterruptibly();
         }
     }
 
     @Override
     public void onChange() {
-        /*
-         * Close the channels that correspond to changed partitions and update partitionNodeMap
-         */
         for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
             Integer partition = clusterNode.getPartition();
+            if (partition == null) {
+                logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+                return;
+            }
+
             ClusterNode oldNode = partitionNodeMap.get(partition);
 
             if (oldNode != null && !oldNode.equals(clusterNode)) {
-                partitionChannelMap.remove(partition).close();
+                closeChannel(partition);
             }
 
             partitionNodeMap.forcePut(partition, clusterNode);
@@ -249,14 +333,14 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         return topology.getTopology().getPartitionCount();
     }
 
-    class TestHandler extends SimpleChannelHandler {
+    class NotifyChannelInterestChange extends SimpleChannelHandler {
         @Override
         public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            // logger.info(String.format("%08x %08x %08x", e.getValue(),
-            // e.getChannel().getInterestOps(), Channel.OP_WRITE));
-            synchronized (sendLock) {
-                if (e.getChannel().isWritable()) {
-                    sendLock.notify();
+            Channel c = e.getChannel();
+            SendQueue sendQ = sendQueues.get(partitionChannelMap.inverse().get(c));
+            synchronized (sendQ) {
+                if (c.isWritable()) {
+                    sendQ.notify();
                 }
             }
             ctx.sendUpstream(e);
@@ -266,9 +350,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
             Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
             if (partitionId == null) {
-                logger.error("Error on mystery channel!!");
+                return;
             }
             logger.error("Error on channel to partition " + partitionId);
+            partitionChannelMap.remove(partitionId);
 
             try {
                 throw event.getCause();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index b3776e1..ee50019 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -10,6 +10,7 @@ import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
@@ -18,6 +19,8 @@ import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.slf4j.Logger;
@@ -25,61 +28,70 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
-
 public class TCPListener implements Listener {
+    private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
     private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
     private ClusterNode node;
-    private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-    
+    private ServerBootstrap bootstrap;
+    private final ChannelGroup channelGroup = new DefaultChannelGroup();
+
     @Inject
     public TCPListener(Assignment assignment) {
         // wait for an assignment
         node = assignment.assignClusterNode();
-        
-        ChannelFactory factory =
-            new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
 
-        ServerBootstrap bootstrap = new ServerBootstrap(factory);
+        ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+
+        bootstrap = new ServerBootstrap(factory);
 
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
                 p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
                 p.addLast("2", new ChannelHandler(handoffQueue));
-                
+
                 return p;
             }
         });
 
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.keepAlive", true);
-        
-        bootstrap.bind(new InetSocketAddress(node.getPort()));
+        bootstrap.setOption("child.reuseAddress", true);
+        bootstrap.setOption("child.connectTimeoutMillis", 100);
+        bootstrap.setOption("readWriteFair", true);
+
+        Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
+        channelGroup.add(c);
     }
-    
+
     public byte[] recv() {
         try {
-            return handoffQueue.take();
+            byte[] msg = handoffQueue.take();
+            return msg;
         } catch (InterruptedException e) {
-        	return null;
+            return null;
         }
     }
-    
+
     public int getPartitionId() {
         return node.getPartition();
     }
-    
+
+    public void close() {
+        channelGroup.close().awaitUninterruptibly();
+        bootstrap.releaseExternalResources();
+    }
+
     public class ChannelHandler extends SimpleChannelHandler {
         private BlockingQueue<byte[]> handoffQueue;
-        
+
         public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
             this.handoffQueue = handOffQueue;
         }
-        
-        public void messageReceived(ChannelHandlerContext ctx,
-                MessageEvent e) {
+
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+            channelGroup.add(e.getChannel());
             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
             try {
                 handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
@@ -88,7 +100,7 @@ public class TCPListener implements Listener {
                 Thread.currentThread().interrupt();
             }
         }
-        
+
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
             logger.error("Error", event.getCause());
             if (context.getChannel().isOpen()) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index ab14449..f3928b8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -33,8 +33,7 @@ public class TaskSetup {
             record.putSimpleField("port", String.valueOf(1300 + i));
             record.putSimpleField("partition", String.valueOf(i));
             record.putSimpleField("cluster", clusterName);
-            zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
-                    record);
+            zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId, record);
         }
     }
 
@@ -45,13 +44,11 @@ public class TaskSetup {
         taskSetup.setup(clusterName, 10);
         String zookeeperAddress = "localhost:2181";
         for (int i = 0; i < 10; i++) {
-            AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
-                    clusterName, zookeeperAddress, 30000, 30000);
-            ClusterNode assignClusterNode = assignmentFromZK
-                    .assignClusterNode();
-            System.out.println(i+"-->"+assignClusterNode);
+            AssignmentFromZK assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+            ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+            System.out.println(i + "-->" + assignClusterNode);
         }
-        TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
+        TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
         Thread.sleep(3000);
         Cluster topology = topologyFromZK.getTopology();
         System.out.println(topology.getNodes().size());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 85b164c..1e1f229 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -77,8 +77,14 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
         synchronized (nodes) {
             for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
                 Integer partition = clusterNode.getPartition();
-                nodes.put(partition, clusterNode);
+                nodes.forcePut(partition, clusterNode);
             }
         }
     }
+
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 84aa7fa..5cd62de 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -12,11 +12,10 @@ import org.apache.s4.comm.topology.ClusterNode;
 
 import com.google.inject.Inject;
 
-
 /**
  * 
  * Implementation of a simple UDP listener.
- *
+ * 
  */
 public class UDPListener implements Listener, Runnable {
 
@@ -54,8 +53,7 @@ public class UDPListener implements Listener, Runnable {
             while (!Thread.interrupted()) {
                 socket.receive(datagram);
                 byte[] data = new byte[datagram.getLength()];
-                System.arraycopy(datagram.getData(), datagram.getOffset(),
-                        data, 0, data.length);
+                System.arraycopy(datagram.getData(), datagram.getOffset(), data, 0, data.length);
                 datagram.setLength(BUFFER_LENGTH);
                 try {
                     handoffQueue.put(data);
@@ -72,7 +70,7 @@ public class UDPListener implements Listener, Runnable {
         try {
             return handoffQueue.take();
         } catch (InterruptedException e) {
-        	return null;
+            return null;
         }
     }
 
@@ -80,4 +78,10 @@ public class UDPListener implements Listener, Runnable {
         return node.getPartition();
     }
 
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
deleted file mode 100644
index 19b4de1..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.apache.s4.comm;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * Test util for communication protocols.
- *
- * <ul>
- * <li> The util defines Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would 
- * interrupt after a pre-defined but long enough interval </li>
- * </ul>
- * 
- */
-public class DeliveryTestUtil {
-
-    private final Emitter emitter;
-    private final Listener listener;
-    private final int interval;
-    private int numMessages;
-    private int sleepCount;
-
-    // public Thread sendThread, receiveThread;
-    private final int messagesExpected;
-
-    @Inject
-    public DeliveryTestUtil(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
-            @Named("emitter.send.numMessages") int numMessages, @Named("listener.recv.sleepCount") int sleepCount) {
-        this.emitter = emitter;
-        this.listener = listener;
-        this.interval = interval;
-        this.numMessages = numMessages;
-        this.sleepCount = sleepCount;
-        this.messagesExpected = numMessages * this.emitter.getPartitionCount();
-
-        // this.sendThread = new SendThread();
-        // this.receiveThread = new ReceiveThread();
-    }
-
-    public class SendThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                    for (int i = 0; i < numMessages; i++) {
-                        byte[] message = (new String("message-" + i)).getBytes();
-                        emitter.send(partition, message);
-                        Thread.sleep(interval);
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                return;
-            }
-        }
-    }
-
-    /*
-     * TimerThread - interrupts the passed thread, after specified time-interval.
-     */
-    class TimerThread extends Thread {
-        private final Thread watchThread;
-        private Integer sleepCounter;
-
-        TimerThread(Thread watchThread) {
-            this.watchThread = watchThread;
-            this.sleepCounter = new Integer(sleepCount);
-        }
-
-        public void resetSleepCounter() {
-            synchronized (this.sleepCounter) {
-                this.sleepCounter = sleepCount;
-            }
-        }
-
-        public void clearSleepCounter() {
-            synchronized (this.sleepCounter) {
-                this.sleepCounter = 0;
-            }
-        }
-
-        private int getCounter() {
-            synchronized (this.sleepCounter) {
-                return this.sleepCounter--;
-            }
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (getCounter() > 0) {
-                    Thread.sleep(interval);
-                }
-                watchThread.interrupt();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    class ReceiveThread extends Thread {
-        private int messagesReceived = 0;
-
-        @Override
-        public void run() {
-
-            // start the timer thread to interrupt if blocked for too long
-            TimerThread timer = new TimerThread(this);
-            timer.start();
-            while (messagesReceived < messagesExpected) {
-                byte[] message = listener.recv();
-                timer.resetSleepCounter();
-                if (message != null)
-                    messagesReceived++;
-                else
-                    break;
-            }
-            timer.clearSleepCounter();
-        }
-
-        private boolean moreMessages() {
-            return ((messagesExpected - messagesReceived) > 0);
-        }
-    }
-
-    public Thread newSendThread() {
-        return new SendThread();
-    }
-
-    public Thread newReceiveThread() {
-        return new ReceiveThread();
-    }
-
-    public boolean moreMessages(Thread recvThread) {
-        return ((ReceiveThread) recvThread).moreMessages();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
new file mode 100644
index 0000000..0752874
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
@@ -0,0 +1,28 @@
+package org.apache.s4.comm.tcp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiPartitionDeliveryTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
+
+    public MultiPartitionDeliveryTest() {
+        super(4);
+    }
+
+    @Test
+    public void testMultiPartitionDelivery() throws InterruptedException {
+
+        startThreads();
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = " + messagesNotReceived);
+        Assert.assertEquals("Guaranteed message delivery", messagesNotReceived, 0);
+
+        logger.info("Message ordering - " + messageOrdering());
+        Assert.assertTrue("Pairwise message ordering", messageOrdering());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
new file mode 100644
index 0000000..5f26f89
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
@@ -0,0 +1,32 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.comm.util.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetworkGlitchTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
+
+    @Test
+    public void testResilienceToNetworkGlitches() throws InterruptedException {
+        PartitionInfo util = partitions[0];
+
+        startThreads();
+
+        for (int i = 1; i < 10; i++) {
+            Thread.sleep(100);
+            ((TCPEmitter) util.emitter).closeChannel(0);
+        }
+
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = " + messagesNotReceived);
+        Assert.assertEquals("Guaranteed message delivery", messagesNotReceived, 0);
+
+        logger.info("Message ordering - " + messageOrdering());
+        Assert.assertTrue("Pairwise message ordering", messageOrdering());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
new file mode 100644
index 0000000..93638a4
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.tcp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleDeliveryTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
+
+    @Test
+    public void testSimpleDelivery() throws InterruptedException {
+        startThreads();
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = " + messagesNotReceived);
+        Assert.assertEquals("Guaranteed message delivery", messagesNotReceived, 0);
+
+        logger.info("Message ordering - " + messageOrdering());
+        Assert.assertTrue("Pairwise message ordering", messageOrdering());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
new file mode 100644
index 0000000..3c5a4bc
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
@@ -0,0 +1,33 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+
+import com.google.inject.Guice;
+import com.google.inject.name.Names;
+
+public abstract class TCPBasedTest extends ProtocolTestUtil {
+    protected TCPBasedTest() {
+        super();
+        super.injector = Guice.createInjector(new TCPTestModule());
+    }
+
+    protected TCPBasedTest(int numTasks) {
+        super(numTasks);
+        super.injector = Guice.createInjector(new TCPTestModule());
+    }
+
+    class TCPTestModule extends ZkBasedClusterManagementTestModule {
+        TCPTestModule() {
+            super(TCPEmitter.class, TCPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(2);
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(100);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
deleted file mode 100644
index f02bf64..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class TCPCommTest extends ZkBasedTest {
-    DeliveryTestUtil util;
-
-    @Before
-    public void setup() throws IOException, InterruptedException, KeeperException {
-        Injector injector = Guice.createInjector(new TCPCommTestModule());
-        util = injector.getInstance(DeliveryTestUtil.class);
-    }
-
-    class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
-        TCPCommTestModule() {
-            super(TCPEmitter.class, TCPListener.class);
-        }
-
-        @Override
-        protected void configure() {
-            super.configure();
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
-            bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
-        }
-    }
-
-    /**
-     * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
-     * reports the loss of messages, if any.
-     * 
-     * @throws InterruptedException
-     */
-    @Test
-    public void testTCPDelivery() throws InterruptedException {
-        try {
-            Thread sendThread = util.newSendThread();
-            Thread receiveThread = util.newReceiveThread();
-
-            // start send and receive threads
-            sendThread.start();
-            receiveThread.start();
-
-            // wait for them to finish
-            sendThread.join();
-            receiveThread.join();
-
-            Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail("TCP has failed basic functionality test");
-        }
-
-        System.out.println("Done");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index eb5f42c..1b47285 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.junit.Assert.*;
 
 import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Test;
 
 public class TopologyFromZKTest extends ZKBaseTest {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
new file mode 100644
index 0000000..4e1f175
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
@@ -0,0 +1,30 @@
+package org.apache.s4.comm.udp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiPartitionDeliveryTest extends UDPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
+
+    public MultiPartitionDeliveryTest() {
+        super(2);
+    }
+
+    /**
+     * Tests the protocol. If all components function without throwing exceptions, the test passes.
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testUDPDelivery() {
+        try {
+            startThreads();
+            waitForThreads();
+            logger.info("# Messages not received = " + notDeliveredMessages());
+        } catch (Exception e) {
+            Assert.fail("UDP Simple DeliveryTest");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
new file mode 100644
index 0000000..82b4be4
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
@@ -0,0 +1,30 @@
+package org.apache.s4.comm.udp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleDeliveryTest extends UDPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
+
+    public SimpleDeliveryTest() {
+        super();
+    }
+
+    /**
+     * Tests the protocol. If all components function without throwing exceptions, the test passes.
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testUDPDelivery() {
+        try {
+            startThreads();
+            waitForThreads();
+            logger.info("# Messages not received = " + notDeliveredMessages());
+        } catch (Exception e) {
+            Assert.fail("UDP Simple DeliveryTest");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
new file mode 100644
index 0000000..9f8d8b0
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
@@ -0,0 +1,32 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+
+import com.google.inject.Guice;
+import com.google.inject.name.Names;
+
+public abstract class UDPBasedTest extends ProtocolTestUtil {
+    protected UDPBasedTest() {
+        super();
+        super.injector = Guice.createInjector(new UDPTestModule());
+    }
+
+    protected UDPBasedTest(int numTasks) {
+        super(numTasks);
+        super.injector = Guice.createInjector(new UDPTestModule());
+    }
+
+    class UDPTestModule extends ZkBasedClusterManagementTestModule {
+        UDPTestModule() {
+            super(UDPEmitter.class, UDPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
deleted file mode 100644
index 808a357..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.s4.comm.udp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class UDPCommTest extends ZkBasedTest {
-    DeliveryTestUtil util;
-
-    @Before
-    public void setup() throws IOException, InterruptedException, KeeperException {
-        Injector injector = Guice.createInjector(new UDPCommTestModule());
-        util = injector.getInstance(DeliveryTestUtil.class);
-    }
-
-    class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
-        UDPCommTestModule() {
-            super(UDPEmitter.class, UDPListener.class);
-        }
-
-        @Override
-        protected void configure() {
-            super.configure();
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
-            bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
-        }
-    }
-
-    /**
-     * Tests the protocol. If all components function without throwing exceptions, the test passes.
-     * 
-     * @throws InterruptedException
-     */
-    @Test
-    public void testUDPDelivery() throws InterruptedException {
-        try {
-            Thread sendThread = util.newSendThread();
-            Thread receiveThread = util.newReceiveThread();
-
-            // start send and receive threads
-            sendThread.start();
-            receiveThread.start();
-
-            // wait for them to finish
-            sendThread.join();
-            receiveThread.join();
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail("UDP has failed basic functionality test");
-        }
-        System.out.println("Done");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
new file mode 100644
index 0000000..47d1aa6
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@ -0,0 +1,139 @@
+package org.apache.s4.comm.util;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/*
+ * Test util for communication protocols.
+ *
+ * <ul>
+ * <li> The util defines Send and Receive Threads </li>
+ * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
+ * <li> ReceiveThread receives all/most of these messages </li>
+ * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would 
+ * interrupt after a pre-defined but long enough interval </li>
+ * </ul>
+ * 
+ */
+public class PartitionInfo {
+    private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
+    public Emitter emitter;
+    public Listener listener;
+    private final int interval;
+    private int numMessages;
+    private int partitionId;
+
+    public SendThread sendThread;
+    public ReceiveThread receiveThread;
+    private int messagesExpected;
+
+    @Inject
+    public PartitionInfo(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
+            @Named("emitter.send.numMessages") int numMessages) {
+        this.emitter = emitter;
+        this.listener = listener;
+        this.partitionId = this.listener.getPartitionId();
+        logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
+                this.listener.getPartitionId());
+
+        this.interval = interval;
+        this.numMessages = numMessages;
+        this.messagesExpected = numMessages * this.emitter.getPartitionCount();
+
+        this.sendThread = new SendThread();
+        this.receiveThread = new ReceiveThread();
+    }
+
+    public class SendThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < numMessages; i++) {
+                    for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
+                        byte[] message = new String(partitionId + " " + i).getBytes();
+                        while (!emitter.send(partition, message)) {
+                            logger.debug("SendThread {}: Resending message to {}", partitionId, partition);
+                            Thread.sleep(interval);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                return;
+            }
+
+            logger.debug("SendThread {}: Exiting", partitionId);
+        }
+    }
+
+    private static class TimelyInterrupt extends TimerTask {
+        private final Thread watchThread;
+
+        TimelyInterrupt(Thread watchThread) {
+            this.watchThread = watchThread;
+        }
+
+        @Override
+        public void run() {
+            watchThread.interrupt();
+        }
+    }
+
+    public class ReceiveThread extends Thread {
+        private int sleepCount = 10;
+        private int messagesReceived = 0;
+        private boolean ordered = true;
+        private int receivedMessages[];
+        private Timer timer;
+
+        ReceiveThread() {
+            receivedMessages = new int[emitter.getPartitionCount()];
+            for (int i = 0; i < receivedMessages.length; i++)
+                receivedMessages[i] = -1;
+            timer = new Timer();
+        }
+
+        @Override
+        public void run() {
+            timer.schedule(new TimelyInterrupt(this), 10 * interval);
+
+            while (messagesReceived < messagesExpected) {
+                byte[] message = listener.recv();
+                if (message != null) {
+                    messagesReceived++;
+                    String msgString = new String(message);
+                    String[] msgTokens = msgString.split(" ");
+                    int senderPartition = Integer.parseInt(msgTokens[0]);
+                    int receivedMsg = Integer.parseInt(msgTokens[1]);
+                    if (receivedMsg < receivedMessages[senderPartition])
+                        ordered = false;
+                    receivedMessages[senderPartition] = receivedMsg;
+                } else if (sleepCount-- > 0) {
+                    continue;
+                } else {
+                    break;
+                }
+            }
+            timer.cancel();
+            timer.purge();
+
+            logger.debug("ReceiveThread {}: Exiting with {} messages left", partitionId, moreMessages());
+        }
+
+        public boolean ordered() {
+            return ordered;
+        }
+
+        public int moreMessages() {
+            return (messagesExpected - messagesReceived);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
new file mode 100644
index 0000000..60c5bba
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@ -0,0 +1,76 @@
+package org.apache.s4.comm.util;
+
+import java.io.IOException;
+
+import org.apache.s4.comm.util.PartitionInfo;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.inject.Injector;
+
+public abstract class ProtocolTestUtil extends ZkBasedTest {
+    protected PartitionInfo[] partitions;
+    protected Injector injector;
+
+    protected ProtocolTestUtil() {
+        super();
+    }
+
+    protected ProtocolTestUtil(int numTasks) {
+        super(numTasks);
+    }
+
+    @Before
+    public void setup() throws IOException, InterruptedException, KeeperException {
+        partitions = new PartitionInfo[super.numTasks];
+        for (int i = 0; i < this.numTasks; i++) {
+            partitions[i] = injector.getInstance(PartitionInfo.class);
+        }
+    }
+
+    protected void startThreads() {
+        for (PartitionInfo partition : partitions) {
+            partition.sendThread.start();
+            partition.receiveThread.start();
+        }
+    }
+
+    protected void waitForThreads() throws InterruptedException {
+        for (PartitionInfo partition : partitions) {
+            partition.sendThread.join();
+            partition.receiveThread.join();
+        }
+    }
+
+    protected int notDeliveredMessages() {
+        int sum = 0;
+        for (PartitionInfo partition : partitions) {
+            sum += partition.receiveThread.moreMessages();
+        }
+        return sum;
+    }
+
+    protected boolean messageOrdering() {
+        for (PartitionInfo partition : partitions) {
+            if (!partition.receiveThread.ordered())
+                return false;
+        }
+        return true;
+    }
+
+    @After
+    public void tearDown() {
+        for (PartitionInfo partition : partitions) {
+            if (partition.emitter != null) {
+                partition.emitter.close();
+                partition.emitter = null;
+            }
+            if (partition.listener != null) {
+                partition.listener.close();
+                partition.listener = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
index ebcf388..272f9b7 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -14,13 +14,13 @@ import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromFile;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
@@ -69,9 +69,8 @@ public abstract class FileBasedClusterManagementTestModule<T> extends AbstractMo
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
         bind(Assignment.class).to(AssignmentFromFile.class);
         bind(Topology.class).to(TopologyFromFile.class);
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+        bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index a368db8..1e8cf2f 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -19,19 +19,23 @@ import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyFromZK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.name.Names;
 
 public class ZkBasedClusterManagementTestModule extends AbstractModule {
-
+    private static final Logger logger = LoggerFactory.getLogger(ZkBasedClusterManagementTestModule.class);
     protected PropertiesConfiguration config = null;
 
     private Class<? extends Emitter> emitterClass = null;
     private Class<? extends Listener> listenerClass = null;
 
     protected ZkBasedClusterManagementTestModule() {
+        this.emitterClass = TCPEmitter.class;
+        this.listenerClass = TCPListener.class;
     }
 
     protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
@@ -72,16 +76,13 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
         bind(Assignment.class).to(AssignmentFromZK.class);
         bind(Topology.class).to(TopologyFromZK.class);
 
-        if (this.emitterClass != null) {
-            bind(Emitter.class).to(this.emitterClass);
-        } else {
-            bind(Emitter.class).to(TCPEmitter.class);
-        }
+        bind(Emitter.class).to(this.emitterClass);
+        bind(Listener.class).to(this.listenerClass);
 
-        if (this.listenerClass != null) {
-            bind(Listener.class).to(this.listenerClass);
-        } else {
-            bind(Listener.class).to(TCPListener.class);
+        if (this.emitterClass.equals(TCPEmitter.class)) {
+            bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
         }
+
+        logger.info("Emitter set to - {}; Listener set to - {}", this.emitterClass, this.listenerClass);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 02c5467..3b1e45a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -6,6 +6,7 @@ import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,15 +14,26 @@ import org.slf4j.LoggerFactory;
 public abstract class ZkBasedTest {
     private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
     private ZkServer zkServer;
+    protected String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
+
+    private static final String clusterName = "s4-test-cluster";
+    protected final int numTasks;
+
+    protected ZkBasedTest() {
+        this.numTasks = 1;
+    }
+
+    protected ZkBasedTest(int numTasks) {
+        this.numTasks = numTasks;
+    }
 
     @Before
-    public void prepare() {
+    public void setupZkBasedTest() {
         String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
         String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
         CommTestUtils.cleanupTmpDirs();
 
         IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
-
             @Override
             public void createDefaultNameSpace(ZkClient zkClient) {
 
@@ -32,17 +44,17 @@ public abstract class ZkBasedTest {
         zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
         zkServer.start();
 
-        logger.info("Starting Zookeeper Client 1");
-        String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
-        @SuppressWarnings("unused")
-        ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
-        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
-        zkClient2.getCreationTime("/");
-
         TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-        final String clusterName = "s4-test-cluster";
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1);
+        taskSetup.setup(clusterName, this.numTasks);
+    }
+
+    @After
+    public void cleanupZkBasedTest() {
+        if (zkServer != null) {
+            zkServer.shutdown();
+            zkServer = null;
+        }
+        CommTestUtils.cleanupTmpDirs();
     }
 }


Mime
View raw message