incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [2/2] git commit: Flag S4 tools specific to Helix with "-helix" option + fix status output
Date Thu, 21 Feb 2013 20:07:19 GMT
Updated Branches:
  refs/heads/S4-110-new 8af60c83e -> 0b93cae4b


Flag S4 tools specific to Helix with "-helix" option + fix status output


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0b93cae4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0b93cae4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0b93cae4

Branch: refs/heads/S4-110-new
Commit: 0b93cae4b9dbc8b65f2c98c177c3c9af41194268
Parents: d341879
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Feb 21 18:59:55 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Feb 21 21:05:56 2013 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/s4/core/BaseModule.java   |    8 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   16 +-
 .../src/main/java/org/apache/s4/core/S4Node.java   |    8 +-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   55 +--
 .../core/moduleloader/ModuleLoaderTestUtils.java   |    2 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    3 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    2 +-
 .../src/main/java/org/apache/s4/tools/Status.java  |  105 ++---
 .../main/java/org/apache/s4/tools/StatusUtils.java |   65 +++
 .../src/main/java/org/apache/s4/tools/Tools.java   |   35 +-
 .../java/org/apache/s4/tools/helix/AddNodes.java   |    7 +-
 .../org/apache/s4/tools/helix/ClusterStatus.java   |  346 +++------------
 .../org/apache/s4/tools/helix/CreateCluster.java   |    4 +-
 .../java/org/apache/s4/tools/helix/CreateTask.java |   11 +-
 .../java/org/apache/s4/tools/helix/DeployApp.java  |   17 +-
 .../apache/s4/tools/helix/GenericEventAdapter.java |   10 +-
 .../org/apache/s4/tools/helix/HelixS4ArgsBase.java |   16 +
 .../org/apache/s4/tools/helix/RebalanceTask.java   |    8 +-
 .../java/org/apache/s4/tools/helix/RemoveTask.java |   21 +-
 19 files changed, 274 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 5715060..b92c87b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -36,13 +36,14 @@ public class BaseModule extends AbstractModule {
     InputStream baseConfigInputStream;
     String clusterName;
     private final String instanceName;
-    boolean useHelix = true;
+    boolean useHelix = false;
 
-    public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName) {
+    public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName, boolean useHelix) {
         super();
         this.baseConfigInputStream = baseConfigInputStream;
         this.clusterName = clusterName;
         this.instanceName = instanceName;
+        this.useHelix = useHelix;
     }
 
     @Override
