S4-7 Guaranteed, ordered, robust TCP protocol - send() queues the messages - an asynchronous thread pool sends them using Netty - on success, delivery of the message is notified - added close() method to Emitter and Listener - added tests to check the above-mentioned properties
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/8e2427b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/8e2427b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/8e2427b0
Branch: refs/heads/S4-7
Commit: 8e2427b02a09460891bc6d01f9fc41b42304fb58
Parents: 5e38aa2
Author: Karthik Kambatla <kkambatl@cs.purdue.edu>
Authored: Wed Jan 25 19:00:00 2012 -0500
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Tue Jan 31 18:55:17 2012 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Emitter.java | 18 +-
.../src/main/java/org/apache/s4/base/Listener.java | 20 +-
.../java/org/apache/s4/comm/QueueingEmitter.java | 177 ++++----
.../java/org/apache/s4/comm/QueueingListener.java | 16 +-
.../apache/s4/comm/loopback/LoopBackEmitter.java | 10 +-
.../apache/s4/comm/loopback/LoopBackListener.java | 17 +-
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 329 +++++++++------
.../java/org/apache/s4/comm/tcp/TCPListener.java | 56 ++-
.../java/org/apache/s4/comm/tools/TaskSetup.java | 13 +-
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 8 +-
.../java/org/apache/s4/comm/udp/UDPListener.java | 14 +-
.../java/org/apache/s4/comm/DeliveryTestUtil.java | 143 -------
.../s4/comm/tcp/MultiPartitionDeliveryTest.java | 28 ++
.../org/apache/s4/comm/tcp/NetworkGlitchTest.java | 32 ++
.../org/apache/s4/comm/tcp/SimpleDeliveryTest.java | 23 +
.../java/org/apache/s4/comm/tcp/TCPBasedTest.java | 33 ++
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 69 ---
.../s4/comm/topology/TopologyFromZKTest.java | 1 +
.../s4/comm/udp/MultiPartitionDeliveryTest.java | 30 ++
.../org/apache/s4/comm/udp/SimpleDeliveryTest.java | 30 ++
.../java/org/apache/s4/comm/udp/UDPBasedTest.java | 32 ++
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 65 ---
.../org/apache/s4/comm/util/PartitionInfo.java | 139 ++++++
.../org/apache/s4/comm/util/ProtocolTestUtil.java | 76 ++++
.../FileBasedClusterManagementTestModule.java | 11 +-
.../ZkBasedClusterManagementTestModule.java | 21 +-
.../java/org/apache/s4/fixtures/ZkBasedTest.java | 36 +-
27 files changed, 865 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index 0570c14..ad511c9 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -1,13 +1,17 @@
package org.apache.s4.base;
public interface Emitter {
-
- /*
- * @param partitionId - destination partition
- * @param message - message payload that needs to be sent
- * @return - true - if message is sent across successfully
- * - false - if send fails
- */
+
+ /*
+ * @param partitionId - destination partition
+ *
+ * @param message - message payload that needs to be sent
+ *
+ * @return - true - if message is sent across successfully - false - if send fails
+ */
boolean send(int partitionId, byte[] message);
+
int getPartitionCount();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
index c0b63ef..be60597 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
@@ -7,15 +7,15 @@ package org.apache.s4.base;
*/
public interface Listener {
- /*
- * Perform blocking receive on the appropriate communication channel
- *
- * @return
- * <ul><li> byte[] message returned by the channel </li>
- * <li> null if the associated blocking thread is interrupted </li>
- * </ul>
- */
- byte[] recv();
+ /*
+ * Perform blocking receive on the appropriate communication channel
+ *
+ * @return <ul><li> byte[] message returned by the channel </li> <li> null if the associated blocking thread is
+ * interrupted </li> </ul>
+ */
+ byte[] recv();
- public int getPartitionId();
+ public int getPartitionId();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index 089503d..7218afe 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@ -9,92 +9,95 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
public class QueueingEmitter implements Emitter, Runnable {
- private Emitter emitter;
- private BlockingQueue<MessageHolder> queue;
- private long dropCount = 0;
- private volatile Thread thread;
-
- @Inject
- public QueueingEmitter(@Named("ll") Emitter emitter,
- @Named("comm.queue_emmiter_size") int queueSize) {
- this.emitter = emitter;
- queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
- }
-
- public long getDropCount() {
- return dropCount;
- }
-
- public void start() {
- if (thread != null) {
- throw new IllegalStateException(
- "QueueingEmitter is already started");
- }
- thread = new Thread(this, "QueueingEmitter");
- thread.start();
- }
-
- public void stop() {
- if (thread == null) {
- throw new IllegalStateException(
- "QueueingEmitter is already stopped");
- }
- thread.interrupt();
- thread = null;
- }
-
- @Override
- public boolean send(int partitionId, byte[] message) {
- MessageHolder mh = new MessageHolder(partitionId, message);
- if (!queue.offer(mh)) {
- dropCount++;
- return true;
- } else {
- return false;
- }
- }
-
- public void run() {
- while (!Thread.interrupted()) {
- try {
- MessageHolder mh = queue.take();
- // System.out.println("QueueingEmitter: Sending message on low-level emitter");
- emitter.send(mh.getPartitionId(), mh.getMessage());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public int getPartitionCount() {
- return emitter.getPartitionCount();
- }
-
- class MessageHolder {
- private int partitionId;
- private byte[] message;
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public byte[] getMessage() {
- return message;
- }
-
- public void setMessage(byte[] message) {
- this.message = message;
- }
-
- public MessageHolder(int partitionId, byte[] message) {
- super();
- this.partitionId = partitionId;
- this.message = message;
- }
- }
+ private Emitter emitter;
+ private BlockingQueue<MessageHolder> queue;
+ private long dropCount = 0;
+ private volatile Thread thread;
+
+ @Inject
+ public QueueingEmitter(@Named("ll") Emitter emitter, @Named("comm.queue_emmiter_size") int queueSize) {
+ this.emitter = emitter;
+ queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
+ }
+
+ public long getDropCount() {
+ return dropCount;
+ }
+
+ public void start() {
+ if (thread != null) {
+ throw new IllegalStateException("QueueingEmitter is already started");
+ }
+ thread = new Thread(this, "QueueingEmitter");
+ thread.start();
+ }
+
+ public void stop() {
+ if (thread == null) {
+ throw new IllegalStateException("QueueingEmitter is already stopped");
+ }
+ thread.interrupt();
+ thread = null;
+ }
+
+ @Override
+ public boolean send(int partitionId, byte[] message) {
+ MessageHolder mh = new MessageHolder(partitionId, message);
+ if (!queue.offer(mh)) {
+ dropCount++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ MessageHolder mh = queue.take();
+ // System.out.println("QueueingEmitter: Sending message on low-level emitter");
+ emitter.send(mh.getPartitionId(), mh.getMessage());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public int getPartitionCount() {
+ return emitter.getPartitionCount();
+ }
+
+ class MessageHolder {
+ private int partitionId;
+ private byte[] message;
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public byte[] getMessage() {
+ return message;
+ }
+
+ public void setMessage(byte[] message) {
+ this.message = message;
+ }
+
+ public MessageHolder(int partitionId, byte[] message) {
+ super();
+ this.partitionId = partitionId;
+ this.message = message;
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 8d57a9e..46f0a8e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -1,6 +1,5 @@
package org.apache.s4.comm;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -16,8 +15,7 @@ public class QueueingListener implements Listener, Runnable {
private volatile Thread thread;
@Inject
- public QueueingListener(@Named("ll") Listener listener,
- @Named("comm.queue_listener_size") int queueSize) {
+ public QueueingListener(@Named("ll") Listener listener, @Named("comm.queue_listener_size") int queueSize) {
this.listener = listener;
queue = new LinkedBlockingQueue<byte[]>(queueSize);
}
@@ -28,8 +26,7 @@ public class QueueingListener implements Listener, Runnable {
public void start() {
if (thread != null) {
- throw new IllegalStateException(
- "QueueingListener is already started");
+ throw new IllegalStateException("QueueingListener is already started");
}
thread = new Thread(this, "QueueingListener");
thread.start();
@@ -37,8 +34,7 @@ public class QueueingListener implements Listener, Runnable {
public void stop() {
if (thread == null) {
- throw new IllegalStateException(
- "QueueingListener is already stopped");
+ throw new IllegalStateException("QueueingListener is already stopped");
}
thread.interrupt();
thread = null;
@@ -69,4 +65,10 @@ public class QueueingListener implements Listener, Runnable {
}
}
}
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
index d59eabe..74b7345 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
@@ -4,11 +4,11 @@ import org.apache.s4.base.Emitter;
public class LoopBackEmitter implements Emitter {
private LoopBackListener listener;
-
+
public LoopBackEmitter(LoopBackListener listener) {
this.listener = listener;
}
-
+
@Override
public boolean send(int partitionId, byte[] message) {
listener.put(message);
@@ -20,4 +20,10 @@ public class LoopBackEmitter implements Emitter {
return 1;
}
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
index b312485..8077231 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackListener.java
@@ -1,19 +1,18 @@
package org.apache.s4.comm.loopback;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.apache.s4.base.Listener;
public class LoopBackListener implements Listener {
-
+
private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
@Override
public byte[] recv() {
try {
- //System.out.println("LoopBackListener: Taking message from handoff queue");
+ // System.out.println("LoopBackListener: Taking message from handoff queue");
return handoffQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -24,14 +23,20 @@ public class LoopBackListener implements Listener {
public int getPartitionId() {
return 0;
}
-
+
public void put(byte[] message) {
try {
- //System.out.println("LoopBackListener: putting message into handoffqueue");
+ // System.out.println("LoopBackListener: putting message into handoffqueue");
handoffQueue.put(message);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- }
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5960fc1..ba48a28 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -2,9 +2,10 @@ package org.apache.s4.comm.tcp;
import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
import java.util.Hashtable;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.s4.base.Emitter;
@@ -25,6 +26,8 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.slf4j.Logger;
@@ -32,76 +35,57 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
-public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
- private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
- private static final int BUFFER_SIZE = 10;
- private static final int NUM_RETRIES = 10;
+/*
+ * TCPEmitter - Uses TCP to send messages across to other partitions.
+ * - Message ordering between partitions is preserved.
+ * - For efficiency, TCPEmitter.send() queues the messages partition-wise,
+ * a thread pool sends the messages asynchronously; a message is dequeued only on success.
+ * - Tolerates topology changes, partition re-mapping, and network glitches.
+ */
+public class TCPEmitter implements Emitter, TopologyChangeListener {
+ private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+ private final int numRetries = 10;
+ private final int bufferSize;
private Topology topology;
private final ClientBootstrap bootstrap;
- static class MessageQueuesPerPartition {
- private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
- private boolean bounded;
-
- MessageQueuesPerPartition(boolean bounded) {
- this.bounded = bounded;
- }
-
- private boolean add(int partitionId, byte[] message) {
- Queue<byte[]> messages = queues.get(partitionId);
-
- if (messages == null) {
- messages = new ArrayDeque<byte[]>();
- queues.put(partitionId, messages);
- }
-
- if (bounded && messages.size() >= BUFFER_SIZE) {
- // Too many messages already queued
- return false;
- }
-
- messages.offer(message);
- return true;
- }
-
- private byte[] peek(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
-
- try {
- return messages.peek();
- } catch (NullPointerException npe) {
- return null;
- }
- }
+ /*
+ * Channel used to send messages to each partition
+ */
+ private HashBiMap<Integer, Channel> partitionChannelMap;
- private void remove(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
+ /*
+ * Node hosting each partition
+ */
+ private HashBiMap<Integer, ClusterNode> partitionNodeMap;
- if (messages.isEmpty()) {
- logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
- return;
- }
+ /*
+ * Messages to be sent, stored per partition
+ */
+ private Hashtable<Integer, SendQueue> sendQueues;
- if (messages != null)
- messages.remove();
- }
- }
-
- private HashBiMap<Integer, Channel> partitionChannelMap;
- private HashBiMap<Integer, ClusterNode> partitionNodeMap;
- private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+ /*
+ * Thread pool to actually send messages
+ */
+ private ExecutorService sendService = Executors.newCachedThreadPool();
@Inject
- public TCPEmitter(Topology topology) throws InterruptedException {
+ public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize) throws InterruptedException {
+ this.bufferSize = bufferSize;
this.topology = topology;
topology.addListener(this);
int clusterSize = this.topology.getTopology().getNodes().size();
partitionChannelMap = HashBiMap.create(clusterSize);
partitionNodeMap = HashBiMap.create(clusterSize);
+ sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
+ /*
+ * Initialize netty related structures
+ */
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
@@ -112,13 +96,126 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
public ChannelPipeline getPipeline() {
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new LengthFieldPrepender(4));
- p.addLast("2", new TestHandler());
+ p.addLast("2", new NotifyChannelInterestChange());
return p;
}
});
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", 100);
+ }
+
+ private static class Message implements ChannelFutureListener {
+ private final SendQueue sendQ;
+ private final byte[] message;
+ private boolean sendInProcess;
+
+ Message(SendQueue sendQ, byte[] message) {
+ this.sendQ = sendQ;
+ this.message = message;
+ this.sendInProcess = false;
+ }
+
+ private void sendMessage() {
+ if (sendInProcess)
+ return;
+
+ sendQ.emitter.sendMessage(sendQ.partitionId, this);
+ sendInProcess = true;
+ }
+
+ @Override
+ public synchronized void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ synchronized (sendQ.messages) {
+ sendQ.messages.remove(this);
+ }
+ return;
+ }
+
+ if (future.isCancelled()) {
+ logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
+ }
+
+ // failed operation
+ sendInProcess = false;
+ future.getChannel().close().awaitUninterruptibly();
+ sendQ.spawnSendThread();
+ }
+ }
+
+ private static class SendQueue {
+ private final TCPEmitter emitter;
+ private final int partitionId;
+ private final int bufferSize;
+ private Queue<Message> messages;
+
+ private boolean sendThreadRecheck = false;
+ private Boolean sendThreadActive = false;
+
+ SendQueue(TCPEmitter emitter, int partitionId, int bufferSize) {
+ this.emitter = emitter;
+ this.partitionId = partitionId;
+ this.bufferSize = bufferSize;
+ this.messages = new ArrayBlockingQueue<Message>(this.bufferSize);
+ }
+
+ private void spawnSendThread() {
+ synchronized (sendThreadActive) {
+ if (sendThreadActive) {
+ sendThreadRecheck = true;
+ } else {
+ sendThreadActive = true;
+ emitter.sendService.execute(new SendThread(this));
+ }
+ }
+ }
+
+ private boolean offer(byte[] message) {
+ Message m = new Message(this, message);
+ synchronized (messages) {
+ if (messages.offer(m)) {
+ spawnSendThread();
+ return true;
+ } else
+ return false;
+ }
+ }
+
+ private void sendMessagesInQueue() {
+ synchronized (messages) {
+ for (Message message : messages) {
+ message.sendMessage();
+ }
+ }
+ }
+ }
+
+ private static class SendThread extends Thread {
+ private final SendQueue sendQ;
+
+ SendThread(SendQueue sendQ) {
+ this.sendQ = sendQ;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ sendQ.sendMessagesInQueue();
+
+ synchronized (sendQ.sendThreadActive) {
+ if (sendQ.sendThreadRecheck) {
+ sendQ.sendThreadRecheck = false;
+ continue;
+ } else {
+ sendQ.sendThreadActive = false;
+ return;
+ }
+ }
+ }
+ }
}
private boolean connectTo(Integer partitionId) {
@@ -134,7 +231,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
return false;
}
- for (int retries = 0; retries < NUM_RETRIES; retries++) {
+ for (int retries = 0; retries < numRetries; retries++) {
ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
.getPort()));
f.awaitUninterruptibly();
@@ -153,90 +250,77 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
return false;
}
- private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
- ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
- buffer.writeBytes(message);
- ChannelFuture f = channel.write(buffer);
- f.addListener(this);
- }
+ private void sendMessage(int partitionId, Message m) {
+ ChannelBuffer buffer = ChannelBuffers.buffer(m.message.length);
+ buffer.writeBytes(m.message);
- private final Object sendLock = new Object();
-
- @Override
- public boolean send(int partitionId, byte[] message) {
- Channel channel = partitionChannelMap.get(partitionId);
- if (channel == null) {
- if (connectTo(partitionId)) {
- channel = partitionChannelMap.get(partitionId);
- } else {
- // could not connect, queue to the partitionBuffer
- return queuedMessages.add(partitionId, message);
+ while (true) {
+ if (!partitionChannelMap.containsKey(partitionId)) {
+ connectTo(partitionId);
+ continue;
}
- }
- /*
- * Try limiting the size of the send queue inside Netty
- */
- if (!channel.isWritable()) {
- synchronized (sendLock) {
- // check again now that we have the lock
- while (!channel.isWritable()) {
+ SendQueue sendQ = sendQueues.get(partitionId);
+ synchronized (sendQ) {
+ if (!partitionChannelMap.get(partitionId).isWritable()) {
try {
- sendLock.wait();
- } catch (InterruptedException ie) {
- return false;
+ logger.debug("Waiting for channel to partition {} to become writable", partitionId);
+ sendQ.wait();
+ } catch (InterruptedException e) {
+ continue;
}
}
}
+
+ Channel c = partitionChannelMap.get(partitionId);
+ if (c != null && c.isWritable()) {
+ c.write(buffer).addListener(m);
+ break;
+ }
}
+ }
- /*
- * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
- * buffered messages, and (3) the Current Message
- *
- * Once the channel returns success delete from the messagesOnTheWire
- */
- byte[] messageBeingSent = null;
- // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
- // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
- // }
-
- while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
- writeMessageToChannel(channel, partitionId, messageBeingSent);
- queuedMessages.remove(partitionId);
+ @Override
+ public boolean send(int partitionId, byte[] message) {
+ if (!sendQueues.containsKey(partitionId)) {
+ SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferSize);
+ sendQueues.put(partitionId, sendQueue);
}
- writeMessageToChannel(channel, partitionId, message);
- return true;
+ SendQueue sendQueue = sendQueues.get(partitionId);
+ return sendQueue.offer(message);
}
- @Override
- public void operationComplete(ChannelFuture f) {
- int partitionId = partitionChannelMap.inverse().get(f.getChannel());
- if (f.isSuccess()) {
- // messagesOnTheWire.remove(partitionId);
+ public void close() {
+ ChannelGroup cg = new DefaultChannelGroup();
+ synchronized (partitionChannelMap) {
+ cg.addAll(partitionChannelMap.values());
+ partitionChannelMap.clear();
}
+ cg.close().awaitUninterruptibly();
+ bootstrap.releaseExternalResources();
+ }
- if (f.isCancelled()) {
- logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
- } else if (!f.isSuccess()) {
- logger.error("Exception on I/O operation", f.getCause());
- logger.error(String.format("I/O on partition %d failed!", partitionId));
- partitionChannelMap.remove(partitionId);
+ protected void closeChannel(int partition) {
+ Channel c = partitionChannelMap.remove(partition);
+ if (c != null) {
+ c.close().awaitUninterruptibly();
}
}
@Override
public void onChange() {
- /*
- * Close the channels that correspond to changed partitions and update partitionNodeMap
- */
for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
Integer partition = clusterNode.getPartition();
+ if (partition == null) {
+ logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+ return;
+ }
+
ClusterNode oldNode = partitionNodeMap.get(partition);
if (oldNode != null && !oldNode.equals(clusterNode)) {
- partitionChannelMap.remove(partition).close();
+ closeChannel(partition);
}
partitionNodeMap.forcePut(partition, clusterNode);
@@ -249,14 +333,14 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
return topology.getTopology().getPartitionCount();
}
- class TestHandler extends SimpleChannelHandler {
+ class NotifyChannelInterestChange extends SimpleChannelHandler {
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
- // logger.info(String.format("%08x %08x %08x", e.getValue(),
- // e.getChannel().getInterestOps(), Channel.OP_WRITE));
- synchronized (sendLock) {
- if (e.getChannel().isWritable()) {
- sendLock.notify();
+ Channel c = e.getChannel();
+ SendQueue sendQ = sendQueues.get(partitionChannelMap.inverse().get(c));
+ synchronized (sendQ) {
+ if (c.isWritable()) {
+ sendQ.notify();
}
}
ctx.sendUpstream(e);
@@ -266,9 +350,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
if (partitionId == null) {
- logger.error("Error on mystery channel!!");
+ return;
}
logger.error("Error on channel to partition " + partitionId);
+ partitionChannelMap.remove(partitionId);
try {
throw event.getCause();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index b3776e1..ee50019 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -10,6 +10,7 @@ import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.ClusterNode;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
@@ -18,6 +19,8 @@ import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
@@ -25,61 +28,70 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-
public class TCPListener implements Listener {
+ private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
private ClusterNode node;
- private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-
+ private ServerBootstrap bootstrap;
+ private final ChannelGroup channelGroup = new DefaultChannelGroup();
+
@Inject
public TCPListener(Assignment assignment) {
// wait for an assignment
node = assignment.assignClusterNode();
-
- ChannelFactory factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
- ServerBootstrap bootstrap = new ServerBootstrap(factory);
+ ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
p.addLast("2", new ChannelHandler(handoffQueue));
-
+
return p;
}
});
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
-
- bootstrap.bind(new InetSocketAddress(node.getPort()));
+ bootstrap.setOption("child.reuseAddress", true);
+ bootstrap.setOption("child.connectTimeoutMillis", 100);
+ bootstrap.setOption("readWriteFair", true);
+
+ Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
+ channelGroup.add(c);
}
-
+
public byte[] recv() {
try {
- return handoffQueue.take();
+ byte[] msg = handoffQueue.take();
+ return msg;
} catch (InterruptedException e) {
- return null;
+ return null;
}
}
-
+
public int getPartitionId() {
return node.getPartition();
}
-
+
+ public void close() {
+ channelGroup.close().awaitUninterruptibly();
+ bootstrap.releaseExternalResources();
+ }
+
public class ChannelHandler extends SimpleChannelHandler {
private BlockingQueue<byte[]> handoffQueue;
-
+
public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
this.handoffQueue = handOffQueue;
}
-
- public void messageReceived(ChannelHandlerContext ctx,
- MessageEvent e) {
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ channelGroup.add(e.getChannel());
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
try {
handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
@@ -88,7 +100,7 @@ public class TCPListener implements Listener {
Thread.currentThread().interrupt();
}
}
-
+
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
logger.error("Error", event.getCause());
if (context.getChannel().isOpen()) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index ab14449..f3928b8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -33,8 +33,7 @@ public class TaskSetup {
record.putSimpleField("port", String.valueOf(1300 + i));
record.putSimpleField("partition", String.valueOf(i));
record.putSimpleField("cluster", clusterName);
- zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
- record);
+ zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId, record);
}
}
@@ -45,13 +44,11 @@ public class TaskSetup {
taskSetup.setup(clusterName, 10);
String zookeeperAddress = "localhost:2181";
for (int i = 0; i < 10; i++) {
- AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
- clusterName, zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK
- .assignClusterNode();
- System.out.println(i+"-->"+assignClusterNode);
+ AssignmentFromZK assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+ System.out.println(i + "-->" + assignClusterNode);
}
- TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
Thread.sleep(3000);
Cluster topology = topologyFromZK.getTopology();
System.out.println(topology.getNodes().size());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 85b164c..1e1f229 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -77,8 +77,14 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
synchronized (nodes) {
for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
Integer partition = clusterNode.getPartition();
- nodes.put(partition, clusterNode);
+ nodes.forcePut(partition, clusterNode);
}
}
}
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 84aa7fa..5cd62de 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -12,11 +12,10 @@ import org.apache.s4.comm.topology.ClusterNode;
import com.google.inject.Inject;
-
/**
*
* Implementation of a simple UDP listener.
- *
+ *
*/
public class UDPListener implements Listener, Runnable {
@@ -54,8 +53,7 @@ public class UDPListener implements Listener, Runnable {
while (!Thread.interrupted()) {
socket.receive(datagram);
byte[] data = new byte[datagram.getLength()];
- System.arraycopy(datagram.getData(), datagram.getOffset(),
- data, 0, data.length);
+ System.arraycopy(datagram.getData(), datagram.getOffset(), data, 0, data.length);
datagram.setLength(BUFFER_LENGTH);
try {
handoffQueue.put(data);
@@ -72,7 +70,7 @@ public class UDPListener implements Listener, Runnable {
try {
return handoffQueue.take();
} catch (InterruptedException e) {
- return null;
+ return null;
}
}
@@ -80,4 +78,10 @@ public class UDPListener implements Listener, Runnable {
return node.getPartition();
}
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
deleted file mode 100644
index 19b4de1..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.apache.s4.comm;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * Test util for communication protocols.
- *
- * <ul>
- * <li> The util defines Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
- * interrupt after a pre-defined but long enough interval </li>
- * </ul>
- *
- */
-public class DeliveryTestUtil {
-
- private final Emitter emitter;
- private final Listener listener;
- private final int interval;
- private int numMessages;
- private int sleepCount;
-
- // public Thread sendThread, receiveThread;
- private final int messagesExpected;
-
- @Inject
- public DeliveryTestUtil(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
- @Named("emitter.send.numMessages") int numMessages, @Named("listener.recv.sleepCount") int sleepCount) {
- this.emitter = emitter;
- this.listener = listener;
- this.interval = interval;
- this.numMessages = numMessages;
- this.sleepCount = sleepCount;
- this.messagesExpected = numMessages * this.emitter.getPartitionCount();
-
- // this.sendThread = new SendThread();
- // this.receiveThread = new ReceiveThread();
- }
-
- public class SendThread extends Thread {
- @Override
- public void run() {
- try {
- for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
- for (int i = 0; i < numMessages; i++) {
- byte[] message = (new String("message-" + i)).getBytes();
- emitter.send(partition, message);
- Thread.sleep(interval);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
- }
- }
-
- /*
- * TimerThread - interrupts the passed thread, after specified time-interval.
- */
- class TimerThread extends Thread {
- private final Thread watchThread;
- private Integer sleepCounter;
-
- TimerThread(Thread watchThread) {
- this.watchThread = watchThread;
- this.sleepCounter = new Integer(sleepCount);
- }
-
- public void resetSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = sleepCount;
- }
- }
-
- public void clearSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = 0;
- }
- }
-
- private int getCounter() {
- synchronized (this.sleepCounter) {
- return this.sleepCounter--;
- }
- }
-
- @Override
- public void run() {
- try {
- while (getCounter() > 0) {
- Thread.sleep(interval);
- }
- watchThread.interrupt();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- class ReceiveThread extends Thread {
- private int messagesReceived = 0;
-
- @Override
- public void run() {
-
- // start the timer thread to interrupt if blocked for too long
- TimerThread timer = new TimerThread(this);
- timer.start();
- while (messagesReceived < messagesExpected) {
- byte[] message = listener.recv();
- timer.resetSleepCounter();
- if (message != null)
- messagesReceived++;
- else
- break;
- }
- timer.clearSleepCounter();
- }
-
- private boolean moreMessages() {
- return ((messagesExpected - messagesReceived) > 0);
- }
- }
-
- public Thread newSendThread() {
- return new SendThread();
- }
-
- public Thread newReceiveThread() {
- return new ReceiveThread();
- }
-
- public boolean moreMessages(Thread recvThread) {
- return ((ReceiveThread) recvThread).moreMessages();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
new file mode 100644
index 0000000..0752874
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
@@ -0,0 +1,39 @@
+package org.apache.s4.comm.tcp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks guaranteed, ordered TCP delivery when the fixture is created with four tasks.
+ */
+public class MultiPartitionDeliveryTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
+
+    public MultiPartitionDeliveryTest() {
+        super(4);
+    }
+
+    /**
+     * Runs the send and receive threads to completion, then asserts that no expected message is missing and that
+     * no receiver saw a sender's messages go backwards.
+     *
+     * @throws InterruptedException
+     *             if interrupted while waiting for the threads to finish
+     */
+    @Test
+    public void testMultiPartitionDelivery() throws InterruptedException {
+        startThreads();
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = {}", messagesNotReceived);
+        // assertEquals takes (message, expected, actual); the expected count of missing messages is 0.
+        Assert.assertEquals("Guaranteed message delivery", 0, messagesNotReceived);
+
+        boolean ordered = messageOrdering();
+        logger.info("Message ordering - {}", ordered);
+        Assert.assertTrue("Pairwise message ordering", ordered);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
new file mode 100644
index 0000000..5f26f89
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
@@ -0,0 +1,45 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.comm.util.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that TCP delivery stays complete and ordered while one emitter's channel is closed repeatedly.
+ */
+public class NetworkGlitchTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
+
+    /**
+     * Starts the send and receive threads, closes a channel of the first partition's emitter nine times with a
+     * 100 ms pause before each close, then asserts that no expected message is missing and ordering was preserved.
+     *
+     * @throws InterruptedException
+     *             if interrupted while sleeping or while waiting for the threads to finish
+     */
+    @Test
+    public void testResilienceToNetworkGlitches() throws InterruptedException {
+        PartitionInfo util = partitions[0];
+
+        startThreads();
+
+        for (int i = 1; i < 10; i++) {
+            Thread.sleep(100);
+            // presumably the argument is the destination partition id -- confirm against TCPEmitter.closeChannel
+            ((TCPEmitter) util.emitter).closeChannel(0);
+        }
+
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = {}", messagesNotReceived);
+        // assertEquals takes (message, expected, actual); the expected count of missing messages is 0.
+        Assert.assertEquals("Guaranteed message delivery", 0, messagesNotReceived);
+
+        boolean ordered = messageOrdering();
+        logger.info("Message ordering - {}", ordered);
+        Assert.assertTrue("Pairwise message ordering", ordered);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
new file mode 100644
index 0000000..93638a4
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
@@ -0,0 +1,35 @@
+package org.apache.s4.comm.tcp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks guaranteed, ordered TCP delivery using the fixture's no-argument (default) configuration.
+ */
+public class SimpleDeliveryTest extends TCPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
+
+    /**
+     * Runs the send and receive threads to completion, then asserts that no expected message is missing and that
+     * no receiver saw a sender's messages go backwards.
+     *
+     * @throws InterruptedException
+     *             if interrupted while waiting for the threads to finish
+     */
+    @Test
+    public void testSimpleDelivery() throws InterruptedException {
+        startThreads();
+        waitForThreads();
+
+        int messagesNotReceived = notDeliveredMessages();
+        logger.info("# Messages not received = {}", messagesNotReceived);
+        // assertEquals takes (message, expected, actual); the expected count of missing messages is 0.
+        Assert.assertEquals("Guaranteed message delivery", 0, messagesNotReceived);
+
+        boolean ordered = messageOrdering();
+        logger.info("Message ordering - {}", ordered);
+        Assert.assertTrue("Pairwise message ordering", ordered);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
new file mode 100644
index 0000000..3c5a4bc
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasedTest.java
@@ -0,0 +1,49 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+
+import com.google.inject.Guice;
+import com.google.inject.name.Names;
+
+/**
+ * Base class for protocol tests that run over {@link TCPEmitter} and {@link TCPListener}.
+ */
+public abstract class TCPBasedTest extends ProtocolTestUtil {
+    /** Uses the parent's no-argument setup and installs an injector built from {@link TCPTestModule}. */
+    protected TCPBasedTest() {
+        super();
+        this.injector = Guice.createInjector(new TCPTestModule());
+    }
+
+    /**
+     * Forwards {@code numTasks} to the parent and installs an injector built from {@link TCPTestModule}.
+     *
+     * @param numTasks
+     *            passed unchanged to the parent constructor
+     */
+    protected TCPBasedTest(int numTasks) {
+        super(numTasks);
+        this.injector = Guice.createInjector(new TCPTestModule());
+    }
+
+    /** Guice module that selects the TCP emitter/listener pair and binds the named integer settings below. */
+    class TCPTestModule extends ZkBasedClusterManagementTestModule {
+        TCPTestModule() {
+            super(TCPEmitter.class, TCPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bindNamedInteger("tcp.partition.queue_size", 256);
+            bindNamedInteger("emitter.send.interval", 2);
+            bindNamedInteger("emitter.send.numMessages", 100);
+        }
+
+        /** Binds {@code value} to the {@code Integer} key annotated with {@code @Named(name)}. */
+        private void bindNamedInteger(String name, int value) {
+            bind(Integer.class).annotatedWith(Names.named(name)).toInstance(value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
deleted file mode 100644
index f02bf64..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class TCPCommTest extends ZkBasedTest {
- DeliveryTestUtil util;
-
- @Before
- public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(new TCPCommTestModule());
- util = injector.getInstance(DeliveryTestUtil.class);
- }
-
- class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
- TCPCommTestModule() {
- super(TCPEmitter.class, TCPListener.class);
- }
-
- @Override
- protected void configure() {
- super.configure();
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
- bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
- bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
- }
- }
-
- /**
- * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
- * reports the loss of messages, if any.
- *
- * @throws InterruptedException
- */
- @Test
- public void testTCPDelivery() throws InterruptedException {
- try {
- Thread sendThread = util.newSendThread();
- Thread receiveThread = util.newReceiveThread();
-
- // start send and receive threads
- sendThread.start();
- receiveThread.start();
-
- // wait for them to finish
- sendThread.join();
- receiveThread.join();
-
- Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("TCP has failed basic functionality test");
- }
-
- System.out.println("Done");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index eb5f42c..1b47285 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.*;
import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.ZkBasedTest;
import org.junit.Test;
public class TopologyFromZKTest extends ZKBaseTest {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
new file mode 100644
index 0000000..4e1f175
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
@@ -0,0 +1,33 @@
+package org.apache.s4.comm.udp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Smoke test for UDP delivery when the fixture is created with two tasks.
+ */
+public class MultiPartitionDeliveryTest extends UDPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
+
+    public MultiPartitionDeliveryTest() {
+        super(2);
+    }
+
+    /**
+     * Tests the protocol. If all components function without throwing exceptions, the test passes; the number of
+     * undelivered messages is only logged, not asserted.
+     *
+     * @throws InterruptedException
+     *             if interrupted while waiting for the threads to finish
+     */
+    @Test
+    public void testUDPDelivery() throws InterruptedException {
+        // Let failures propagate so JUnit reports the original exception and stack trace instead of a bare
+        // fail() message that hides the cause.
+        startThreads();
+        waitForThreads();
+        logger.info("# Messages not received = {}", notDeliveredMessages());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
new file mode 100644
index 0000000..82b4be4
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
@@ -0,0 +1,33 @@
+package org.apache.s4.comm.udp;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Smoke test for UDP delivery using the fixture's no-argument (default) configuration.
+ */
+public class SimpleDeliveryTest extends UDPBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
+
+    public SimpleDeliveryTest() {
+        super();
+    }
+
+    /**
+     * Tests the protocol. If all components function without throwing exceptions, the test passes; the number of
+     * undelivered messages is only logged, not asserted.
+     *
+     * @throws InterruptedException
+     *             if interrupted while waiting for the threads to finish
+     */
+    @Test
+    public void testUDPDelivery() throws InterruptedException {
+        // Let failures propagate so JUnit reports the original exception and stack trace instead of a bare
+        // fail() message that hides the cause.
+        startThreads();
+        waitForThreads();
+        logger.info("# Messages not received = {}", notDeliveredMessages());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
new file mode 100644
index 0000000..9f8d8b0
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasedTest.java
@@ -0,0 +1,48 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+
+import com.google.inject.Guice;
+import com.google.inject.name.Names;
+
+/**
+ * Base class for protocol tests that run over {@link UDPEmitter} and {@link UDPListener}.
+ */
+public abstract class UDPBasedTest extends ProtocolTestUtil {
+    /** Uses the parent's no-argument setup and installs an injector built from {@link UDPTestModule}. */
+    protected UDPBasedTest() {
+        super();
+        this.injector = Guice.createInjector(new UDPTestModule());
+    }
+
+    /**
+     * Forwards {@code numTasks} to the parent and installs an injector built from {@link UDPTestModule}.
+     *
+     * @param numTasks
+     *            passed unchanged to the parent constructor
+     */
+    protected UDPBasedTest(int numTasks) {
+        super(numTasks);
+        this.injector = Guice.createInjector(new UDPTestModule());
+    }
+
+    /** Guice module that selects the UDP emitter/listener pair and binds the named integer settings below. */
+    class UDPTestModule extends ZkBasedClusterManagementTestModule {
+        UDPTestModule() {
+            super(UDPEmitter.class, UDPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bindNamedInteger("emitter.send.interval", 100);
+            bindNamedInteger("emitter.send.numMessages", 200);
+        }
+
+        /** Binds {@code value} to the {@code Integer} key annotated with {@code @Named(name)}. */
+        private void bindNamedInteger(String name, int value) {
+            bind(Integer.class).annotatedWith(Names.named(name)).toInstance(value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
deleted file mode 100644
index 808a357..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.s4.comm.udp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class UDPCommTest extends ZkBasedTest {
- DeliveryTestUtil util;
-
- @Before
- public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(new UDPCommTestModule());
- util = injector.getInstance(DeliveryTestUtil.class);
- }
-
- class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
- UDPCommTestModule() {
- super(UDPEmitter.class, UDPListener.class);
- }
-
- @Override
- protected void configure() {
- super.configure();
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
- bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
- bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
- }
- }
-
- /**
- * Tests the protocol. If all components function without throwing exceptions, the test passes.
- *
- * @throws InterruptedException
- */
- @Test
- public void testUDPDelivery() throws InterruptedException {
- try {
- Thread sendThread = util.newSendThread();
- Thread receiveThread = util.newReceiveThread();
-
- // start send and receive threads
- sendThread.start();
- receiveThread.start();
-
- // wait for them to finish
- sendThread.join();
- receiveThread.join();
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("UDP has failed basic functionality test");
- }
- System.out.println("Done");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
new file mode 100644
index 0000000..47d1aa6
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@ -0,0 +1,166 @@
+package org.apache.s4.comm.util;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Test util for communication protocols.
+ *
+ * <ul>
+ * <li>The util defines Send and Receive Threads</li>
+ * <li>SendThread sends a pre-defined number of messages to all the partitions, retrying while the emitter refuses a
+ * message</li>
+ * <li>ReceiveThread receives all/most of these messages and records whether any sender's sequence numbers went
+ * backwards</li>
+ * <li>To avoid the ReceiveThread waiting for ever, a {@link Timer} task interrupts it once, {@code 10 * interval}
+ * milliseconds after it starts</li>
+ * </ul>
+ */
+public class PartitionInfo {
+    private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
+    public Emitter emitter;
+    public Listener listener;
+    private final int interval;
+    private int numMessages;
+    private int partitionId;
+
+    public SendThread sendThread;
+    public ReceiveThread receiveThread;
+    private int messagesExpected;
+
+    /**
+     * Creates (but does not start) the send and receive threads for one partition.
+     *
+     * @param emitter
+     *            used by the send thread to reach every partition
+     * @param listener
+     *            polled by the receive thread; also supplies this partition's id
+     * @param interval
+     *            milliseconds to wait before retrying a refused send; the receive timer uses ten times this value
+     * @param numMessages
+     *            number of messages sent to each partition; the receiver expects this many per partition
+     */
+    @Inject
+    public PartitionInfo(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
+            @Named("emitter.send.numMessages") int numMessages) {
+        this.emitter = emitter;
+        this.listener = listener;
+        this.partitionId = this.listener.getPartitionId();
+        logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
+                this.listener.getPartitionId());
+
+        this.interval = interval;
+        this.numMessages = numMessages;
+        this.messagesExpected = numMessages * this.emitter.getPartitionCount();
+
+        this.sendThread = new SendThread();
+        this.receiveThread = new ReceiveThread();
+    }
+
+    /** Sends {@code numMessages} rounds of one message to every partition. */
+    public class SendThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < numMessages; i++) {
+                    for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
+                        // Payload is "<sender partition> <sequence number>", the format ReceiveThread parses.
+                        byte[] message = (partitionId + " " + i).getBytes();
+                        while (!emitter.send(partition, message)) {
+                            logger.debug("SendThread {}: Resending message to {}", partitionId, partition);
+                            Thread.sleep(interval);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // Report through the logger (with stack trace) rather than printing to stderr.
+                logger.error("SendThread " + partitionId + ": failed, giving up", e);
+                return;
+            }
+
+            logger.debug("SendThread {}: Exiting", partitionId);
+        }
+    }
+
+    /** Timer task that interrupts the given thread when it fires. */
+    private static class TimelyInterrupt extends TimerTask {
+        private final Thread watchThread;
+
+        TimelyInterrupt(Thread watchThread) {
+            this.watchThread = watchThread;
+        }
+
+        @Override
+        public void run() {
+            watchThread.interrupt();
+        }
+    }
+
+    /** Receives messages until the expected count is reached or too many empty receives have occurred. */
+    public class ReceiveThread extends Thread {
+        private int sleepCount = 10;
+        private int messagesReceived = 0;
+        private boolean ordered = true;
+        private int receivedMessages[];
+
+        ReceiveThread() {
+            // -1 means "nothing received from this sender yet".
+            receivedMessages = new int[emitter.getPartitionCount()];
+            for (int i = 0; i < receivedMessages.length; i++)
+                receivedMessages[i] = -1;
+        }
+
+        @Override
+        public void run() {
+            // The timer is created here rather than in the constructor so a thread that is never started leaves no
+            // timer thread behind, and it is cancelled in finally so a failure inside the loop cannot leak it.
+            Timer timer = new Timer();
+            try {
+                // NOTE(review): this schedules a single interrupt, not a periodic one -- confirm that is intended.
+                timer.schedule(new TimelyInterrupt(this), 10 * interval);
+
+                while (messagesReceived < messagesExpected) {
+                    byte[] message = listener.recv();
+                    if (message != null) {
+                        messagesReceived++;
+                        String msgString = new String(message);
+                        String[] msgTokens = msgString.split(" ");
+                        int senderPartition = Integer.parseInt(msgTokens[0]);
+                        int receivedMsg = Integer.parseInt(msgTokens[1]);
+                        if (receivedMsg < receivedMessages[senderPartition])
+                            ordered = false;
+                        receivedMessages[senderPartition] = receivedMsg;
+                    } else if (sleepCount-- > 0) {
+                        // Tolerate a bounded number of empty receives before giving up.
+                        continue;
+                    } else {
+                        break;
+                    }
+                }
+            } finally {
+                timer.cancel();
+                timer.purge();
+            }
+
+            logger.debug("ReceiveThread {}: Exiting with {} messages left", partitionId, moreMessages());
+        }
+
+        /** @return {@code false} if some sender's sequence number was ever lower than its previous one */
+        public boolean ordered() {
+            return ordered;
+        }
+
+        /** @return how many expected messages have not been received */
+        public int moreMessages() {
+            return (messagesExpected - messagesReceived);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
new file mode 100644
index 0000000..60c5bba
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@ -0,0 +1,101 @@
+package org.apache.s4.comm.util;
+
+import java.io.IOException;
+
+import org.apache.s4.comm.util.PartitionInfo;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.inject.Injector;
+
+/**
+ * Base class for protocol tests: builds one {@link PartitionInfo} per task and offers helpers to run their threads
+ * and summarise the outcome.
+ */
+public abstract class ProtocolTestUtil extends ZkBasedTest {
+    protected PartitionInfo[] partitions;
+    /** Expected to be assigned by the subclass constructor before {@link #setup()} runs. */
+    protected Injector injector;
+
+    protected ProtocolTestUtil() {
+        super();
+    }
+
+    protected ProtocolTestUtil(int numTasks) {
+        super(numTasks);
+    }
+
+    /** Obtains one {@link PartitionInfo} per task from the injector. */
+    @Before
+    public void setup() throws IOException, InterruptedException, KeeperException {
+        partitions = new PartitionInfo[super.numTasks];
+        for (int i = 0; i < this.numTasks; i++) {
+            partitions[i] = injector.getInstance(PartitionInfo.class);
+        }
+    }
+
+    /** Starts the send and receive thread of every partition. */
+    protected void startThreads() {
+        for (PartitionInfo partition : partitions) {
+            partition.sendThread.start();
+            partition.receiveThread.start();
+        }
+    }
+
+    /**
+     * Blocks until every send and receive thread has terminated.
+     *
+     * @throws InterruptedException
+     *             if interrupted while joining a thread
+     */
+    protected void waitForThreads() throws InterruptedException {
+        for (PartitionInfo partition : partitions) {
+            partition.sendThread.join();
+            partition.receiveThread.join();
+        }
+    }
+
+    /** @return the total number of expected messages that the receive threads did not get */
+    protected int notDeliveredMessages() {
+        int sum = 0;
+        for (PartitionInfo partition : partitions) {
+            sum += partition.receiveThread.moreMessages();
+        }
+        return sum;
+    }
+
+    /** @return {@code true} only if every receive thread reports ordered delivery */
+    protected boolean messageOrdering() {
+        for (PartitionInfo partition : partitions) {
+            if (!partition.receiveThread.ordered())
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Closes every emitter and listener that setup managed to create. Tolerates a missing array or empty slots so
+     * that a failure part-way through {@link #setup()} is not masked by a NullPointerException here.
+     */
+    @After
+    public void tearDown() {
+        if (partitions == null) {
+            return;
+        }
+        for (PartitionInfo partition : partitions) {
+            if (partition == null) {
+                continue;
+            }
+            if (partition.emitter != null) {
+                partition.emitter.close();
+                partition.emitter = null;
+            }
+            if (partition.listener != null) {
+                partition.listener.close();
+                partition.listener = null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
index ebcf388..272f9b7 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -14,13 +14,13 @@ import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromFile;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
@@ -69,9 +69,8 @@ public abstract class FileBasedClusterManagementTestModule<T> extends AbstractMo
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
bind(Assignment.class).to(AssignmentFromFile.class);
bind(Topology.class).to(TopologyFromFile.class);
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
+ bind(Emitter.class).to(TCPEmitter.class);
+ bind(Listener.class).to(TCPListener.class);
+ bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index a368db8..1e8cf2f 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -19,19 +19,23 @@ import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyFromZK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.name.Names;
public class ZkBasedClusterManagementTestModule extends AbstractModule {
-
+ private static final Logger logger = LoggerFactory.getLogger(ZkBasedClusterManagementTestModule.class);
protected PropertiesConfiguration config = null;
private Class<? extends Emitter> emitterClass = null;
private Class<? extends Listener> listenerClass = null;
protected ZkBasedClusterManagementTestModule() {
+     // No transport classes supplied: use the TCP listener/emitter pair.
+     this.listenerClass = TCPListener.class;
+     this.emitterClass = TCPEmitter.class;
}
protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
@@ -72,16 +76,13 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
bind(Assignment.class).to(AssignmentFromZK.class);
bind(Topology.class).to(TopologyFromZK.class);
- if (this.emitterClass != null) {
- bind(Emitter.class).to(this.emitterClass);
- } else {
- bind(Emitter.class).to(TCPEmitter.class);
- }
+ bind(Emitter.class).to(this.emitterClass);
+ bind(Listener.class).to(this.listenerClass);
- if (this.listenerClass != null) {
- bind(Listener.class).to(this.listenerClass);
- } else {
- bind(Listener.class).to(TCPListener.class);
+ if (this.emitterClass.equals(TCPEmitter.class)) {
+ bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
}
+
+ logger.info("Emitter set to - {}; Listener set to - {}", this.emitterClass, this.listenerClass);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8e2427b0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 02c5467..3b1e45a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -6,6 +6,7 @@ import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,15 +14,26 @@ import org.slf4j.LoggerFactory;
public abstract class ZkBasedTest {
private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
private ZkServer zkServer;
+ protected String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
+
+ private static final String clusterName = "s4-test-cluster";
+ protected final int numTasks;
+
+ /** Creates a test fixture whose {@code numTasks} is 1. */
+ protected ZkBasedTest() {
+     this(1);
+ }
+
+ /**
+  * Creates a test fixture with the given task count.
+  *
+  * @param numTasks value stored in the {@code numTasks} field
+  */
+ protected ZkBasedTest(int numTasks) {
+     this.numTasks = numTasks;
+ }
@Before
- public void prepare() {
+ public void setupZkBasedTest() {
String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
CommTestUtils.cleanupTmpDirs();
IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
-
@Override
public void createDefaultNameSpace(ZkClient zkClient) {
@@ -32,17 +44,17 @@ public abstract class ZkBasedTest {
zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
zkServer.start();
- logger.info("Starting Zookeeper Client 1");
- String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
- @SuppressWarnings("unused")
- ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
- ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
- zkClient2.getCreationTime("/");
-
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- final String clusterName = "s4-test-cluster";
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1);
+ taskSetup.setup(clusterName, this.numTasks);
+ }
+
+ /**
+  * Shuts down the ZooKeeper server started for the test, if any, and removes the
+  * temporary directories. The reference is cleared and the directories are cleaned
+  * even when {@code shutdown()} throws, so a failed shutdown does not leave
+  * stale files behind.
+  */
+ @After
+ public void cleanupZkBasedTest() {
+     try {
+         if (zkServer != null) {
+             zkServer.shutdown();
+         }
+     } finally {
+         zkServer = null;
+         CommTestUtils.cleanupTmpDirs();
+     }
}
}
|