incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [2/2] git commit: [S4-110] Formatting code to make the diff look better
Date Thu, 03 Jan 2013 08:02:44 GMT
Updated Branches:
  refs/heads/S4-110 ac0dc74b7 -> 9661f2700


[S4-110] Formatting code to make the diff look better


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/9661f270
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/9661f270
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/9661f270

Branch: refs/heads/S4-110
Commit: 9661f270084400b9b40eeb608f3b02dabc38508f
Parents: ac0dc74
Author: Kishore Gopalakrishna <g.kishore@gmail.com>
Authored: Thu Jan 3 00:01:55 2013 -0800
Committer: Kishore Gopalakrishna <g.kishore@gmail.com>
Committed: Thu Jan 3 00:01:55 2013 -0800

----------------------------------------------------------------------
 .../org/apache/s4/comm/HelixBasedCommModule.java   |    5 +-
 .../org/apache/s4/comm/RemoteEmitterFactory.java   |    2 +-
 .../org/apache/s4/comm/helix/S4StateModel.java     |   87 +-
 .../s4/comm/helix/TaskStateModelFactory.java       |    3 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   54 +-
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |  146 ++--
 .../s4/comm/topology/AssignmentFromHelix.java      |  214 ++--
 .../apache/s4/comm/topology/AssignmentFromZK.java  |    2 +-
 .../apache/s4/comm/topology/ClusterFromHelix.java  |  295 +++---
 .../org/apache/s4/comm/topology/ClusterFromZK.java |   12 +-
 .../org/apache/s4/comm/topology/ClusterNode.java   |    2 +-
 .../apache/s4/comm/topology/ClustersFromHelix.java |    3 -
 .../apache/s4/comm/topology/PhysicalCluster.java   |    5 +-
 .../org/apache/s4/comm/topology/RemoteStreams.java |   15 +-
 .../s4/comm/topology/RemoteStreamsManager.java     |   27 +-
 .../s4/comm/topology/RemoteStreamsManagerImpl.java |   37 +-
 .../s4/comm/topology/ZNRecordSerializer.java       |    6 +-
 .../java/org/apache/s4/comm/topology/ZkClient.java |  120 +-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   11 +-
 .../org/apache/s4/comm/udp/UDPRemoteEmitter.java   |    2 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    8 +-
 .../src/main/java/org/apache/s4/core/Main.java     |    9 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |    5 +-
 .../java/org/apache/s4/core/RemoteSenders.java     |    6 +-
 .../org/apache/s4/core/RemoteSendersManager.java   |    7 +-
 .../apache/s4/core/RemoteSendersManagerImpl.java   |   14 +-
 .../java/org/apache/s4/deploy/AppStateModel.java   |  139 +--
 .../org/apache/s4/deploy/AppStateModelFactory.java |   28 +-
 .../s4/deploy/HelixBasedDeploymentManager.java     |   47 +-
 .../main/java/org/apache/s4/tools/AddNodes.java    |    8 +-
 .../java/org/apache/s4/tools/CreateCluster.java    |    4 +-
 .../main/java/org/apache/s4/tools/DeployApp.java   |    6 +-
 .../org/apache/s4/tools/GenericEventAdapter.java   |   89 +-
 .../java/org/apache/s4/tools/RebalanceTask.java    |   65 +-
 .../main/java/org/apache/s4/tools/S4Status.java    |  858 +++++++--------
 .../src/main/java/org/apache/s4/tools/Status.java  |    2 +-
 .../src/main/java/org/apache/s4/tools/Tools.java   |   19 +-
 37 files changed, 1048 insertions(+), 1314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index c9cfcb7..cf2cd40 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -33,7 +33,7 @@ import com.google.inject.Binder;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