@@ -53,8 +54,7 @@ public class BaseModule extends AbstractModule {
         // share the Zookeeper connection
         bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
-        String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
-        if (config.getBoolean("s4.helix") || "HELIX".equalsIgnoreCase(clusterManager)) {
+        if (useHelix) {
             bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
             bind(Cluster.class).to(ClusterFromHelix.class).in(Scopes.SINGLETON);
             bind(TaskStateModelFactory.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 43d01c8..248a798 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -39,7 +39,7 @@ public class RemoteSender {
     final private Hasher hasher;
     AtomicInteger targetPartition = new AtomicInteger();
     final private String remoteClusterName;
-    private Cluster cluster;
+    private final Cluster cluster;
 
     public RemoteSender(Cluster cluster, Emitter emitter, Hasher hasher, String clusterName) {
         super();
@@ -50,8 +50,8 @@ public class RemoteSender {
 
     }
 
-    public void send(String streamName,String hashKey, ByteBuffer message) throws InterruptedException {
-        
+    public void send(String streamName, String hashKey, ByteBuffer message) throws InterruptedException {
+
         int partition;
         if (hashKey == null) {
             // round robin by default
@@ -59,11 +59,13 @@ public class RemoteSender {
         } else {
             partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(streamName));
         }
-        //TODO: where do we get the mode
-        
+        // TODO: where do we get the mode
+
         Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
-        logger.info("Sending event to partition:"+ partition + " stream: "+streamName);
-        
+
+        // TODO log the name of the partition
+        logger.trace("Sending event to partition [{}] through stream [{}]", String.valueOf(partition), streamName);
+
         emitter.send(destination, message);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index ab2be80..1293daa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -48,8 +48,9 @@ public class S4Node {
             }
         });
 
-        Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
-                "default.s4.base.properties").openStream(), nodeArgs.clusterName, nodeArgs.instanceName) });
+        Injector injector = Guice
+                .createInjector(new Module[] { new BaseModule(Resources.getResource("default.s4.base.properties")
+                        .openStream(), nodeArgs.clusterName, nodeArgs.instanceName, nodeArgs.useHelix) });
         Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
         try {
             bootstrap.start(injector);
@@ -76,5 +77,8 @@ public class S4Node {
 
         @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
         String instanceName = null;
+
+        @Parameter(names = "-helix", description = "Required flag when using a Helix based cluster manager", required = false, arity = 0)
+        boolean useHelix = false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 6923f98..d81146d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -39,14 +39,12 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 
 /**
- * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top
- * level classes of the communication layer.
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
  * <p>
- * {@link SenderImpl} is responsible for sending an event to a
- * {@link ProcessingElement} instance using a hashKey.
+ * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
  * <p>
- * Details on how the cluster is partitioned and how events are serialized and
- * transmitted to its destination are hidden from the application developer.
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer.
  */
 public class SenderImpl implements Sender {
 
@@ -76,10 +74,8 @@ public class SenderImpl implements Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser,
-            Hasher hasher, Assignment assignment,
-            SenderExecutorServiceFactory senderExecutorServiceFactory,
-            Cluster cluster) {
+    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+            SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
@@ -99,15 +95,12 @@ public class SenderImpl implements Sender {
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String,
-     * org.apache.s4.base.Event)
+     * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
      */
     @Override
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        int partition = (int) (hasher.hash(hashKey) % emitter
-                .getPartitionCount(event.getStreamName()));
-        Destination destination = cluster.getDestination(event.getStreamName(),
-                partition, emitter.getType());
+        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
+        Destination destination = cluster.getDestination(event.getStreamName(), partition, emitter.getType());
         if (isDestinationLocal(destination)) {
             metrics.sentLocal();
             /* Hey we are in the same JVM, don't use the network. */
@@ -119,11 +112,9 @@ public class SenderImpl implements Sender {
     }
 
     private boolean isDestinationLocal(Destination destination) {
-         if (emitter.getType().equals("tcp")) {
+        if (emitter.getType().equals("tcp")) {
             TCPDestination tcpDestination = ((TCPDestination) destination);
-            if (localNode != null
-                    && localNode.getMachineName().equalsIgnoreCase(
-                            tcpDestination.getMachineName())
+            if (localNode != null && tcpDestination.getMachineName().equalsIgnoreCase(localNode.getMachineName())
                     && localNode.getPort() == tcpDestination.getPort()) {
                 return true;
             }
@@ -139,9 +130,7 @@ public class SenderImpl implements Sender {
     /*
      * (non-Javadoc)
      * 
-     * @see
-     * org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event
-     * )
+     * @see org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event )
      */
     @Override
     public void sendToAllRemotePartitions(Event event) {
@@ -153,8 +142,7 @@ public class SenderImpl implements Sender {
         Event event;
         int remotePartitionId;
 
-        public SerializeAndSendToRemotePartitionTask(Event event,
-                int remotePartitionId) {
+        public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
             this.event = event;
             this.remotePartitionId = remotePartitionId;
         }
@@ -164,14 +152,11 @@ public class SenderImpl implements Sender {
             ByteBuffer serializedEvent = serDeser.serialize(event);
             try {
                 // TODO: where can we get the type ?
-                Destination destination = cluster.getDestination(
-                        event.getStreamName(), remotePartitionId,
+                Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId,
                         emitter.getType());
                 emitter.send(destination, serializedEvent);
             } catch (InterruptedException e) {
-                logger.error(
-                        "Interrupted blocking send operation for event {}. Event is lost.",
-                        event);
+                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
                 Thread.currentThread().interrupt();
             }
 
@@ -191,23 +176,19 @@ public class SenderImpl implements Sender {
         @Override
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
-            Integer partitionCount = cluster.getPartitionCount(event
-                    .getStreamName());
+            Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
             for (int i = 0; i < partitionCount; i++) {
 
                 /* Don't use the comm layer when we send to the same partition. */
                 try {
                     // TODO: where to get the mode from
-                    Destination destination = cluster.getDestination(
-                            event.getStreamName(), i, "tcp");
+                    Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
                     if (!isDestinationLocal(destination)) {
                         emitter.send(destination, serializedEvent);
                         metrics.sentEvent(i);
                     }
                 } catch (InterruptedException e) {
-                    logger.error(
-                            "Interrupted blocking send operation for event {}. Event is lost.",
-                            event);
+                    logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
                     // no reason to continue: we were interrupted, so we reset
                     // the interrupt status and leave
                     Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index aedac39..c0e5bb7 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -69,7 +69,7 @@ public class ModuleLoaderTestUtils {
         }
 
         Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
-                .openStream(), "cluster1", null),
+                .openStream(), "cluster1", null, false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
 
         Emitter emitter = injector.getInstance(TCPEmitter.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 29bdc0a..19f1a26 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -115,7 +115,8 @@ public class CoreTestUtils extends CommTestUtils {
 
     public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
         return Guice.createInjector(Modules.override(
-                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null,
+                        false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
                 new NonFailFastZookeeperClientsModule()));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 8d2b58c..69db25b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -60,7 +60,7 @@ public class WordCountTest extends ZkBasedTest {
 
     public void createEmitter() throws IOException {
         injector = Guice.createInjector(new BaseModule(
-                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null, false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 09ae37d..244b187 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -19,7 +19,6 @@
 package org.apache.s4.tools;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -41,8 +40,6 @@ import com.google.common.collect.Maps;
 public class Status extends S4ArgsBase {
     static Logger logger = LoggerFactory.getLogger(Status.class);
 
-    private static String NONE = "--";
-
     public static void main(String[] args) {
 
         StatusArgs statusArgs = new StatusArgs();
@@ -146,33 +143,33 @@ public class Status extends S4ArgsBase {
 
     private static void showAppsStatus(List<Cluster> clusters) {
         System.out.println("App Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("URI", 90));
+        System.out.println(StatusUtils.generateEdge(130));
         for (Cluster cluster : clusters) {
-            if (!NONE.equals(cluster.app.name)) {
-                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
-                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+            if (!StatusUtils.NONE.equals(cluster.app.name)) {
+                System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(cluster.app.name, 20),
+                        StatusUtils.inMiddle(cluster.app.cluster, 20), cluster.app.uri);
             }
         }
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
 
     }
 
     private static void showClustersStatus(List<Cluster> clusters) {
         System.out.println("Cluster Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
-        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
-                generateEdge(80));
-        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
-                inMiddle("Host", 50), inMiddle("Port", 8));
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", StatusUtils.inMiddle("Active nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("App", 20), StatusUtils.inMiddle("Tasks", 10),
+                StatusUtils.generateEdge(80));
+        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", StatusUtils.inMiddle("Number", 8), StatusUtils.inMiddle("Task id", 10),
+                StatusUtils.inMiddle("Host", 50), StatusUtils.inMiddle("Port", 8));
+        System.out.println(StatusUtils.generateEdge(130));
 
         for (Cluster cluster : clusters) {
-            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
-                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
-                    inMiddle("" + cluster.nodes.size(), 8));
+            System.out.format("%-20s%-20s%-10s%-10s", StatusUtils.inMiddle(cluster.clusterName, 20),
+                    StatusUtils.inMiddle(cluster.app.name, 20), StatusUtils.inMiddle("" + cluster.taskNumber, 8),
+                    StatusUtils.inMiddle("" + cluster.nodes.size(), 8));
             boolean first = true;
             for (ClusterNode node : cluster.nodes) {
                 if (first) {
@@ -180,68 +177,28 @@ public class Status extends S4ArgsBase {
                 } else {
                     System.out.format("%n%-60s", " ");
                 }
-                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
-                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+                System.out.format("%-10s%-50s%-10s", StatusUtils.inMiddle("" + node.getTaskId(), 10),
+                        StatusUtils.inMiddle(node.getMachineName(), 50), StatusUtils.inMiddle(node.getPort() + "", 10));
             }
             System.out.println();
         }
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
     }
 
     private static void showStreamsStatus(List<Stream> streams) {
         System.out.println("Stream Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
-                inMiddle("Consumers", 55));
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-55s%-55s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Producers", 55),
+                StatusUtils.inMiddle("Consumers", 55));
+        System.out.println(StatusUtils.generateEdge(130));
 
         for (Stream stream : streams) {
-            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
-                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
-                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
-        }
-        System.out.println(generateEdge(130));
-
-    }
-
-    private static String inMiddle(String content, int width) {
-        int i = (width - content.length()) / 2;
-        return String.format("%" + i + "s%s", " ", content);
-    }
-
-    private static String generateEdge(int length) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < length; i++) {
-            sb.append("-");
+            System.out.format("%-20s%-55s%-55s%n", StatusUtils.inMiddle(stream.streamName, 20),
+                    StatusUtils.inMiddle(StatusUtils.getFormatString(stream.producers, stream.clusterAppMap), 55),
+                    StatusUtils.inMiddle(StatusUtils.getFormatString(stream.consumers, stream.clusterAppMap), 55));
         }
-        return sb.toString();
-    }
+        System.out.println(StatusUtils.generateEdge(130));
 
-    /**
-     * show as cluster1(app1), cluster2(app2)
-     * 
-     * @param clusters
-     *            cluster list
-     * @param clusterAppMap
-     *            <cluster,app>
-     * @return
-     */
-    private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
-        if (clusters == null || clusters.size() == 0) {
-            return NONE;
-        } else {
-            // show as: cluster1(app1), cluster2(app2)
-            StringBuilder sb = new StringBuilder();
-            for (String cluster : clusters) {
-                String app = clusterAppMap.get(cluster);
-                sb.append(cluster);
-                if (!NONE.equals(app)) {
-                    sb.append("(").append(app).append(")");
-                }
-                sb.append(" ");
-            }
-            return sb.toString();
-        }
     }
 
     static class Stream {
@@ -302,14 +259,14 @@ public class Status extends S4ArgsBase {
                         + "/app/s4App"));
                 return appConfig.getAppName();
             }
-            return NONE;
+            return StatusUtils.NONE;
         }
     }
 
     static class App {
-        private String name = NONE;
+        private String name = StatusUtils.NONE;
         private String cluster;
-        private String uri = NONE;
+        private String uri = StatusUtils.NONE;
     }
 
     static class Cluster {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
new file mode 100644
index 0000000..ebfda15
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
@@ -0,0 +1,65 @@
+package org.apache.s4.tools;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Strings;
+
+/**
+ * Some utility classes for formatting the output of Status tools
+ * 
+ */
+public class StatusUtils {
+
+    /**
+     * show as cluster1(app1), cluster2(app2)
+     * 
+     * @param clusters
+     *            cluster list
+     * @param clusterAppMap
+     *            <cluster,app>
+     * @return
+     */
+    public static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+        if (clusters == null || clusters.size() == 0) {
+            return StatusUtils.NONE;
+        } else {
+            // show as: cluster1(app1), cluster2(app2)
+            StringBuilder sb = new StringBuilder();
+            for (String cluster : clusters) {
+                String app = clusterAppMap.get(cluster);
+                sb.append(cluster);
+                if (!StatusUtils.NONE.equals(app)) {
+                    sb.append("(").append(app).append(")");
+                }
+                sb.append(" ");
+            }
+            return sb.toString();
+        }
+    }
+
+    public static String title(String content, char highlighter, int width) {
+        return Strings.repeat(String.valueOf(highlighter), ((width - content.length()) / 2)) + content
+                + Strings.repeat(String.valueOf(highlighter), ((width - content.length()) / 2));
+    }
+
+    public static String noInfo(String content) {
+        return StatusUtils.inMiddle("---- " + content + " ----", 130) + "\n\n";
+    }
+
+    public static String inMiddle(String content, int width) {
+        int i = (width - content.length()) / 2;
+        return String.format("%" + i + "s%s", " ", content);
+    }
+
+    public static String generateEdge(int length) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            sb.append("-");
+        }
+        return sb.toString();
+    }
+
+    static String NONE = "--";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 7bbaeb2..b22e164 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -22,18 +22,17 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
 import org.apache.s4.core.S4Node;
 import org.apache.s4.tools.helix.AddNodes;
+import org.apache.s4.tools.helix.ClusterStatus;
 import org.apache.s4.tools.helix.CreateCluster;
 import org.apache.s4.tools.helix.CreateTask;
 import org.apache.s4.tools.helix.DeployApp;
 import org.apache.s4.tools.helix.GenericEventAdapter;
 import org.apache.s4.tools.helix.RebalanceTask;
-import org.apache.s4.tools.helix.ClusterStatus;
 import org.apache.s4.tools.helix.RemoveTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,14 +61,15 @@ public class Tools {
         createTask(null,CreateTask.class), 
         removeTask(null,RemoveTask.class), 
         rebalanceTask(null,RebalanceTask.class);
-      //formatter:on
-        
-        Class<?> zkTarget;
-        private Class<?> helixTarget;
-        
+        //formatter:on
+
+        private final Class<?> zkTarget;
+        private final Class<?> helixTarget;
+
         Task(Class<?> target) {
-            this(target,target); 
+            this(target, target);
         }
+
         Task(Class<?> zkTarget, Class<?> helixTarget) {
             this.zkTarget = zkTarget;
             this.helixTarget = helixTarget;
@@ -77,12 +77,20 @@ public class Tools {
 
         public void dispatch(String[] args) {
             try {
-                String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
                 Class<?> target = zkTarget;
-                if("HELIX".equalsIgnoreCase(clusterManager)){
-                    target= helixTarget;
+                if (Sets.newHashSet(args).contains("-helix")) {
+                    target = helixTarget;
+                    if (target == null) {
+                        logger.error("{} is not a Helix related task", this.name());
+                        System.exit(1);
+                    }
+                } else {
+                    if (target == null && helixTarget != null) {
+                        logger.error("{} is a Helix related task, please specify -helix", this.name());
+                        System.exit(1);
+                    }
                 }
-                
+
                 Method main = target.getMethod("main", String[].class);
                 main.invoke(null, new Object[] { args });
 
@@ -91,9 +99,8 @@ public class Tools {
                 logger.error("Cannot dispatch to task [{}]: wrong arguments [{}]", this.name(), Arrays.toString(args));
             }
         }
-
     }
-   
+
     public static void main(String[] args) {
 
         // configure log4j for Zookeeper

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
index 136a4aa..3a598f7 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
@@ -21,8 +21,6 @@ package org.apache.s4.tools.helix;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +44,11 @@ public class AddNodes {
             if (clusterArgs.nbNodes > 0) {
                 String[] split = clusterArgs.nodes.split(",");
                 for (int i = 0; i < clusterArgs.nbNodes; i++) {
-                    InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
                     String host = "localhost";
                     if (split.length > 0 && split.length == clusterArgs.nbNodes) {
                         host = split[i].trim();
                     }
+                    InstanceConfig instanceConfig = new InstanceConfig("node_" + host + "_" + initialPort);
                     instanceConfig.setHostName(host);
                     instanceConfig.setPort("" + initialPort);
                     instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
@@ -66,7 +64,7 @@ public class AddNodes {
     }
 
     @Parameters(commandNames = "s4 addNodes", separators = "=", commandDescription = "Setup new S4 logical cluster")
-    static class ZKServerArgs extends S4ArgsBase {
+    static class ZKServerArgs extends HelixS4ArgsBase {
 
         @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
         String clusterName = "s4-test-cluster";
@@ -85,6 +83,7 @@ public class AddNodes {
 
         @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
         String nodeGroup = "default";
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
index 47ca98d..73d05c2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
@@ -19,50 +19,34 @@
 package org.apache.s4.tools.helix;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.StatusUtils;
 import org.apache.s4.tools.Tools;
-import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import com.google.common.collect.Maps;
 
 public class ClusterStatus extends S4ArgsBase {
     static Logger logger = LoggerFactory.getLogger(ClusterStatus.class);
 
-    private static String NONE = "--";
-
     public static void main(String[] args) {
 
         StatusArgs statusArgs = new StatusArgs();
@@ -95,6 +79,10 @@ public class ClusterStatus extends S4ArgsBase {
                         statusArgs.apps = apps;
                         statusArgs.streams = tasks;
                     }
+
+                    System.out.println();
+                    System.out.println(StatusUtils.title(" App Status ", '*', 130));
+
                     for (String app : statusArgs.apps) {
                         if (resourcesInCluster.contains(app)) {
                             printAppInfo(manager, cluster, app);
@@ -124,23 +112,33 @@ public class ClusterStatus extends S4ArgsBase {
         String streamName = configAccessor.get(scope, "streamName");
         String taskType = configAccessor.get(scope, "taskType");
 
-        System.out.println("Task Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
-                inMiddle("Description", 90));
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-40s%-90s%n", inMiddle(taskId, 40), inMiddle(cluster, 20),
-                inMiddle(streamName + " " + taskType, 90));
-        System.out.println(generateEdge(130));
+        System.out.println();
+        System.out.println(StatusUtils.title(" Task Status ", '*', 130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Task Id", 20),
+                StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("Description", 90));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 20), StatusUtils.inMiddle(cluster, 20),
+                StatusUtils.inMiddle(streamName + " " + ((taskType == null) ? "(untyped)" : taskType), 90));
+        System.out.println(StatusUtils.generateEdge(130));
         HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
         Builder keyBuilder = helixDataAccessor.keyBuilder();
         IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
         ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+        if (view == null) {
+            System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR TASK [" + taskId + "]"));
+            return;
+        }
         List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
-        System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
-        System.out.println(generateEdge(130));
+        System.out.format("%-30s%-100s%n", StatusUtils.inMiddle("Partition", 30), StatusUtils.inMiddle("State", 100));
+        System.out.println(StatusUtils.generateEdge(130));
         for (String partition : assignment.getPartitionSet()) {
             Map<String, String> stateMap = view.getStateMap(partition);
+            if (stateMap == null) {
+                System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR TASK [" + taskId
+                        + "] / PARTITION [" + partition + "]"));
+                return;
+            }
             StringBuilder sb = new StringBuilder();
             String delim = "";
             for (String instance : stateMap.keySet()) {
@@ -153,9 +151,12 @@ public class ClusterStatus extends S4ArgsBase {
                 }
                 delim = ", ";
             }
-            System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+            System.out.format("%-50s%-10s%n", StatusUtils.inMiddle(partition, 50),
+                    StatusUtils.inMiddle(sb.toString(), 100));
         }
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.println("\n\n");
+
     }
 
     private static void printAppInfo(HelixManager manager, String cluster, String app) {
@@ -164,28 +165,38 @@ public class ClusterStatus extends S4ArgsBase {
         ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
         String uri = configAccessor.get(scope, AppConfig.APP_URI);
 
-        System.out.println("App Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Cluster", 20),
+                StatusUtils.inMiddle("URI", 90));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(app, 20), StatusUtils.inMiddle(cluster, 20),
+                StatusUtils.inMiddle(uri, 90));
+        System.out.println(StatusUtils.generateEdge(130));
         HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
         Builder keyBuilder = helixDataAccessor.keyBuilder();
         IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
         ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+        if (view == null) {
+            System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR APP [" + app + "]"));
+            return;
+        }
         List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
         Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
         Map<String, String> appStateMap = view.getStateMap(app);
-        System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
-        System.out.println(generateEdge(130));
+        if (appStateMap == null) {
+            System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR APP [" + app + "]"));
+            return;
+        }
+        System.out.format("%-50s%-20s%n", StatusUtils.inMiddle("Node id", 50), StatusUtils.inMiddle("DEPLOYED", 20));
+        System.out.println(StatusUtils.generateEdge(130));
         for (String instance : assignmentMap.keySet()) {
             String state = appStateMap.get(instance);
-            System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
-                    inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+            System.out.format("%-50s%-10s%n", StatusUtils.inMiddle(instance, 50), StatusUtils.inMiddle(
+                    (("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
         }
 
-        System.out.println(generateEdge(130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.println("\n\n");
 
     }
 
@@ -197,17 +208,18 @@ public class ClusterStatus extends S4ArgsBase {
         if (liveInstances == null) {
             liveInstances = Collections.emptyList();
         }
-        System.out.println("Cluster Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
-        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
-                inMiddle("Active", 10), generateEdge(80));
-        System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
-                inMiddle("Port", 8), inMiddle("Active", 10));
-        System.out.println(generateEdge(130));
-
-        System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
-                inMiddle("" + liveInstances.size(), 8));
+        System.out.println();
+        System.out.println(StatusUtils.title(" Cluster Status ", '*', 130));
+        System.out.println(StatusUtils.generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", StatusUtils.inMiddle("Nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", StatusUtils.inMiddle("Cluster Name", 20),
+                StatusUtils.inMiddle("Nodes", 20), StatusUtils.inMiddle("Active", 10), StatusUtils.generateEdge(80));
+        System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", StatusUtils.inMiddle("Node id", 10),
+                StatusUtils.inMiddle("Host", 50), StatusUtils.inMiddle("Port", 8), StatusUtils.inMiddle("Active", 10));
+        System.out.println(StatusUtils.generateEdge(130));
+
+        System.out.format("%-20s%-20s%-10s", StatusUtils.inMiddle(cluster, 20),
+                StatusUtils.inMiddle("" + instances.size(), 8), StatusUtils.inMiddle("" + liveInstances.size(), 8));
         boolean first = true;
 
         for (String instance : instances) {
@@ -218,16 +230,16 @@ public class ClusterStatus extends S4ArgsBase {
             } else {
                 System.out.format("%n%-50s", " ");
             }
-            System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
-                    inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
-                    inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
+            System.out.format("%-10s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(), 10),
+                    StatusUtils.inMiddle(config.getHostName(), 50), StatusUtils.inMiddle(config.getPort() + "", 10),
+                    StatusUtils.inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
         }
 
-        System.out.println();
+        System.out.println("\n\n");
     }
 
     @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
-    static class StatusArgs extends S4ArgsBase {
+    static class StatusArgs extends HelixS4ArgsBase {
 
         @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
         List<String> apps;
@@ -245,226 +257,4 @@ public class ClusterStatus extends S4ArgsBase {
         int timeout = 10000;
     }
 
-    private static void showAppsStatus(List<Cluster> clusters) {
-        System.out.println("App Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
-        System.out.println(generateEdge(130));
-        for (Cluster cluster : clusters) {
-            if (!NONE.equals(cluster.app.name)) {
-                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
-                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
-            }
-        }
-        System.out.println(generateEdge(130));
-
-    }
-
-    private static void showClustersStatus(List<Cluster> clusters) {
-        System.out.println("Cluster Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
-        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
-                generateEdge(80));
-        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
-                inMiddle("Host", 50), inMiddle("Port", 8));
-        System.out.println(generateEdge(130));
-
-        for (Cluster cluster : clusters) {
-            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
-                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
-                    inMiddle("" + cluster.nodes.size(), 8));
-            boolean first = true;
-            for (ClusterNode node : cluster.nodes) {
-                if (first) {
-                    first = false;
-                } else {
-                    System.out.format("%n%-60s", " ");
-                }
-                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
-                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
-            }
-            System.out.println();
-        }
-        System.out.println(generateEdge(130));
-    }
-
-    private static void showStreamsStatus(List<Stream> streams) {
-        System.out.println("Stream Status");
-        System.out.println(generateEdge(130));
-        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
-                inMiddle("Consumers", 55));
-        System.out.println(generateEdge(130));
-
-        for (Stream stream : streams) {
-            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
-                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
-                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
-        }
-        System.out.println(generateEdge(130));
-
-    }
-
-    private static String inMiddle(String content, int width) {
-        int i = (width - content.length()) / 2;
-        return String.format("%" + i + "s%s", " ", content);
-    }
-
-    private static String generateEdge(int length) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < length; i++) {
-            sb.append("-");
-        }
-        return sb.toString();
-    }
-
-    /**
-     * show as cluster1(app1), cluster2(app2)
-     * 
-     * @param clusters
-     *            cluster list
-     * @param clusterAppMap
-     *            <cluster,app>
-     * @return
-     */
-    private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
-        if (clusters == null || clusters.size() == 0) {
-            return NONE;
-        } else {
-            // show as: cluster1(app1), cluster2(app2)
-            StringBuilder sb = new StringBuilder();
-            for (String cluster : clusters) {
-                String app = clusterAppMap.get(cluster);
-                sb.append(cluster);
-                if (!NONE.equals(app)) {
-                    sb.append("(").append(app).append(")");
-                }
-                sb.append(" ");
-            }
-            return sb.toString();
-        }
-    }
-
-    static class Stream {
-
-        private final ZkClient zkClient;
-        private final String consumerPath;
-        private final String producerPath;
-
-        String streamName;
-        Set<String> producers = new HashSet<String>();// cluster name
-        Set<String> consumers = new HashSet<String>();// cluster name
-
-        Map<String, String> clusterAppMap = Maps.newHashMap();
-
-        public Stream(String streamName, ZkClient zkClient) throws Exception {
-            this.streamName = streamName;
-            this.zkClient = zkClient;
-            this.consumerPath = "/s4/streams/" + streamName + "/consumers";
-            this.producerPath = "/s4/streams/" + streamName + "/producers";
-            readStreamFromZk();
-        }
-
-        private void readStreamFromZk() throws Exception {
-            List<String> consumerNodes = zkClient.getChildren(consumerPath);
-            for (String node : consumerNodes) {
-                ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
-                consumers.add(consumer.getSimpleField("clusterName"));
-            }
-
-            List<String> producerNodes = zkClient.getChildren(producerPath);
-            for (String node : producerNodes) {
-                ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
-                producers.add(consumer.getSimpleField("clusterName"));
-            }
-
-            getAppNames();
-        }
-
-        private void getAppNames() {
-            Set<String> clusters = new HashSet<String>(consumers);
-            clusters.addAll(producers);
-            for (String cluster : clusters) {
-                clusterAppMap.put(cluster, getApp(cluster, zkClient));
-            }
-        }
-
-        public boolean containsCluster(String cluster) {
-            if (producers.contains(cluster) || consumers.contains(cluster)) {
-                return true;
-            }
-            return false;
-        }
-
-        private static String getApp(String clusterName, ZkClient zkClient) {
-            String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-            if (zkClient.exists(appPath)) {
-                ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
-                return appRecord.getSimpleField("name");
-            }
-            return NONE;
-        }
-    }
-
-    static class App {
-        private String name = NONE;
-        private String cluster;
-        private String uri = NONE;
-    }
-
-    static class Cluster {
-        private final ZkClient zkClient;
-        private final String taskPath;
-        private final String processPath;
-        private final String appPath;
-
-        String clusterName;
-        int taskNumber;
-        App app;
-
-        List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
-        public Cluster(String clusterName, ZkClient zkClient) throws Exception {
-            this.clusterName = clusterName;
-            this.zkClient = zkClient;
-            this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
-            this.processPath = "/s4/clusters/" + clusterName + "/process";
-            this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-            readClusterFromZk();
-        }
-
-        public void readClusterFromZk() throws Exception {
-            List<String> processes;
-            List<String> tasks;
-
-            tasks = zkClient.getChildren(taskPath);
-            processes = zkClient.getChildren(processPath);
-
-            taskNumber = tasks.size();
-
-            for (int i = 0; i < processes.size(); i++) {
-                ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
-                if (process != null) {
-                    int partition = Integer.parseInt(process.getSimpleField("partition"));
-                    String host = process.getSimpleField("host");
-                    int port = Integer.parseInt(process.getSimpleField("port"));
-                    String taskId = process.getSimpleField("taskId");
-                    ClusterNode node = new ClusterNode(partition, port, host, taskId);
-                    nodes.add(node);
-                }
-            }
-
-            app = new App();
-            app.cluster = clusterName;
-            try {
-                ZNRecord appRecord = zkClient.readData(appPath);
-                app.name = appRecord.getSimpleField("name");
-                app.uri = appRecord.getSimpleField("s4r_uri");
-            } catch (ZkNoNodeException e) {
-                logger.warn(appPath + " doesn't exist");
-            }
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
index 89361a3..f71d221 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
@@ -22,8 +22,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +72,7 @@ public class CreateCluster {
     }
 
     @Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
-    static class ZKServerArgs extends S4ArgsBase {
+    static class ZKServerArgs extends HelixS4ArgsBase {
 
         @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
         String clusterName = "s4-test-cluster";

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
index d6074da..428cddc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -1,7 +1,6 @@
 package org.apache.s4.tools.helix;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -10,9 +9,8 @@ import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
@@ -29,8 +27,7 @@ public class CreateTask extends S4ArgsBase {
         CreateTaskArgs taskArgs = new CreateTaskArgs();
 
         Tools.parseArgs(taskArgs, args);
-        String msg = String.format(
-                "Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
+        String msg = String.format("Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
                 taskArgs.taskId, taskArgs.streamName, taskArgs.nodeGroup);
         logger.info(msg);
         HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
@@ -56,12 +53,12 @@ public class CreateTask extends S4ArgsBase {
                 instancesInGroup.add(instanceName);
             }
         }
-        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1,instancesInGroup);
+        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1, instancesInGroup);
         logger.info("Finished setting up task:" + taskArgs.taskId + " on nodes " + instancesInGroup);
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-    static class CreateTaskArgs extends S4ArgsBase {
+    static class CreateTaskArgs extends HelixS4ArgsBase {
 
         @Parameter(names = "-zk", description = "ZooKeeper connection string")
         String zkConnectionString = "localhost:2181";

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 693e13d..4910bb1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -67,8 +67,13 @@ public class DeployApp extends S4ArgsBase {
                 HelixBasedCoreModule.class.getName());
         // TODO merge with custom modules
 
-        AppConfig appConfig = new AppConfig.Builder().appClassName(deployArgs.appClass).appName(deployArgs.appName)
-                .appURI(deployArgs.s4rPath).customModulesNames(helixModules).customModulesURIs(null)
+        AppConfig appConfig = new AppConfig.Builder()
+                .appClassName(deployArgs.appClass)
+                .appName(deployArgs.appName)
+                .appURI(deployArgs.s4rPath)
+                .customModulesNames(
+                        new ImmutableList.Builder<String>().addAll(helixModules).addAll(deployArgs.modulesClassesNames)
+                                .build()).customModulesURIs(deployArgs.modulesURIs)
                 .namedParameters(Deploy.convertListArgsToMap(deployArgs.extraNamedParameters)).build();
         // properties.put("appConfig", appConfig.asMap());
         // properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
@@ -100,7 +105,7 @@ public class DeployApp extends S4ArgsBase {
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-    static class DeployAppArgs extends S4ArgsBase {
+    static class DeployAppArgs extends HelixS4ArgsBase {
 
         @Parameter(names = "-zk", description = "ZooKeeper connection string")
         String zkConnectionString = "localhost:2181";
@@ -126,5 +131,11 @@ public class DeployApp extends S4ArgsBase {
         @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
         List<String> extraNamedParameters = new ArrayList<String>();
 
+        @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+        List<String> modulesURIs = new ArrayList<String>();
+
+        @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+        List<String> modulesClassesNames = new ArrayList<String>();
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index bb86312..6d62a1e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -11,7 +11,6 @@ import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ClusterFromHelix;
-import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 
 import com.beust.jcommander.Parameter;
@@ -30,7 +29,7 @@ public class GenericEventAdapter {
             ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
                     30, 60);
             manager.connect();
-            manager.addExternalViewChangeListener(cluster);	
+            manager.addExternalViewChangeListener(cluster);
 
             HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
             Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -43,9 +42,10 @@ public class GenericEventAdapter {
                 event.setStreamId("names");
                 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                 KryoSerDeser serializer = new KryoSerDeser(classLoader);
-//                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+                // EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
                 System.out.println("Sending event to partition:" + partitionId);
-                Destination destination = cluster.getDestination(adapterArgs.streamName, partitionId, emitter.getType());
+                Destination destination = cluster
+                        .getDestination(adapterArgs.streamName, partitionId, emitter.getType());
                 emitter.send(destination, serializer.serialize(event));
                 Thread.sleep(1000);
             }
@@ -56,7 +56,7 @@ public class GenericEventAdapter {
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-    static class AdapterArgs extends S4ArgsBase {
+    static class AdapterArgs extends HelixS4ArgsBase {
 
         @Parameter(names = "-zk", description = "ZooKeeper connection string")
         String zkConnectionString = "localhost:2181";

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
new file mode 100644
index 0000000..51e3cd3
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
@@ -0,0 +1,16 @@
+package org.apache.s4.tools.helix;
+
+import org.apache.s4.tools.S4ArgsBase;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Base class for args for Helix related S4 tools
+ * 
+ */
+public abstract class HelixS4ArgsBase extends S4ArgsBase {
+
+    @Parameter(names = { "-helix" }, description = "Use Helix - required for Helix-related S4 commands", required = true, hidden = true, arity = 0)
+    boolean helix;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
index e4f4526..c0aff7a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
@@ -1,18 +1,12 @@
 package org.apache.s4.tools.helix;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.helix.ConfigScope;
-import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 
@@ -43,7 +37,7 @@ public class RebalanceTask extends S4ArgsBase {
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-    static class RebalanceTaskArgs extends S4ArgsBase {
+    static class RebalanceTaskArgs extends HelixS4ArgsBase {
 
         @Parameter(names = "-zk", description = "ZooKeeper connection string")
         String zkConnectionString = "localhost:2181";

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
index 044e133..1ca06cc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
@@ -1,19 +1,9 @@
 package org.apache.s4.tools.helix;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.ConfigScope;
-import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
-import org.apache.s4.tools.helix.CreateTask.CreateTaskArgs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -21,25 +11,22 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
 public class RemoveTask extends S4ArgsBase {
-    
+
     static Logger logger = LoggerFactory.getLogger(CreateTask.class);
 
     public static void main(String[] args) {
         RemoveTaskArgs taskArgs = new RemoveTaskArgs();
 
         Tools.parseArgs(taskArgs, args);
-        String msg = String.format(
-                "Removing task [{}] from cluster [{}]",
-                taskArgs.taskId, taskArgs.clusterName);
+        String msg = String.format("Removing task [{}] from cluster [{}]", taskArgs.taskId, taskArgs.clusterName);
         logger.info(msg);
         HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
         admin.dropResource(taskArgs.clusterName, taskArgs.taskId);
-        logger.info("Finished Removing task:" + taskArgs.taskId + " from cluster:"+ taskArgs.clusterName);
+        logger.info("Finished Removing task:" + taskArgs.taskId + " from cluster:" + taskArgs.clusterName);
     }
 
-    
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-    static class RemoveTaskArgs extends S4ArgsBase {
+    static class RemoveTaskArgs extends HelixS4ArgsBase {
 
         @Parameter(names = "-zk", description = "ZooKeeper connection string")
         String zkConnectionString = "localhost:2181";


Mime
View raw message