incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [25/50] [abbrv] git commit: comm layer improvements
Date Tue, 03 Jan 2012 14:03:28 GMT
comm layer improvements

- dependency injection for zookeeper topology and assignment
- handle dynamic topology updates
- added application level test using zookeeper and netty

Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d6450a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d6450a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d6450a49

Branch: refs/heads/piper
Commit: d6450a49ba999c988283d6dcc1a8d47ba025944b
Parents: df927d2
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Nov 10 20:36:11 2011 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Nov 10 20:36:11 2011 +0100

----------------------------------------------------------------------
 .../org/apache/s4/comm/netty/NettyEmitter.java     |  344 ++++++++-------
 .../apache/s4/comm/topology/AssignmentFromZK.java  |   74 ++--
 .../apache/s4/comm/topology/TopologyFromZK.java    |   46 +-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   48 ++-
 .../java/test/s4/core/apploading/SimpleModule.java |    4 +-
 .../FileBasedClusterManagementTestModule.java      |   78 ++++
 .../java/test/s4/wordcount/WordCountModule.java    |    4 +-
 .../src/test/resources/default.s4.properties       |    6 +-
 8 files changed, 342 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
index c57177f..8958018 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
@@ -30,174 +30,180 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
 
-public class NettyEmitter implements Emitter, ChannelFutureListener,
-		TopologyChangeListener {
-	private static final Logger logger = LoggerFactory
-			.getLogger(NettyEmitter.class);
-
-	private Topology topology;
-	private final ClientBootstrap bootstrap;
-
-	// Hashtable inherently allows capturing changes to the underlying topology
-	private HashBiMap<Integer, Channel> channels;
-	private HashBiMap<Integer, ClusterNode> nodes;
-
-	@Inject
-	public NettyEmitter(Topology topology) throws InterruptedException {
-		this.topology = topology;
-		int clusterSize = this.topology.getTopology().getNodes().size();
-		
-		channels = HashBiMap.create(clusterSize);
-		nodes = HashBiMap.create(clusterSize);
-		
-		for (ClusterNode clusterNode : NettyEmitter.this.topology.getTopology()
-				.getNodes()) {
-			Integer partition = clusterNode.getPartition();
-			nodes.forcePut(partition, clusterNode);
-		}
-
-		ChannelFactory factory = new NioClientSocketChannelFactory(
-				Executors.newCachedThreadPool(),
-				Executors.newCachedThreadPool());
-
-		bootstrap = new ClientBootstrap(factory);
-
-		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-			public ChannelPipeline getPipeline() {
-				ChannelPipeline p = Channels.pipeline();
-				p.addLast("1", new LengthFieldPrepender(4));
-				p.addLast("2", new TestHandler());
-				return p;
-			}
-		});
-
-		bootstrap.setOption("tcpNoDelay", true);
-		bootstrap.setOption("keepAlive", true);
-	}
-
-	private void connectTo(Integer partitionId) {
-		ClusterNode clusterNode = nodes.get(partitionId);
-
-		if (clusterNode == null)
-			logger.error("No ClusterNode exists for partitionId " + partitionId);
-
-		logger.info(String.format("Connecting to %s:%d",
-				clusterNode.getMachineName(), clusterNode.getPort()));
-		while (true) {
-			ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(
-					clusterNode.getMachineName(), clusterNode.getPort()));
-			f.awaitUninterruptibly();
-			if (f.isSuccess()) {
-				channels.forcePut(partitionId, f.getChannel());
-				break;
-			}
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException ie) {
-				logger.error(String.format(
-						"Interrupted while connecting to %s:%d",
-						clusterNode.getMachineName(), clusterNode.getPort()));
-			}
-		}
-	}
-
-	private Object sendLock = new Object();
-
-	public void send(int partitionId, byte[] message) {
-		Channel channel = channels.get(partitionId);
-
-		while (channel == null) {
-			connectTo(partitionId);
-			channel = channels.get(partitionId); // making sure it is reflected in the map
-		}
-
-		ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-
-		// check if Netty's send queue has gotten quite large
-		if (!channel.isWritable()) {
-			synchronized (sendLock) {
-				// check again now that we have the lock
-				while (!channel.isWritable()) {
-					try {
-						sendLock.wait(); // wait until the channel's queue
-											// has gone down
-					} catch (InterruptedException ie) {
-						return; // somebody wants us to stop running
-					}
-				}
-				// logger.info("Woke up from send block!");
-			}
-		}
-		// between the above isWritable check and the below writeBytes, the
-		// isWritable
-		// may become false again. That's OK, we're just trying to avoid a
-		// very large
-		// above check to avoid creating a very large send queue inside
-		// Netty.
-		buffer.writeBytes(message);
-		ChannelFuture f = channel.write(buffer);
-		f.addListener(this);
-
-	}
-
-	public void operationComplete(ChannelFuture f) {
-		// when we get here, the I/O operation associated with f is complete
-		if (f.isCancelled()) {
-			logger.error("Send I/O was cancelled!! "
-					+ f.getChannel().getRemoteAddress());
-		} else if (!f.isSuccess()) {
-			logger.error("Exception on I/O operation", f.getCause());
-			// find the partition associated with this broken channel
-			int partition = channels.inverse().get(f.getChannel());
-			logger.error(String
-					.format("I/O on partition %d failed!", partition));
-		}
-	}
-
-	public void onChange() {
-		// do nothing for now, don't expect the topology to change.
-	}
-
-	public int getPartitionCount() {
-	    //Number of nodes is not same as number of partitions
-		return topology.getTopology().getPartitionCount();
-	}
-
-	class TestHandler extends SimpleChannelHandler {
-		public void channelInterestChanged(ChannelHandlerContext ctx,
-				ChannelStateEvent e) {
-			// logger.info(String.format("%08x %08x %08x", e.getValue(),
-			// e.getChannel().getInterestOps(), Channel.OP_WRITE));
-			synchronized (sendLock) {
-				if (e.getChannel().isWritable()) {
-					sendLock.notify();
-				}
-			}
-			ctx.sendUpstream(e);
-
-		}
-
-		public void exceptionCaught(ChannelHandlerContext context,
-				ExceptionEvent event) {
-			Integer partition = channels.inverse().get(context.getChannel());
-			if (partition == null) {
-				logger.error("Error on mystery channel!!");
-				// return;
-			}
-			logger.error("Error on channel to partition " + partition);
-
-			try {
-				throw event.getCause();
-			} catch (ConnectException ce) {
-				logger.error(ce.getMessage(), ce);
-			} catch (Throwable err) {
-				logger.error("Error", err);
-				if (context.getChannel().isOpen()) {
-					logger.error("Closing channel due to exception");
-					context.getChannel().close();
-				}
-			}
-		}
-	}
+public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener
{
+    private static final Logger logger = LoggerFactory.getLogger(NettyEmitter.class);
+
+    private Topology topology;
+    private final ClientBootstrap bootstrap;
+
+    // Hashtable inherently allows capturing changes to the underlying topology
+    private HashBiMap<Integer, Channel> channels;
+    private HashBiMap<Integer, ClusterNode> nodes;
+
+    @Inject
+    public NettyEmitter(Topology topology) throws InterruptedException {
+        this.topology = topology;
+        topology.addListener(this);
+        int clusterSize = this.topology.getTopology().getNodes().size();
+
+        channels = HashBiMap.create(clusterSize);
+        nodes = HashBiMap.create(clusterSize);
+
+        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+            Integer partition = clusterNode.getPartition();
+            nodes.forcePut(partition, clusterNode);
+        }
+
+        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+
+        bootstrap = new ClientBootstrap(factory);
+
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("1", new LengthFieldPrepender(4));
+                p.addLast("2", new TestHandler());
+                return p;
+            }
+        });
+
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+    }
+
+    private void connectTo(Integer partitionId) {
+        ClusterNode clusterNode = nodes.get(partitionId);
+
+        if (clusterNode == null) {
+            logger.error("No ClusterNode exists for partitionId " + partitionId);
+            // clusterNode = topology.getTopology().getNodes().get(partitionId);
+        }
+
+        logger.info(String.format("Connecting to %s:%d", clusterNode.getMachineName(), clusterNode.getPort()));
+        while (true) {
+            ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(),
clusterNode
+                    .getPort()));
+            f.awaitUninterruptibly();
+            if (f.isSuccess()) {
+                channels.forcePut(partitionId, f.getChannel());
+                break;
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ie) {
+                logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
+                        clusterNode.getPort()));
+            }
+        }
+    }
+
+    private final Object sendLock = new Object();
+
+    @Override
+    public void send(int partitionId, byte[] message) {
+        Channel channel = channels.get(partitionId);
+
+        while (channel == null) {
+            connectTo(partitionId);
+            channel = channels.get(partitionId); // making sure it is reflected in the map
+        }
+
+        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+
+        // check if Netty's send queue has gotten quite large
+        if (!channel.isWritable()) {
+            synchronized (sendLock) {
+                // check again now that we have the lock
+                while (!channel.isWritable()) {
+                    try {
+                        sendLock.wait(); // wait until the channel's queue
+                                         // has gone down
+                    } catch (InterruptedException ie) {
+                        return; // somebody wants us to stop running
+                    }
+                }
+                // logger.info("Woke up from send block!");
+            }
+        }
+        // Between the isWritable check above and the writeBytes below, the
+        // channel may become non-writable again. That's OK: the check above
+        // is only a best-effort attempt to avoid building up a very large
+        // send queue inside Netty.
+        buffer.writeBytes(message);
+        ChannelFuture f = channel.write(buffer);
+        f.addListener(this);
+
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture f) {
+        // when we get here, the I/O operation associated with f is complete
+        if (f.isCancelled()) {
+            logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
+        } else if (!f.isSuccess()) {
+            logger.error("Exception on I/O operation", f.getCause());
+            // find the partition associated with this broken channel
+            int partition = channels.inverse().get(f.getChannel());
+            logger.error(String.format("I/O on partition %d failed!", partition));
+        }
+    }
+
+    @Override
+    public void onChange() {
+        // topology changes when processes pick tasks
+        synchronized (nodes) {
+            for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+                Integer partition = clusterNode.getPartition();
+                nodes.put(partition, clusterNode);
+            }
+        }
+    }
+
+    @Override
+    public int getPartitionCount() {
+        // Number of nodes is not the same as the number of partitions
+        return topology.getTopology().getPartitionCount();
+    }
+
+    class TestHandler extends SimpleChannelHandler {
+        @Override
+        public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e)
{
+            // logger.info(String.format("%08x %08x %08x", e.getValue(),
+            // e.getChannel().getInterestOps(), Channel.OP_WRITE));
+            synchronized (sendLock) {
+                if (e.getChannel().isWritable()) {
+                    sendLock.notify();
+                }
+            }
+            ctx.sendUpstream(e);
+
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event)
{
+            Integer partition = channels.inverse().get(context.getChannel());
+            if (partition == null) {
+                logger.error("Error on mystery channel!!");
+                // return;
+            }
+            logger.error("Error on channel to partition " + partition);
+
+            try {
+                throw event.getCause();
+            } catch (ConnectException ce) {
+                logger.error(ce.getMessage(), ce);
+            } catch (Throwable err) {
+                logger.error("Error", err);
+                if (context.getChannel().isOpen()) {
+                    logger.error("Closing channel due to exception");
+                    context.getChannel().close();
+                }
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 3f03105..680c920 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -19,10 +19,11 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AssignmentFromZK implements Assignment, IZkChildListener,
-        IZkStateListener, IZkDataListener {
-    private static final Logger logger = LoggerFactory
-            .getLogger(TopologyFromZK.class);
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener,
IZkDataListener {
+    private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
     /*
      * Name of the cluster
      */
@@ -34,23 +35,23 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
     /**
      * ZkClient used to do all interactions with Zookeeper
      */
-    private ZkClient zkClient;
+    private final ZkClient zkClient;
     /**
      * Root path of tasks in ZK
      */
-    private String taskPath;
+    private final String taskPath;
     /**
      * Root path of processes in ZK
      */
-    private String processPath;
+    private final String processPath;
     /**
      * Reentrant lock used to synchronize processing callback
      */
-    private Lock lock;
+    private final Lock lock;
     /**
      * Variable that indicates if this instance is currently owning any task.
      */
-    private AtomicBoolean currentlyOwningTask;
+    private final AtomicBoolean currentlyOwningTask;
     /**
      * Hostname where the process is running
      */
@@ -58,15 +59,17 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
     /**
      * Condition to signal taskAcquisition
      */
-    private Condition taskAcquired;
+    private final Condition taskAcquired;
     /**
-     * Holds the reference to ClusterNode which points to the current partition
-     * owned
+     * Holds the reference to ClusterNode which points to the current partition owned
      */
     AtomicReference<ClusterNode> clusterNodeRef;
 
-    public AssignmentFromZK(String clusterName, String zookeeperAddress,
-            int sessionTimeout, int connectionTimeout) throws Exception {
+    @Inject
+    public AssignmentFromZK(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception
{
         this.clusterName = clusterName;
         taskPath = "/" + clusterName + "/" + "tasks";
         processPath = "/" + clusterName + "/" + "process";
@@ -82,8 +85,7 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
             machineId = "UNKNOWN";
         }
 
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout,
-                connectionTimeout);
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         ZkSerializer serializer = new ZNRecordSerializer();
         zkClient.setZkSerializer(serializer);
         zkClient.subscribeStateChanges(this);
@@ -119,14 +121,12 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
     }
 
     @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds)
-            throws Exception {
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws
Exception {
         doProcess();
     }
 
     /**
-     * One method to do any processing if there is a change in ZK, all callbacks
-     * will be processed sequentially
+     * One method to do any processing if there is a change in ZK, all callbacks will be
processed sequentially
      */
     private void doProcess() {
         lock.lock();
@@ -147,14 +147,12 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
     }
 
     /**
-     * Core method where the task acquisition happens. Algo is as follow Get All
-     * the tasks<br/>
+     * Core method where the task acquisition happens. The algorithm is as follows: get all the tasks<br/>
      * Get all the processes<br/>
      * Check if the number of processes is less than the number of tasks<br/>
      * Iterate over the tasks and pick one that is not yet acquired<br/>
      * 
-     * If the creation of ephemeral process node is successful then task
-     * acquisition is successful
+     * If the creation of ephemeral process node is successful then task acquisition is successful
      */
     private void tryToAcquiretask() {
         List<String> tasks = zkClient.getChildren(taskPath);
@@ -168,19 +166,15 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
                     continue;
                 }
                 if (!zkClient.exists(processPath + "/" + taskName)) {
-                    ZNRecord task = zkClient
-                            .readData(taskPath + "/" + taskName);
+                    ZNRecord task = zkClient.readData(taskPath + "/" + taskName);
                     ZNRecord process = new ZNRecord(task);
                     process.setSimpleField("host", machineId);
-                    process.setSimpleField("session",
-                            String.valueOf(zkClient.getSessionId()));
+                    process.setSimpleField("session", String.valueOf(zkClient.getSessionId()));
                     try {
-                        zkClient.createEphemeral(processPath + "/" + taskName,
-                                process);
+                        zkClient.createEphemeral(processPath + "/" + taskName, process);
 
                     } catch (Throwable e) {
-                        logger.warn("Exception trying to acquire task:"
-                                + taskName
+                        logger.warn("Exception trying to acquire task:" + taskName
                                 + " This is warning and can be ignored. " + e);
                         // Any exception does not means we failed to acquire
                         // task because we might have acquired task but there
@@ -190,19 +184,13 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
                     }
                     // check if the process node is created and we own it
                     Stat stat = zkClient.getStat(processPath + "/" + taskName);
-                    if (stat != null
-                            && stat.getEphemeralOwner() == zkClient
-                                    .getSessionId()) {
-                        logger.info("Successfully acquired task:" + taskName
-                                + " by " + machineId);
-                        int partition = Integer.parseInt(process
-                                .getSimpleField("partition"));
+                    if (stat != null && stat.getEphemeralOwner() == zkClient.getSessionId())
{
+                        logger.info("Successfully acquired task:" + taskName + " by " + machineId);
+                        int partition = Integer.parseInt(process.getSimpleField("partition"));
                         String host = process.getSimpleField("host");
-                        int port = Integer.parseInt(process
-                                .getSimpleField("port"));
+                        int port = Integer.parseInt(process.getSimpleField("port"));
                         String taskId = process.getSimpleField("taskId");
-                        ClusterNode node = new ClusterNode(partition, port,
-                                host, taskId);
+                        ClusterNode node = new ClusterNode(partition, port, host, taskId);
                         clusterNodeRef.set(node);
                         currentlyOwningTask.set(true);
                         taskAcquired.signalAll();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
index 009ce1f..0be6686 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
@@ -19,30 +19,33 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TopologyFromZK implements Topology, IZkChildListener,
-        IZkStateListener, IZkDataListener {
-    private static final Logger logger = LoggerFactory
-            .getLogger(TopologyFromZK.class);
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class TopologyFromZK implements Topology, IZkChildListener, IZkStateListener, IZkDataListener
{
+    private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
     private final String clusterName;
-    private AtomicReference<Cluster> clusterRef;
-    private List<TopologyChangeListener> listeners;
+    private final AtomicReference<Cluster> clusterRef;
+    private final List<TopologyChangeListener> listeners;
     private KeeperState state;
-    private ZkClient zkClient;
-    private String taskPath;
-    private String processPath;
-    private Lock lock;
+    private final ZkClient zkClient;
+    private final String taskPath;
+    private final String processPath;
+    private final Lock lock;
     private AtomicBoolean currentlyOwningTask;
     private String machineId;
 
-    public TopologyFromZK(String clusterName, String zookeeperAddress,
-            int sessionTimeout, int connectionTimeout) throws Exception {
+    @Inject
+    public TopologyFromZK(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception
{
         this.clusterName = clusterName;
         taskPath = "/" + clusterName + "/" + "tasks";
         processPath = "/" + clusterName + "/" + "process";
         lock = new ReentrantLock();
         clusterRef = new AtomicReference<Cluster>();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout,
-                connectionTimeout);
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         ZkSerializer serializer = new ZNRecordSerializer();
         zkClient.setZkSerializer(serializer);
         listeners = new ArrayList<TopologyChangeListener>();
@@ -101,8 +104,7 @@ public class TopologyFromZK implements Topology, IZkChildListener,
     }
 
     @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds)
-            throws Exception {
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws
Exception {
         doProcess();
     }
 
@@ -122,21 +124,17 @@ public class TopologyFromZK implements Topology, IZkChildListener,
         List<String> tasks = zkClient.getChildren(taskPath);
         Cluster cluster = new Cluster(tasks.size());
         for (int i = 0; i < processes.size(); i++) {
-            ZNRecord process = zkClient.readData(
-                    processPath + "/" + processes.get(i), true);
+            ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
             if (process != null) {
-                int partition = Integer.parseInt(process
-                        .getSimpleField("partition"));
+                int partition = Integer.parseInt(process.getSimpleField("partition"));
                 String host = process.getSimpleField("host");
                 int port = Integer.parseInt(process.getSimpleField("port"));
                 String taskId = process.getSimpleField("taskId");
-                ClusterNode node = new ClusterNode(partition, port, host,
-                        taskId);
+                ClusterNode node = new ClusterNode(partition, port, host, taskId);
                 cluster.addNode(node);
             }
         }
-        logger.info("Changing cluster topology to " + cluster + " from "
-                + clusterRef.get());
+        logger.info("Changing cluster topology to " + cluster + " from " + clusterRef.get());
         clusterRef.set(cluster);
         // Notify all changeListeners about the topology change
         for (TopologyChangeListener listener : listeners) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 31e86f1..e365799 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,44 +12,43 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.jboss.netty.channel.Channel;
 
+import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
 
-
 public class UDPEmitter implements Emitter, TopologyChangeListener {
     private DatagramSocket socket;
-    private ClusterNode[] partitions;
-    private Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
-    private long messageDropInQueueCount = 0;
-    private Topology topology;
-    
+    private final HashBiMap<Integer, ClusterNode> nodes;
+    private final Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
+    private final long messageDropInQueueCount = 0;
+    private final Topology topology;
+
     public long getMessageDropInQueueCount() {
         return messageDropInQueueCount;
     }
-    
+
     @Inject
     public UDPEmitter(Topology topology) {
         this.topology = topology;
         topology.addListener(this);
-        partitions = new ClusterNode[topology.getTopology().getNodes().size()];
+        nodes = HashBiMap.create(topology.getTopology().getNodes().size());
         for (ClusterNode node : topology.getTopology().getNodes()) {
-            partitions[node.getPartition()] = node;
+            nodes.forcePut(node.getPartition(), node);
         }
-        
+
         try {
             socket = new DatagramSocket();
         } catch (SocketException se) {
             throw new RuntimeException(se);
         }
     }
-    
+
+    @Override
     public void send(int partitionId, byte[] message) {
         try {
-            ClusterNode node = null;
-            if (partitionId < partitions.length) {
-                node = partitions[partitionId];
-            }
-            else {
+            ClusterNode node = nodes.get(partitionId);
+            if (node == null) {
                 throw new RuntimeException(String.format("Bad partition id %d", partitionId));
             }
             byte[] byteBuffer = new byte[message.length];
@@ -59,19 +58,26 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
                 inetAddress = InetAddress.getByName(node.getMachineName());
                 inetCache.put(partitionId, inetAddress);
             }
-            DatagramPacket dp = new DatagramPacket(byteBuffer,
-                    byteBuffer.length, inetAddress, node.getPort());
+            DatagramPacket dp = new DatagramPacket(byteBuffer, byteBuffer.length, inetAddress,
node.getPort());
             socket.send(dp);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
-    
+
+    @Override
     public int getPartitionCount() {
         return topology.getTopology().getPartitionCount();
     }
-    
+
+    @Override
     public void onChange() {
-        // do nothing on change of Topology, for now
+        // topology changes when processes pick tasks
+        synchronized (nodes) {
+            for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+                Integer partition = clusterNode.getPartition();
+                nodes.put(partition, clusterNode);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java b/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
index cfd6862..205b8ac 100644
--- a/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
+++ b/subprojects/s4-core/src/test/java/test/s4/core/apploading/SimpleModule.java
@@ -1,7 +1,7 @@
 package test.s4.core.apploading;
 
-import test.s4.fixtures.GenericTestModule;
+import test.s4.fixtures.FileBasedClusterManagementTestModule;
 
-public class SimpleModule extends GenericTestModule<SimpleApp> {
+public class SimpleModule extends FileBasedClusterManagementTestModule<SimpleApp> {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..c7e6c40
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -0,0 +1,78 @@
+package test.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected FileBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.lock_dir",
+                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromFile.class);
+        bind(Topology.class).to(TopologyFromFile.class);
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
index cb659db..9e04f4b 100644
--- a/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
+++ b/subprojects/s4-core/src/test/java/test/s4/wordcount/WordCountModule.java
@@ -1,7 +1,7 @@
 package test.s4.wordcount;
 
-import test.s4.fixtures.GenericTestModule;
+import test.s4.fixtures.FileBasedClusterManagementTestModule;
 
-public class WordCountModule extends GenericTestModule<WordCountApp> {
+public class WordCountModule extends FileBasedClusterManagementTestModule<WordCountApp> {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6450a49/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 5fc183b..0e31dfa 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -2,4 +2,8 @@ comm.queue_emmiter_size = 8000
 comm.queue_listener_size = 8000
 cluster.hosts = localhost
 cluster.ports = 5077
-cluster.lock_dir = {user.dir}/tmp
\ No newline at end of file
+cluster.lock_dir = {user.dir}/tmp
+cluster.name = s4-test-cluster
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file


Mime
View raw message