-public class HelixBasedCommModule extends AbstractModule{
+public class HelixBasedCommModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
     InputStream commConfigInputStream;
@@ -74,7 +74,8 @@ public class HelixBasedCommModule extends AbstractModule{
 
         // a node holds a single partition assignment
         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(TaskStateModelFactory.class);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+                TaskStateModelFactory.class);
         bind(Assignment.class).to(AssignmentFromHelix.class);
         bind(Cluster.class).to(ClusterFromHelix.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
index 5d79859..4c89aab 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
@@ -24,7 +24,7 @@ import org.apache.s4.comm.topology.Cluster;
 /**
  * Used for creating RemoteEmitter instances depending on the cluster configuration. Follows the "assisted injection"
  * pattern from Guice 3.
- *
+ * 
  */
 public interface RemoteEmitterFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
index 9eebd06..ea35bd6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
@@ -10,52 +10,45 @@ import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 
 @StateModelInfo(states = { "LEADER,STANDBY" }, initialState = "OFFLINE")
-public class S4StateModel extends StateModel
-{
-  private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
-
-  private final String streamName;
-  private final String partitionId;
-
-  public S4StateModel(String partitionName)
-  {
-    String[] parts = partitionName.split("_");
-    this.streamName = parts[0];
-    this.partitionId = parts[1];
-  }
-
-  @Transition(from = "OFFLINE", to = "STANDBY")
-  public void becomeLeaderFromOffline(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
-
-  @Transition(from = "STANDBY", to = "LEADER")
-  public void becomeLeaderFromStandby(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
-
-  @Transition(from = "LEADER", to = "STANDBY")
-  public void becomeStandbyFromLeader(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
-
-  @Transition(from = "STANDBY", to = "OFFLINE")
-  public void becomeOfflineFromStandby(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
-  
-  @Transition(from = "OFFLINE", to = "DROPPED")
-  public void dropPartition(Message msg, NotificationContext context)
-  {
-    logger.info("Dropping partition" + msg.getPartitionName());
-  }
+public class S4StateModel extends StateModel {
+    private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
+
+    private final String streamName;
+    private final String partitionId;
+
+    public S4StateModel(String partitionName) {
+        String[] parts = partitionName.split("_");
+        this.streamName = parts[0];
+        this.partitionId = parts[1];
+    }
+
+    @Transition(from = "OFFLINE", to = "STANDBY")
+    public void becomeLeaderFromOffline(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "STANDBY", to = "LEADER")
+    public void becomeLeaderFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "LEADER", to = "STANDBY")
+    public void becomeStandbyFromLeader(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "STANDBY", to = "OFFLINE")
+    public void becomeOfflineFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void dropPartition(Message msg, NotificationContext context) {
+        logger.info("Dropping partition" + msg.getPartitionName());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
index eef64ea..3b885a8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -2,8 +2,7 @@ package org.apache.s4.comm.helix;
 
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
-
-public class TaskStateModelFactory extends StateModelFactory<S4StateModel>{
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel> {
 
     @Override
     public S4StateModel createNewStateModel(String partitionName) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index a0e5002..120eee3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -65,8 +65,7 @@ import org.apache.helix.model.InstanceConfig;
  */
 
 public class TCPEmitter implements Emitter, ClusterChangeListener {
-    private static final Logger logger = LoggerFactory
-            .getLogger(TCPEmitter.class);
+    private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
 
     private final int nettyTimeout;
 
@@ -87,24 +86,21 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private final Lock lock;
 
     @Inject
-    SerializerDeserializer serDeser= new KryoSerDeser();
+    SerializerDeserializer serDeser = new KryoSerDeser();
 
     @Inject
-    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
-            throws InterruptedException {
+    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
         this.nettyTimeout = timeout;
         this.topology = topology;
         this.lock = new ReentrantLock();
 
         // Initialize data structures
-        //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+        // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
         // TODO cluster can grow in size
-        nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
-                .<InstanceConfig, Channel> create());
+        nodeChannelMap = Maps.synchronizedBiMap(HashBiMap.<InstanceConfig, Channel> create());
 
         // Initialize netty related structures
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(),
+        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool());
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -137,19 +133,16 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         }
 
         try {
-            ChannelFuture connectFuture = this.bootstrap
-                    .connect(new InetSocketAddress(config.getHostName(),
-                            Integer.parseInt(config.getPort())));
+            ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(config.getHostName(), Integer
+                    .parseInt(config.getPort())));
             connectFuture.await();
             if (connectFuture.isSuccess()) {
                 channels.add(connectFuture.getChannel());
-                nodeChannelMap
-                        .forcePut(config, connectFuture.getChannel());
+                nodeChannelMap.forcePut(config, connectFuture.getChannel());
                 return true;
             }
         } catch (InterruptedException ie) {
-            logger.error(String.format("Interrupted while connecting to %s:%d",
-                    config.getHostName(), config.getPort()));
+            logger.error(String.format("Interrupted while connecting to %s:%d", config.getHostName(), config.getPort()));
             Thread.currentThread().interrupt();
         }
         return false;
@@ -158,8 +151,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private void sendMessage(String streamName, int partitionId, byte[] message) {
         ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
         buffer.writeBytes(message);
-        InstanceConfig config = topology
-                .getDestination(streamName, partitionId);
+        InstanceConfig config = topology.getDestination(streamName, partitionId);
         if (!nodeChannelMap.containsKey(config)) {
             if (!connectTo(config)) {
                 // Couldn't connect, discard message
@@ -176,8 +168,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Override
     public boolean send(int partitionId, EventMessage message) {
-        sendMessage(message.getStreamName(), partitionId,
-                serDeser.serialize(message));
+        sendMessage(message.getStreamName(), partitionId, serDeser.serialize(message));
         return true;
     }
 
@@ -188,8 +179,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         }
         c.close().addListener(new ChannelFutureListener() {
             @Override
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+            public void operationComplete(ChannelFuture future) throws Exception {
                 if (future.isSuccess())
                     channels.remove(future.getChannel());
                 else
@@ -208,18 +198,18 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         }
     }
 
-    //@Override
+    // @Override
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
+
     public int getPartitionCount(String streamName) {
-    return topology.getPhysicalCluster().getPartitionCount();
-  }
+        return topology.getPhysicalCluster().getPartitionCount();
+    }
 
     class ExceptionHandler extends SimpleChannelUpstreamHandler {
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-                throws Exception {
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
             Throwable t = e.getCause();
             if (t instanceof ClosedChannelException) {
                 nodeChannelMap.inverse().remove(e.getChannel());
@@ -248,10 +238,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                 try {
                     // TODO handle possible cluster reconfiguration between send
                     // and failure callback
-                    logger.warn(
-                            "Failed to send message to node {} (according to current cluster information)",
-                            topology.getPhysicalCluster().getNodes()
-                                    .get(partitionId));
+                    logger.warn("Failed to send message to node {} (according to current cluster information)",
+                            topology.getPhysicalCluster().getNodes().get(partitionId));
                 } catch (IndexOutOfBoundsException ignored) {
                     // cluster was changed
                 }
@@ -262,6 +250,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Override
     public void onChange() {
-        
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 952306a..dfe98bf 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -34,100 +34,80 @@ import org.apache.helix.tools.StateModelConfigGenerator;
  * Used for defining and dimensioning logical clusters in Zookeeper.
  * 
  */
-public class TaskSetup
-{
+public class TaskSetup {
 
-  private ZkClient zkclient;
-  private boolean isHelixEnabled = true;
-  private HelixAdmin helixAdmin;
-  org.apache.helix.manager.zk.ZkClient helixZkClient;
+    private ZkClient zkclient;
+    private boolean isHelixEnabled = true;
+    private HelixAdmin helixAdmin;
+    org.apache.helix.manager.zk.ZkClient helixZkClient;
 
-  public TaskSetup(String zookeeperAddress)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin = new ZKHelixAdmin(zookeeperAddress);
-    } else
-    {
-      zkclient = new ZkClient(zookeeperAddress);
-      zkclient.setZkSerializer(new ZNRecordSerializer());
-      if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS))
-      {
-        throw new RuntimeException(
-            "Could not connect to ZooKeeper after 10 seconds.");
-      }
+    public TaskSetup(String zookeeperAddress) {
+        if (isHelixEnabled) {
+            helixAdmin = new ZKHelixAdmin(zookeeperAddress);
+        } else {
+            zkclient = new ZkClient(zookeeperAddress);
+            zkclient.setZkSerializer(new ZNRecordSerializer());
+            if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS)) {
+                throw new RuntimeException("Could not connect to ZooKeeper after 10 seconds.");
+            }
+        }
     }
-  }
 
-  public void clean(String clusterName)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin.dropCluster(clusterName);
-    } else
-    {
-      zkclient.deleteRecursive("/s4/clusters/" + clusterName);
+    public void clean(String clusterName) {
+        if (isHelixEnabled) {
+            helixAdmin.dropCluster(clusterName);
+        } else {
+            zkclient.deleteRecursive("/s4/clusters/" + clusterName);
+        }
     }
-  }
 
-  public void setup(String cluster, int tasks, int initialPort)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin.addCluster(cluster, false);
-      StateModelDefinition onlineofflinemodel = new StateModelDefinition(
-          new StateModelConfigGenerator().generateConfigForOnlineOffline());
-      StateModelDefinition leaderstandbymodel = new StateModelDefinition(
-          new StateModelConfigGenerator().generateConfigForLeaderStandby());
+    public void setup(String cluster, int tasks, int initialPort) {
+        if (isHelixEnabled) {
+            helixAdmin.addCluster(cluster, false);
+            StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForOnlineOffline());
+            StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForLeaderStandby());
 
-      helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
-      helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
-      
-      for (int i = 0; i < tasks; i++)
-      {
-        InstanceConfig instanceConfig = new InstanceConfig("localhost_"
-            + initialPort);
-        instanceConfig.setHostName("localhost");
-        instanceConfig.setPort("" + initialPort);
-        helixAdmin.addInstance(cluster, instanceConfig);
-        initialPort = initialPort + 1;
-      }
+            helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
+            helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
 
-      return;
-    }
-    try
-    {
-      zkclient.createPersistent("/s4/streams", true);
-    } catch (ZkException ignored)
-    {
-      // ignore if exists
-    }
+            for (int i = 0; i < tasks; i++) {
+                InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setPort("" + initialPort);
+                helixAdmin.addInstance(cluster, instanceConfig);
+                initialPort = initialPort + 1;
+            }
+
+            return;
+        }
+        try {
+            zkclient.createPersistent("/s4/streams", true);
+        } catch (ZkException ignored) {
+            // ignore if exists
+        }
 
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
-    for (int i = 0; i < tasks; i++)
-    {
-      String taskId = "Task-" + i;
-      ZNRecord record = new ZNRecord(taskId);
-      record.putSimpleField("taskId", taskId);
-      record.putSimpleField("port", String.valueOf(initialPort + i));
-      record.putSimpleField("partition", String.valueOf(i));
-      record.putSimpleField("cluster", cluster);
-      zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId,
-          record);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
+        for (int i = 0; i < tasks; i++) {
+            String taskId = "Task-" + i;
+            ZNRecord record = new ZNRecord(taskId);
+            record.putSimpleField("taskId", taskId);
+            record.putSimpleField("port", String.valueOf(initialPort + i));
+            record.putSimpleField("partition", String.valueOf(i));
+            record.putSimpleField("cluster", cluster);
+            zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId, record);
+        }
     }
-  }
 
-  public void disconnect()
-  {
-    if (isHelixEnabled)
-    {
-      helixZkClient.close();
-    } else
-    {
-      zkclient.close();
+    public void disconnect() {
+        if (isHelixEnabled) {
+            helixZkClient.close();
+        } else {
+            zkclient.close();
+        }
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 7e0b4ac..243faac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -35,135 +35,109 @@ import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 @Singleton
-public class AssignmentFromHelix implements Assignment
-{
-  private static final Logger logger = LoggerFactory
-      .getLogger(AssignmentFromHelix.class);
+public class AssignmentFromHelix implements Assignment {
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromHelix.class);
 
-  private String clusterName;
-  private final String zookeeperAddress;
-  private String machineId;
-  private HelixManager zkHelixManager;
-  
-  private HelixDataAccessor helixDataAccessor;
-  AtomicReference<ClusterNode> clusterNodeRef;
-  private final Lock lock;
-  private final AtomicBoolean currentlyOwningTask;
-  private final Condition taskAcquired;
+    private String clusterName;
+    private final String zookeeperAddress;
+    private String machineId;
+    private HelixManager zkHelixManager;
 
-  private final StateModelFactory<? extends StateModel> taskStateModelFactory;
-  //private final StateModelFactory<? extends StateModel> appStateModelFactory;
+    private HelixDataAccessor helixDataAccessor;
+    AtomicReference<ClusterNode> clusterNodeRef;
+    private final Lock lock;
+    private final AtomicBoolean currentlyOwningTask;
+    private final Condition taskAcquired;
 
-  @Inject
-  public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
-                             @Named("s4.instance.name") String instanceName,
-                             @Named("s4.cluster.zk_address") String zookeeperAddress 
-                             ) throws Exception
-  {
-    this.taskStateModelFactory = new TaskStateModelFactory();
-//    this.appStateModelFactory = appStateModelFactory;
-    this.clusterName = clusterName;
-    this.zookeeperAddress = zookeeperAddress;
-    machineId = "localhost";
-    lock = new ReentrantLock();
-    ZkClient zkClient = new ZkClient(zookeeperAddress);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-    zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
-    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(
-        zkClient);
-    helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
-    clusterNodeRef = new AtomicReference<ClusterNode>();
-    taskAcquired = lock.newCondition();
-    currentlyOwningTask = new AtomicBoolean(true);
-    try
-    {
-      machineId = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException e)
-    {
-      logger.warn("Unable to get hostname", e);
-      machineId = "UNKNOWN";
-    }
-    ClusterNode node = new ClusterNode(-1,
-        Integer.parseInt(instanceName.split("_")[1]), machineId,
-        instanceName);
-    clusterNodeRef.set(node);
-    currentlyOwningTask.set(true);
-  }
+    private final StateModelFactory<? extends StateModel> taskStateModelFactory;
+
+    // private final StateModelFactory<? extends StateModel> appStateModelFactory;
 
-  @Inject
-  public void init()
-  {
-    //joinCluster();
-  }
+    @Inject
+    public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+            throws Exception {
+        this.taskStateModelFactory = new TaskStateModelFactory();
+        // this.appStateModelFactory = appStateModelFactory;
+        this.clusterName = clusterName;
+        this.zookeeperAddress = zookeeperAddress;
+        machineId = "localhost";
+        lock = new ReentrantLock();
+        ZkClient zkClient = new ZkClient(zookeeperAddress);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+        BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+        helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+        clusterNodeRef = new AtomicReference<ClusterNode>();
+        taskAcquired = lock.newCondition();
+        currentlyOwningTask = new AtomicBoolean(true);
+        try {
+            machineId = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            machineId = "UNKNOWN";
+        }
+        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
+        clusterNodeRef.set(node);
+        currentlyOwningTask.set(true);
+    }
 
-  @Override
-  public ClusterNode assignClusterNode()
-  {
-    lock.lock();
-    try
-    {
-      while (!currentlyOwningTask.get())
-      {
-        taskAcquired.awaitUninterruptibly();
-      }
-    } catch (Exception e)
-    {
-      logger.error("Exception while waiting to join the cluster");
-      return null;
-    } finally
-    {
-      lock.unlock();
+    @Inject
+    public void init() {
+        // joinCluster();
     }
-    return clusterNodeRef.get();
-  }
 
-  public void joinClusterOld()
-  {
-    lock.lock();
-    try
-    {
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      do
-      {
-        List<InstanceConfig> instances = helixDataAccessor
-            .getChildValues(keyBuilder.instanceConfigs());
-        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-            .liveInstances());
-        for (InstanceConfig instanceConfig : instances)
-        {
-          String instanceName = instanceConfig.getInstanceName();
-          if (!liveInstances.contains(instanceName))
-          {
-            zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
-                instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
-            zkHelixManager.getStateMachineEngine().registerStateModelFactory(
-                "LeaderStandby", taskStateModelFactory);
-          
-            zkHelixManager.connect();
-            ClusterNode node = new ClusterNode(-1,
-                Integer.parseInt(instanceConfig.getPort()), machineId,
-                instanceName);
-            clusterNodeRef.set(node);
-            currentlyOwningTask.set(true);
-            taskAcquired.signalAll();
-            break;
-          }
+    @Override
+    public ClusterNode assignClusterNode() {
+        lock.lock();
+        try {
+            while (!currentlyOwningTask.get()) {
+                taskAcquired.awaitUninterruptibly();
+            }
+        } catch (Exception e) {
+            logger.error("Exception while waiting to join the cluster");
+            return null;
+        } finally {
+            lock.unlock();
         }
-        if (instances.size() == liveInstances.size())
-        {
-          System.out
-              .println("No more nodes can join the cluster. Will wait for some node to die.");
-          Thread.sleep(100000);
+        return clusterNodeRef.get();
+    }
+
+    public void joinClusterOld() {
+        lock.lock();
+        try {
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            do {
+                List<InstanceConfig> instances = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
+                List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+                for (InstanceConfig instanceConfig : instances) {
+                    String instanceName = instanceConfig.getInstanceName();
+                    if (!liveInstances.contains(instanceName)) {
+                        zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                                InstanceType.PARTICIPANT, zookeeperAddress);
+                        zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
+                                taskStateModelFactory);
+
+                        zkHelixManager.connect();
+                        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
+                                instanceName);
+                        clusterNodeRef.set(node);
+                        currentlyOwningTask.set(true);
+                        taskAcquired.signalAll();
+                        break;
+                    }
+                }
+                if (instances.size() == liveInstances.size()) {
+                    System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
+                    Thread.sleep(100000);
+                }
+            } while (!currentlyOwningTask.get());
+            System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            lock.unlock();
         }
-      } while (!currentlyOwningTask.get());
-      System.out.println("Joined the cluster:"+ clusterName +" as "+ clusterNodeRef.get().getTaskId());
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      lock.unlock();
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..c482624 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@ import com.google.inject.name.Named;
 
 /**
  * Handles partition assignment through Zookeeper.
- *
+ * 
  */
 @Singleton
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 3226fab..19e0628 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -46,172 +46,145 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
 
 /**
- * Represents a logical cluster definition fetched from Zookeeper. Notifies
- * listeners of runtime changes in the configuration.
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
+ * configuration.
  * 
  */
-public class ClusterFromHelix extends RoutingTableProvider implements Cluster
-{
-
-  private static Logger logger = LoggerFactory
-      .getLogger(ClusterFromHelix.class);
-
-  private final String clusterName;
-  private final AtomicReference<PhysicalCluster> clusterRef;
-  private final List<ClusterChangeListener> listeners;
-  private final Lock lock;
-  private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
-
-  /**
-   * only the local topology
-   */
-  @Inject
-  public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
-      @Named("s4.cluster.zk_address") String zookeeperAddress,
-      @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-      @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
-      throws Exception
-  {
-    this.clusterName = clusterName;
-    Map<String, Integer> map = Collections.emptyMap();
-    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
-    this.clusterRef = new AtomicReference<PhysicalCluster>();
-    this.listeners = new ArrayList<ClusterChangeListener>();
-    lock = new ReentrantLock();
-
-  }
-
-  /**
-   * any topology
-   */
-  public ClusterFromHelix(String clusterName, ZkClient zkClient,
-      String machineId)
-  {
-    this.clusterName = clusterName;
-    Map<String, Integer> map = Collections.emptyMap();
-    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
-    this.clusterRef = new AtomicReference<PhysicalCluster>();
-    this.listeners = new ArrayList<ClusterChangeListener>();
-    lock = new ReentrantLock();
-
-  }
-
-  @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-      NotificationContext changeContext)
-  {
-    lock.lock();
-    try
-    {
-      logger.info("Start:Processing change in cluster topology");
-      super.onExternalViewChange(externalViewList, changeContext);
-      HelixManager manager = changeContext.getManager();
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      ConfigAccessor configAccessor = manager.getConfigAccessor();
-      ConfigScopeBuilder builder = new ConfigScopeBuilder();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      List<String> resources = helixDataAccessor.getChildNames(keyBuilder
-          .idealStates());
-      Map<String,Integer> map = new HashMap<String, Integer>();
-      for (String resource : resources)
-      {
-        String resourceType = configAccessor.get(
-            builder.forCluster(clusterName).forResource(resource)
-                .build(), "type");
-        if("Task".equalsIgnoreCase(resourceType)){
-          String streamName = configAccessor.get(
-              builder.forCluster(clusterName).forResource(resource)
-                  .build(), "streamName");
-          IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
-          map.put(streamName, idealstate.getNumPartitions());
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
+
+    private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
+
+    private final String clusterName;
+    private final AtomicReference<PhysicalCluster> clusterRef;
+    private final List<ClusterChangeListener> listeners;
+    private final Lock lock;
+    private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+
+    /**
+     * only the local topology
+     */
+    @Inject
+    public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
+
+    }
+
+    /**
+     * any topology
+     */
+    public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
+
+    }
+
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+        lock.lock();
+        try {
+            logger.info("Start:Processing change in cluster topology");
+            super.onExternalViewChange(externalViewList, changeContext);
+            HelixManager manager = changeContext.getManager();
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            ConfigAccessor configAccessor = manager.getConfigAccessor();
+            ConfigScopeBuilder builder = new ConfigScopeBuilder();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+            Map<String, Integer> map = new HashMap<String, Integer>();
+            for (String resource : resources) {
+                String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
+                        "type");
+                if ("Task".equalsIgnoreCase(resourceType)) {
+                    String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
+                            .build(), "streamName");
+                    IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+                    map.put(streamName, idealstate.getNumPartitions());
+                }
+            }
+            partitionCountMapRef.set(map);
+            for (ClusterChangeListener listener : listeners) {
+                listener.onChange();
+            }
+            logger.info("End:Processing change in cluster topology");
+
+        } catch (Exception e) {
+            logger.error("", e);
+        } finally {
+            lock.unlock();
         }
-      }
-      partitionCountMapRef.set(map);
-      for (ClusterChangeListener listener : listeners)
-      {
-        listener.onChange();
-      }
-      logger.info("End:Processing change in cluster topology");
-
-    } catch (Exception e)
-    {
-      logger.error("", e);
-    } finally
-    {
-      lock.unlock();
     }
-  }
-
-  @Override
-  public PhysicalCluster getPhysicalCluster()
-  {
-    return clusterRef.get();
-  }
-
-  @Override
-  public void addListener(ClusterChangeListener listener)
-  {
-    logger.info("Adding topology change listener:" + listener);
-    listeners.add(listener);
-  }
-
-  @Override
-  public void removeListener(ClusterChangeListener listener)
-  {
-    logger.info("Removing topology change listener:" + listener);
-    listeners.remove(listener);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result
-        + ((clusterName == null) ? 0 : clusterName.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    ClusterFromHelix other = (ClusterFromHelix) obj;
-    if (clusterName == null)
-    {
-      if (other.clusterName != null)
-        return false;
-    } else if (!clusterName.equals(other.clusterName))
-      return false;
-    return true;
-  }
-
-  @Override
-  public InstanceConfig getDestination(String streamName, int partitionId)
-  {
-    List<InstanceConfig> instances = getInstances(streamName, streamName + "_"
-        + partitionId, "LEADER");
-    if (instances.size() == 1)
-    {
-      return instances.get(0);
-    } else
-    {
-      return null;
+
+    @Override
+    public PhysicalCluster getPhysicalCluster() {
+        return clusterRef.get();
+    }
+
+    @Override
+    public void addListener(ClusterChangeListener listener) {
+        logger.info("Adding topology change listener:" + listener);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(ClusterChangeListener listener) {
+        logger.info("Removing topology change listener:" + listener);
+        listeners.remove(listener);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClusterFromHelix other = (ClusterFromHelix) obj;
+        if (clusterName == null) {
+            if (other.clusterName != null)
+                return false;
+        } else if (!clusterName.equals(other.clusterName))
+            return false;
+        return true;
     }
-  }
-  
-  @Override
-  public Integer getPartitionCount(String streamName){
-    Integer numPartitions = partitionCountMapRef.get().get(streamName);
-    if(numPartitions==null){
-      return -1;
+
+    @Override
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
+        if (instances.size() == 1) {
+            return instances.get(0);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Integer getPartitionCount(String streamName) {
+        Integer numPartitions = partitionCountMapRef.get().get(streamName);
+        if (numPartitions == null) {
+            return -1;
+        }
+        return numPartitions;
     }
-    return numPartitions;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index f6d4df9..3e4567a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -42,7 +42,7 @@ import com.google.inject.name.Named;
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
  * configuration.
- *
+ * 
  */
 public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
 
@@ -216,15 +216,13 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
     }
 
     @Override
-    public InstanceConfig getDestination(String streamName, int partitionId)
-    {
-      return null;
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        return null;
     }
 
     @Override
-    public Integer getPartitionCount(String streamName)
-    {
-      return null;
+    public Integer getPartitionCount(String streamName) {
+        return null;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
index e622107..1e117c8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
@@ -20,7 +20,7 @@ package org.apache.s4.comm.topology;
 
 /**
  * Represents an node.
- *
+ * 
  */
 public class ClusterNode {
     private int partition;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
index 217d4e7..4e5351b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
@@ -55,11 +55,8 @@ public class ClustersFromHelix implements Clusters {
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
 
-
     }
 
-  
-
     public Cluster getCluster(String clusterName) {
         return clusters.get(clusterName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index ce53c1a..cbf209e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -99,8 +99,9 @@ public class PhysicalCluster {
     }
 
     public int getPartitionCount(String stream) {
-      return numPartitions;
-  }
+        return numPartitions;
+    }
+
     /**
      * @param node
      */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 7720bbf..7b75ff4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -107,7 +107,9 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener, Remote
 
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreamsManager#getConsumers(java.lang.String)
      */
     @Override
@@ -186,8 +188,11 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener, Remote
         streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String,
+     * java.lang.String)
      */
     @Override
     public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +224,9 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener, Remote
         zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreamsManager#addInputStream(int, java.lang.String, java.lang.String)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
index 94c95b6..3eb6d6a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
@@ -2,22 +2,19 @@ package org.apache.s4.comm.topology;
 
 import java.util.Set;
 
-public interface RemoteStreamsManager
-{
+public interface RemoteStreamsManager {
 
-  public abstract Set<StreamConsumer> getConsumers(String streamName);
+    public abstract Set<StreamConsumer> getConsumers(String streamName);
 
-  public abstract void addOutputStream(String appId, String clusterName,
-      String streamName);
+    public abstract void addOutputStream(String appId, String clusterName, String streamName);
 
-  /**
-   * Publishes interest in a stream from an application.
-   * 
-   * @param appId
-   * @param clusterName
-   * @param streamName
-   */
-  public abstract void addInputStream(int appId, String clusterName,
-      String streamName);
+    /**
+     * Publishes interest in a stream from an application.
+     * 
+     * @param appId
+     * @param clusterName
+     * @param streamName
+     */
+    public abstract void addInputStream(int appId, String clusterName, String streamName);
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
index 957318d..845e649 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
@@ -2,26 +2,21 @@ package org.apache.s4.comm.topology;
 
 import java.util.Set;
 
-public class RemoteStreamsManagerImpl implements RemoteStreamsManager
-{
-
-  @Override
-  public Set<StreamConsumer> getConsumers(String streamName)
-  {
-    return null;
-  }
-
-  @Override
-  public void addOutputStream(String appId, String clusterName,
-      String streamName)
-  {
-    
-  }
-
-  @Override
-  public void addInputStream(int appId, String clusterName, String streamName)
-  {
-    
-  }
+public class RemoteStreamsManagerImpl implements RemoteStreamsManager {
+
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        return null;
+    }
+
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+
+    }
+
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
index b55f469..9858aef 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
@@ -24,12 +24,12 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 /**
  * 
  * Utility to serialize/deserialize data in ZK. <br/>
- * Using Json format and Gson library. 
- * TODO: Explore other libraries like jackson much richer features.
- * Gson needs no-arg constructor to work with without additional work
+ * Using Json format and Gson library. TODO: Explore other libraries like jackson much richer features. Gson needs
+ * no-arg constructor to work with without additional work
  */
 public class ZNRecordSerializer implements ZkSerializer {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
index e5a8689..cab6c0f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
@@ -25,72 +25,70 @@ import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.data.Stat;
+
 /**
  * 
  * Overwriting the ZKclient since the org.I0Itec.zkclient.ZkClient does not expose some important methods
  */
 public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
-	public ZkClient(IZkConnection connection, int connectionTimeout,
-			ZkSerializer zkSerializer) {
-		super(connection, connectionTimeout, zkSerializer);
-	}
-
-	public ZkClient(IZkConnection connection, int connectionTimeout) {
-		super(connection, connectionTimeout);
-	}
-
-	public ZkClient(IZkConnection connection) {
-		super(connection);
-	}
-
-	public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
-		super(zkServers, sessionTimeout, connectionTimeout);
-	}
-
-	public ZkClient(String zkServers, int connectionTimeout) {
-		super(zkServers, connectionTimeout);
-	}
-
-	public ZkClient(String serverstring) {
-		super(serverstring);
-	}
-
-	public IZkConnection getConnection() {
-		return _connection;
-	}
-	
-	public long getSessionId(){
-		return ((ZkConnection)_connection).getZookeeper().getSessionId();
-	}
-
-	public Stat getStat(final String path) {
-		Stat stat = retryUntilConnected(new Callable<Stat>() {
-
-			@Override
-			public Stat call() throws Exception {
-				Stat stat = ((ZkConnection) _connection).getZookeeper().exists(
-						path, false);
-				return stat;
-			}
-		});
-
-		return stat;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <T extends Object> T readData(String path,
-			boolean returnNullIfPathNotExists) {
-		T data = null;
-		try {
-			data = (T) readData(path, null);
-		} catch (ZkNoNodeException e) {
-			if (!returnNullIfPathNotExists) {
-				throw e;
-			}
-		}
-		return data;
-	}
+    public ZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer) {
+        super(connection, connectionTimeout, zkSerializer);
+    }
+
+    public ZkClient(IZkConnection connection, int connectionTimeout) {
+        super(connection, connectionTimeout);
+    }
+
+    public ZkClient(IZkConnection connection) {
+        super(connection);
+    }
+
+    public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
+        super(zkServers, sessionTimeout, connectionTimeout);
+    }
+
+    public ZkClient(String zkServers, int connectionTimeout) {
+        super(zkServers, connectionTimeout);
+    }
+
+    public ZkClient(String serverstring) {
+        super(serverstring);
+    }
+
+    public IZkConnection getConnection() {
+        return _connection;
+    }
+
+    public long getSessionId() {
+        return ((ZkConnection) _connection).getZookeeper().getSessionId();
+    }
+
+    public Stat getStat(final String path) {
+        Stat stat = retryUntilConnected(new Callable<Stat>() {
+
+            @Override
+            public Stat call() throws Exception {
+                Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+                return stat;
+            }
+        });
+
+        return stat;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
+        T data = null;
+        try {
+            data = (T) readData(path, null);
+        } catch (ZkNoNodeException e) {
+            if (!returnNullIfPathNotExists) {
+                throw e;
+            }
+        }
+        return data;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index fb18dd6..a326535 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -39,7 +39,7 @@ import com.google.inject.Inject;
 
 /**
  * UDP based emitter.
- *
+ * 
  */
 public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
@@ -103,11 +103,11 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         return true;
     }
 
-   // @Override
+    // @Override
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
-    
+
     @Override
     public void onChange() {
         refreshCluster();
@@ -131,8 +131,7 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
     }
 
     @Override
-    public int getPartitionCount(String stream)
-    {
-      return topology.getPhysicalCluster().getPartitionCount(stream);      
+    public int getPartitionCount(String stream) {
+        return topology.getPhysicalCluster().getPartitionCount(stream);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
index 050a6ec..da015d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -25,7 +25,7 @@ import com.google.inject.assistedinject.Assisted;
 
 /**
  * UDP-based emitter for sending events to remote clusters.
- *
+ * 
  */
 public class UDPRemoteEmitter extends UDPEmitter {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 97ec9af..1c07870 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -83,14 +83,14 @@ public class DefaultCoreModule extends AbstractModule {
         /* Use Kryo to serialize events. */
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(AppStateModelFactory.class);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
+                AppStateModelFactory.class);
 
         bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class);
-        
+
         bind(RemoteSendersManager.class).to(RemoteSendersManagerImpl.class);
-        
-        bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
 
+        bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
 
         bind(S4RLoaderFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 810b8f4..9ae356b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -155,11 +155,12 @@ public class Main {
                 }
                 combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
             }
-            
+
             injector = Guice.createInjector(combinedModule);
-            //start a HelixController to manage the cluster
+            // start a HelixController to manage the cluster
             String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
-            HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName, HelixControllerMain.STANDALONE);
+            HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName,
+                    HelixControllerMain.STANDALONE);
 
             if (mainArgs.appClass != null) {
                 logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
@@ -190,7 +191,7 @@ public class Main {
 
         @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
         String clusterName = null;
-        
+
         @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
         String instanceName = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 6bc52e3..c8da41a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -24,7 +24,7 @@ import org.apache.s4.base.Hasher;
 
 /**
  * Sends events to a remote cluster.
- *
+ * 
  */
 public class RemoteSender {
 
@@ -41,7 +41,8 @@ public class RemoteSender {
     public void send(String hashKey, EventMessage eventMessage) {
         if (hashKey == null) {
             // round robin by default
-            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())), eventMessage);
+            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())),
+                    eventMessage);
         } else {
             int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(eventMessage.getStreamName()));
             emitter.send(partition, eventMessage);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index a43aa7d..e8ed8e0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -38,7 +38,7 @@ import com.google.inject.Inject;
 /**
  * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
  * the event.
- *
+ * 
  */
 public class RemoteSenders implements RemoteSendersManager {
 
@@ -61,7 +61,9 @@ public class RemoteSenders implements RemoteSendersManager {
 
     Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.core.RemoteSendersManager#send(java.lang.String, org.apache.s4.base.Event)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
index 3e0e462..0f1f9fa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
@@ -2,9 +2,8 @@ package org.apache.s4.core;
 
 import org.apache.s4.base.Event;
 
-public interface RemoteSendersManager
-{
+public interface RemoteSendersManager {
 
-  public abstract void send(String hashKey, Event event);
+    public abstract void send(String hashKey, Event event);
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
index 13f8ac8..ed2ce4a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
@@ -2,13 +2,11 @@ package org.apache.s4.core;
 
 import org.apache.s4.base.Event;
 
-public class RemoteSendersManagerImpl implements RemoteSendersManager
-{
-
-  @Override
-  public void send(String hashKey, Event event)
-  {
-    
-  }
+public class RemoteSendersManagerImpl implements RemoteSendersManager {
+
+    @Override
+    public void send(String hashKey, Event event) {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 38f71a8..d47ee0b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -23,90 +23,69 @@ import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 
 @StateModelInfo(states = { "ONLINE,OFFLINE" }, initialState = "OFFLINE")
-public class AppStateModel extends StateModel
-{
-  private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
-  private final String appName;
-  private final Server server;
+public class AppStateModel extends StateModel {
+    private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+    private final String appName;
+    private final Server server;
 
-  public AppStateModel(Server server, String appName)
-  {
-    this.server = server;
-    this.appName = appName;
-  }
+    public AppStateModel(Server server, String appName) {
+        this.server = server;
+        this.appName = appName;
+    }
 
-  @Transition(from = "OFFLINE", to = "ONLINE")
-  public void deploy(Message message, NotificationContext context) throws Exception
-  {
-    HelixManager manager = context.getManager();
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
-    String uriString = configAccessor.get(scope,
-        DistributedDeploymentManager.S4R_URI);
-    String clusterName = manager.getClusterName();
-    try
-    {
-      URI uri = new URI(uriString);
-      // fetch application
-      File localS4RFileCopy;
-      try
-      {
-        localS4RFileCopy = File.createTempFile("tmp", "s4r");
-      } catch (IOException e1)
-      {
-        logger
-            .error(
-                "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
-                appName, e1.getClass().getName() + "->" + e1.getMessage());
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "]", e1);
-      }
-      localS4RFileCopy.deleteOnExit();
-      try
-      {
-        if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
-            Files.newOutputStreamSupplier(localS4RFileCopy)) == 0)
-        {
-          throw new DeploymentFailedException("Cannot copy archive from ["
-              + uri.toString() + "] to [" + localS4RFileCopy.getAbsolutePath()
-              + "] (nothing was copied)");
-        }
-      } catch (IOException e)
-      {
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "] from URI [" + uri.toString() + "] ", e);
-      }
-      // install locally
-      App loaded = server.loadApp(localS4RFileCopy, appName);
-      if (loaded != null)
-      {
-        logger.info("Successfully installed application {}", appName);
-        // TODO sync with other nodes? (e.g. wait for other apps deployed before
-        // starting?
-        server.startApp(loaded, appName, clusterName);
-      } else
-      {
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "] from URI [" + uri.toString()
-            + "] : cannot start application");
-      }
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void deploy(Message message, NotificationContext context) throws Exception {
+        HelixManager manager = context.getManager();
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
+        String uriString = configAccessor.get(scope, DistributedDeploymentManager.S4R_URI);
+        String clusterName = manager.getClusterName();
+        try {
+            URI uri = new URI(uriString);
+            // fetch application
+            File localS4RFileCopy;
+            try {
+                localS4RFileCopy = File.createTempFile("tmp", "s4r");
+            } catch (IOException e1) {
+                logger.error(
+                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+                        appName, e1.getClass().getName() + "->" + e1.getMessage());
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
+            }
+            localS4RFileCopy.deleteOnExit();
+            try {
+                if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
+                        Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+                    throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
+                }
+            } catch (IOException e) {
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+                        + uri.toString() + "] ", e);
+            }
+            // install locally
+            App loaded = server.loadApp(localS4RFileCopy, appName);
+            if (loaded != null) {
+                logger.info("Successfully installed application {}", appName);
+                // TODO sync with other nodes? (e.g. wait for other apps deployed before
+                // starting?
+                server.startApp(loaded, appName, clusterName);
+            } else {
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+                        + uri.toString() + "] : cannot start application");
+            }
 
-    } catch (URISyntaxException e)
-    {
-      logger
-          .error(
-              "Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ",
-              new String[] { appName, uriString, e.getMessage() });
-      throw new DeploymentFailedException("Cannot deploy application ["
-          + appName + "]", e);
+        } catch (URISyntaxException e) {
+            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+                    appName, uriString, e.getMessage() });
+            throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
+        }
     }
-  }
 
-  @Transition(from = "OFFLINE", to = "ONLINE")
-  public void undeploy(Message message, NotificationContext context) throws Exception
-  {
-    //todo
-  }
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void undeploy(Message message, NotificationContext context) throws Exception {
+        // todo
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
index 3af4531..ef6ae58 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -5,20 +5,20 @@ import org.apache.s4.core.Server;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
 @Singleton
-public class AppStateModelFactory extends StateModelFactory<AppStateModel>
-{
-  private final Server server;
-  
-  @Inject
-  public AppStateModelFactory(Server server){
-    this.server = server;
-    
-  }
-  @Override
-  public AppStateModel createNewStateModel(String partitionName)
-  {
-    return new AppStateModel(server,partitionName);
-  }
+public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
+    private final Server server;
+
+    @Inject
+    public AppStateModelFactory(Server server) {
+        this.server = server;
+
+    }
+
+    @Override
+    public AppStateModel createNewStateModel(String partitionName) {
+        return new AppStateModel(server, partitionName);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index ed32085..5dcfa76 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -5,39 +5,24 @@ import org.apache.s4.core.Server;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-public class HelixBasedDeploymentManager implements DeploymentManager
-{
-  private final Server server;
-  boolean deployed = false;
-  private final String clusterName;
+public class HelixBasedDeploymentManager implements DeploymentManager {
+    private final Server server;
+    boolean deployed = false;
+    private final String clusterName;
 
-  @Inject
-  public HelixBasedDeploymentManager(
-      @Named("s4.cluster.name") String clusterName,
-      @Named("s4.cluster.zk_address") String zookeeperAddress,
-      @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-      @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
-      Server server)
-  {
-    this.clusterName = clusterName;
-    this.server = server;
+    @Inject
+    public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+        this.clusterName = clusterName;
+        this.server = server;
 
-  }
+    }
 
-  @Override
-  public void start()
-  {
-   /* File s4r = new File(
-        "/Users/kgopalak/Documents/projects/s4/incubator-s4-helix/myApp/build/libs/myApp.s4r");
-    String appName = "myApp";
-    try
-    {
-      App loaded = server.loadApp(s4r, "myApp");
-      server.startApp(loaded, appName, clusterName);
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    }*/
-  }
+    @Override
+    public void start() {
+        
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
index 5bab316..6dfc82a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
@@ -38,7 +38,7 @@ public class AddNodes {
         Tools.parseArgs(clusterArgs, args);
         try {
 
-            logger.info("Adding new nodes [{}] to cluster [{}] node(s)",  clusterArgs.nbNodes, clusterArgs.clusterName);
+            logger.info("Adding new nodes [{}] to cluster [{}] node(s)", clusterArgs.nbNodes, clusterArgs.clusterName);
             HelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
             int initialPort = clusterArgs.firstListeningPort;
             if (clusterArgs.nbNodes > 0) {
@@ -74,14 +74,14 @@ public class AddNodes {
 
         @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
         String nodes = "";
-        
+
         @Parameter(names = "-zk", description = "Zookeeper connection string")
         String zkConnectionString = "localhost:2181";
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-        
-        @Parameter(names = {"-ng","-nodeGroup"}, description = "Assign the nodes to one or more groups. This will be useful when you create task", required=false)
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
         String nodeGroup = "default";
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
index 38fbbe8..190c8c0 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
@@ -88,10 +88,10 @@ public class CreateCluster {
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-        
+
         @Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required = false, arity = 1)
         String nodeGroup = "default";
-        
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index ff1cb88..83a7088 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -47,14 +47,14 @@ public class DeployApp extends S4ArgsBase {
         for (String instanceName : instancesInCluster) {
             InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
             String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-            if(nodeGroup.equals(deployArgs.nodeGroup)){
+            if (nodeGroup.equals(deployArgs.nodeGroup)) {
                 instancesInGroup.add(instanceName);
             }
         }
-        for(String instanceName:instancesInGroup){
+        for (String instanceName : instancesInGroup) {
             is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
         }
-        
+
         admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
     }
 


Mime
View raw message