incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [1/2] [S4-110] Formatting code to make the diff look better
Date Thu, 03 Jan 2013 08:02:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
index b54187f..ce41ecd 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -19,64 +19,53 @@ import org.apache.s4.tools.DeployApp.DeployAppArgs;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class GenericEventAdapter
-{
+public class GenericEventAdapter {
 
-  public static void main(String[] args)
-  {
-    AdapterArgs adapterArgs = new AdapterArgs();
+    public static void main(String[] args) {
+        AdapterArgs adapterArgs = new AdapterArgs();
 
-    Tools.parseArgs(adapterArgs, args);
-    try
-    {
-      String instanceName = "adapter";
-      HelixManager manager = HelixManagerFactory.getZKHelixManager(
-          adapterArgs.clusterName, instanceName, InstanceType.SPECTATOR,
-          adapterArgs.zkConnectionString);
-      ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName,
-          adapterArgs.zkConnectionString, 30, 60);
-      manager.connect();
-      manager.addExternalViewChangeListener(cluster);
+        Tools.parseArgs(adapterArgs, args);
+        try {
+            String instanceName = "adapter";
+            HelixManager manager = HelixManagerFactory.getZKHelixManager(adapterArgs.clusterName,
instanceName,
+                    InstanceType.SPECTATOR, adapterArgs.zkConnectionString);
+            ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
+                    30, 60);
+            manager.connect();
+            manager.addExternalViewChangeListener(cluster);
 
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      IdealState idealstate = helixDataAccessor.getProperty(keyBuilder
-          .idealStates(adapterArgs.streamName));
-      TCPEmitter emitter = new TCPEmitter(cluster, 1000);
-      while (true)
-      {
-        int partitionId = ((int) (Math.random() * 1000))
-            % idealstate.getNumPartitions();
-        Event event = new Event();
-        event.put("name", String.class,
-            "Hello world to partition:" + partitionId);
-        KryoSerDeser serializer = new KryoSerDeser();
-        EventMessage message = new EventMessage("-1", adapterArgs.streamName,
-            serializer.serialize(event));
-        System.out.println("Sending event to partition:"+partitionId);
-        emitter.send(partitionId, message);
-        Thread.sleep(1000);
-      }
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    }
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(adapterArgs.streamName));
+            TCPEmitter emitter = new TCPEmitter(cluster, 1000);
+            while (true) {
+                int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
+                Event event = new Event();
+                event.put("name", String.class, "Hello world to partition:" + partitionId);
+                KryoSerDeser serializer = new KryoSerDeser();
+                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+                System.out.println("Sending event to partition:" + partitionId);
+                emitter.send(partitionId, message);
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-  }
+    }
 
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription =
"Create a new stream processor")
-  static class AdapterArgs extends S4ArgsBase
-  {
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription
= "Create a new stream processor")
+    static class AdapterArgs extends S4ArgsBase {
 
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
 
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster",
required = true)
-    String clusterName;
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster",
required = true)
+        String clusterName;
 
-    @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event
will be sent to", required = true)
-    String streamName;
+        @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the
event will be sent to", required = true)
+        String streamName;
 
-  }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
index 1b8214f..13b88ca 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
@@ -17,46 +17,43 @@ import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class RebalanceTask extends S4ArgsBase
-{
-  public static void main(String[] args)
-  {
-    RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
-
-    Tools.parseArgs(taskArgs, args);
-
-    HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
-    // This does the assignment of partition to nodes. It uses a modified
-    // version of consistent hashing to distribute active partitions and standbys
-    // equally among nodes.
-    IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
-    List<String> instancesInGroup = new ArrayList<String>();
-    List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
-    for (String instanceName : instancesInCluster) {
-        InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
-        String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-        if (nodeGroup.equals(taskArgs.nodeGroup)) {
-            instancesInGroup.add(instanceName);
+public class RebalanceTask extends S4ArgsBase {
+    public static void main(String[] args) {
+        RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
+
+        Tools.parseArgs(taskArgs, args);
+
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        // This does the assignment of partition to nodes. It uses a modified
+        // version of consistent hashing to distribute active partitions and standbys
+        // equally among nodes.
+        IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName,
taskArgs.taskId);
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName,
instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
         }
+        admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
     }
-    admin.rebalance(taskArgs.clusterName,currentAssignment, instancesInGroup);
-  }
 
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription =
"Create a new stream processor")
-  static class RebalanceTaskArgs extends S4ArgsBase
-  {
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription
= "Create a new stream processor")
+    static class RebalanceTaskArgs extends S4ArgsBase {
 
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
 
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster",
required = true)
-    String clusterName;
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster",
required = true)
+        String clusterName;
 
-    @Parameter(names={"-id","-taskId"},description = "id of the task that produces/consumes
a stream", required = true, arity = 1)
-    String taskId;
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes
a stream", required = true, arity = 1)
+        String taskId;
 
-    @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the
task needs to be run", required = true, arity = 1)
-    String nodeGroup = "default";
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where
the task needs to be run", required = true, arity = 1)
+        String nodeGroup = "default";
 
-  }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
index bd4224e..2be58c1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
@@ -54,521 +54,413 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.common.collect.Maps;
 
-public class S4Status extends S4ArgsBase
-{
-  static Logger logger = LoggerFactory.getLogger(S4Status.class);
-
-  private static String NONE = "--";
-
-  public static void main(String[] args)
-  {
-
-    StatusArgs statusArgs = new StatusArgs();
-    Tools.parseArgs(statusArgs, args);
-
-    try
-    {
-      if (statusArgs.clusters.size() > 0)
-      {
-        for (String cluster : statusArgs.clusters)
-        {
-          HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster,
-              "ADMIN", InstanceType.ADMINISTRATOR,
-              statusArgs.zkConnectionString);
-          manager.connect();
-          ConfigAccessor configAccessor = manager.getConfigAccessor();
-          ConfigScopeBuilder builder = new ConfigScopeBuilder();
-          printClusterInfo(manager, cluster);
-          List<String> resourcesInCluster = manager.getClusterManagmentTool()
-              .getResourcesInCluster(cluster);
-
-          List<String> apps = new ArrayList<String>();
-          List<String> tasks = new ArrayList<String>();
-
-          for (String resource : resourcesInCluster)
-          {
-            ConfigScope scope = builder.forCluster(cluster)
-                .forResource(resource).build();
-            String resourceType = configAccessor.get(scope, "type");
-            if ("App".equals(resourceType))
-            {
-              apps.add(resource);
-            } else if ("Task".equals(resourceType))
-            {
-              tasks.add(resource);
+public class S4Status extends S4ArgsBase {
+    static Logger logger = LoggerFactory.getLogger(S4Status.class);
+
+    private static String NONE = "--";
+
+    public static void main(String[] args) {
+
+        StatusArgs statusArgs = new StatusArgs();
+        Tools.parseArgs(statusArgs, args);
+
+        try {
+            if (statusArgs.clusters.size() > 0) {
+                for (String cluster : statusArgs.clusters) {
+                    HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster,
"ADMIN",
+                            InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
+                    manager.connect();
+                    ConfigAccessor configAccessor = manager.getConfigAccessor();
+                    ConfigScopeBuilder builder = new ConfigScopeBuilder();
+                    printClusterInfo(manager, cluster);
+                    List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
+
+                    List<String> apps = new ArrayList<String>();
+                    List<String> tasks = new ArrayList<String>();
+
+                    for (String resource : resourcesInCluster) {
+                        ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
+                        String resourceType = configAccessor.get(scope, "type");
+                        if ("App".equals(resourceType)) {
+                            apps.add(resource);
+                        } else if ("Task".equals(resourceType)) {
+                            tasks.add(resource);
+                        }
+                    }
+                    if (statusArgs.apps == null && statusArgs.streams == null) {
+                        statusArgs.apps = apps;
+                        statusArgs.streams = tasks;
+                    }
+                    for (String app : statusArgs.apps) {
+                        if (resourcesInCluster.contains(app)) {
+                            printAppInfo(manager, cluster, app);
+                        }
+                    }
+
+                    if (statusArgs.streams != null && statusArgs.streams.size() >
0) {
+                        for (String stream : statusArgs.streams) {
+                            if (resourcesInCluster.contains(stream)) {
+                                printStreamInfo(manager, cluster, stream);
+                            }
+                        }
+                    }
+                    manager.disconnect();
+                }
             }
-          }
-          if (statusArgs.apps == null && statusArgs.streams == null)
-          {
-            statusArgs.apps = apps;
-            statusArgs.streams = tasks;
-          }
-          for (String app : statusArgs.apps)
-          {
-            if (resourcesInCluster.contains(app))
-            {
-              printAppInfo(manager, cluster, app);
-            }
-          }
-
-          if (statusArgs.streams != null && statusArgs.streams.size() > 0)
-          {
-            for (String stream : statusArgs.streams)
-            {
-              if (resourcesInCluster.contains(stream))
-              {
-                printStreamInfo(manager, cluster, stream);
-              }
+        } catch (Exception e) {
+            logger.error("Cannot get the status of S4", e);
+        }
+
+    }
+
+    private static void printStreamInfo(HelixManager manager, String cluster, String taskId)
{
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
+        String streamName = configAccessor.get(scope, "streamName");
+        String taskType = configAccessor.get(scope, "taskType");
+
+        System.out.println("Task Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster",
20),
+                inMiddle("Description", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
+                inMiddle(streamName + " " + taskType, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
+        System.out.println(generateEdge(130));
+        for (String partition : assignment.getPartitionSet()) {
+            Map<String, String> stateMap = view.getStateMap(partition);
+            StringBuilder sb = new StringBuilder();
+            String delim = "";
+            for (String instance : stateMap.keySet()) {
+                sb.append(delim);
+                String state = stateMap.get(instance);
+                if (liveInstances.contains(instance)) {
+                    sb.append(instance).append(":").append(state);
+                } else {
+                    sb.append(instance).append(":").append("OFFLINE");
+                }
+                delim = ", ";
             }
-          }
-          manager.disconnect();
+            System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(),
100));
         }
-      }
-    } catch (Exception e)
-    {
-      logger.error("Cannot get the status of S4", e);
+        System.out.println(generateEdge(130));
     }
 
-  }
-
-  private static void printStreamInfo(HelixManager manager, String cluster,
-      String taskId)
-  {
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
-    String streamName = configAccessor.get(scope, "streamName");
-    String taskType = configAccessor.get(scope, "taskType");
-
-    System.out.println("Task Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20),
-        inMiddle("Cluster", 20), inMiddle("Description", 90));
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20),
-        inMiddle(cluster, 20), inMiddle(streamName + " " + taskType, 90));
-    System.out.println(generateEdge(130));
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    IdealState assignment = helixDataAccessor.getProperty(keyBuilder
-        .idealStates(taskId));
-    ExternalView view = helixDataAccessor.getProperty(keyBuilder
-        .externalView(taskId));
-    List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    System.out.format("%-50s%-100s%n", inMiddle("Partition", 50),
-        inMiddle("State", 20));
-    System.out.println(generateEdge(130));
-    for (String partition : assignment.getPartitionSet())
-    {
-      Map<String, String> stateMap = view.getStateMap(partition);
-      StringBuilder sb = new StringBuilder();
-      String delim="";
-      for (String instance : stateMap.keySet())
-      {
-        sb.append(delim);
-        String state = stateMap.get(instance);
-        if (liveInstances.contains(instance))
-        {
-          sb.append(instance).append(":").append(state);
-        } else
-        {
-          sb.append(instance).append(":").append("OFFLINE");
+    private static void printAppInfo(HelixManager manager, String cluster, String app) {
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
+        String uri = configAccessor.get(scope, "s4r_uri");
+
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster",
20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20),
inMiddle(uri, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
+        Map<String, String> appStateMap = view.getStateMap(app);
+        System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
+        System.out.println(generateEdge(130));
+        for (String instance : assignmentMap.keySet()) {
+            String state = appStateMap.get(instance);
+            System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
+                    inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance))
? "Y" : "N"), 20));
         }
-       delim=", ";
-      }
-      System.out.format("%-50s%-10s%n", inMiddle(partition, 50),
-          inMiddle(sb.toString(), 100));
+
+        System.out.println(generateEdge(130));
+
     }
-    System.out.println(generateEdge(130));
-  }
-
-  private static void printAppInfo(HelixManager manager, String cluster,
-      String app)
-  {
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
-    String uri = configAccessor.get(scope, "s4r_uri");
-
-    System.out.println("App Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
-        inMiddle("Cluster", 20), inMiddle("URI", 90));
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20),
-        inMiddle(cluster, 20), inMiddle(uri, 90));
-    System.out.println(generateEdge(130));
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    IdealState assignment = helixDataAccessor.getProperty(keyBuilder
-        .idealStates(app));
-    ExternalView view = helixDataAccessor.getProperty(keyBuilder
-        .externalView(app));
-    List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
-    Map<String, String> appStateMap = view.getStateMap(app);
-    System.out.format("%-50s%-20s%n", inMiddle("Node id", 50),
-        inMiddle("DEPLOYED", 20));
-    System.out.println(generateEdge(130));
-    for (String instance : assignmentMap.keySet())
-    {
-      String state = appStateMap.get(instance);
-      System.out
-          .format(
-              "%-50s%-10s%n",
-              inMiddle(instance, 50),
-              inMiddle((("ONLINE".equals(state) && liveInstances
-                  .contains(instance)) ? "Y" : "N"), 20));
+
+    private static void printClusterInfo(HelixManager manager, String cluster) {
+        HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = dataAccessor.keyBuilder();
+        List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+        List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+        if (liveInstances == null) {
+            liveInstances = Collections.emptyList();
+        }
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes",
20),
+                inMiddle("Active", 10), generateEdge(80));
+        System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host",
50),
+                inMiddle("Port", 8), inMiddle("Active", 10));
+        System.out.println(generateEdge(130));
+
+        System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(),
8),
+                inMiddle("" + liveInstances.size(), 8));
+        boolean first = true;
+
+        for (String instance : instances) {
+            InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
+            // System.out.println(config);
+            if (first) {
+                first = false;
+            } else {
+                System.out.format("%n%-50s", " ");
+            }
+            System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
+                    inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
+                    inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N",
10));
+        }
+
+        System.out.println();
     }
 
-    System.out.println(generateEdge(130));
-
-  }
-
-  private static void printClusterInfo(HelixManager manager, String cluster)
-  {
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = dataAccessor.keyBuilder();
-    List<String> instances = dataAccessor.getChildNames(keyBuilder
-        .instanceConfigs());
-    List<String> liveInstances = dataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    if (liveInstances == null)
-    {
-      liveInstances = Collections.emptyList();
+    @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators
= "=")
+    static class StatusArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)",
required = false)
+        List<String> apps;
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified
S4 cluster(s)", required = true)
+        List<String> clusters;
+
+        @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified
published stream(s)", required = false)
+        List<String> streams;
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in
ms")
+        int timeout = 10000;
     }
-    System.out.println("Cluster Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
-    System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20),
-        inMiddle("Nodes", 20), inMiddle("Active", 10), generateEdge(80));
-    System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ",
-        inMiddle("Node id", 10), inMiddle("Host", 50), inMiddle("Port", 8),
-        inMiddle("Active", 10));
-    System.out.println(generateEdge(130));
-
-    System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20),
-        inMiddle("" + instances.size(), 8),
-        inMiddle("" + liveInstances.size(), 8));
-    boolean first = true;
-
-    for (String instance : instances)
-    {
-      InstanceConfig config = dataAccessor.getProperty(keyBuilder
-          .instanceConfig(instance));
-      // System.out.println(config);
-      if (first)
-      {
-        first = false;
-      } else
-      {
-        System.out.format("%n%-50s", " ");
-      }
-      System.out
-          .format(
-              "%-10s%-46s%-10s%-10s",
-              inMiddle("" + config.getId(), 10),
-              inMiddle(config.getHostName(), 50),
-              inMiddle(config.getPort() + "", 10),
-              inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y"
-                  : "N", 10));
+
+    private static void showAppsStatus(List<Cluster> clusters) {
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster",
20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        for (Cluster cluster : clusters) {
+            if (!NONE.equals(cluster.app.name)) {
+                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+            }
+        }
+        System.out.println(generateEdge(130));
+
     }
 
-    System.out.println();
-  }
-
-  @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators
= "=")
-  static class StatusArgs extends S4ArgsBase
-  {
-
-    @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)",
required = false)
-    List<String> apps;
-
-    @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified
S4 cluster(s)", required = true)
-    List<String> clusters;
-
-    @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified
published stream(s)", required = false)
-    List<String> streams;
-
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
-
-    @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
-    int timeout = 10000;
-  }
-
-  private static void showAppsStatus(List<Cluster> clusters)
-  {
-    System.out.println("App Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
-        inMiddle("Cluster", 20), inMiddle("URI", 90));
-    System.out.println(generateEdge(130));
-    for (Cluster cluster : clusters)
-    {
-      if (!NONE.equals(cluster.app.name))
-      {
-        System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
-            inMiddle(cluster.app.cluster, 20), cluster.app.uri);
-      }
+    private static void showClustersStatus(List<Cluster> clusters) {
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20),
inMiddle("Tasks", 10),
+                generateEdge(80));
+        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task
id", 10),
+                inMiddle("Host", 50), inMiddle("Port", 8));
+        System.out.println(generateEdge(130));
+
+        for (Cluster cluster : clusters) {
+            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+                    inMiddle("" + cluster.nodes.size(), 8));
+            boolean first = true;
+            for (ClusterNode node : cluster.nodes) {
+                if (first) {
+                    first = false;
+                } else {
+                    System.out.format("%n%-60s", " ");
+                }
+                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "",
10));
+            }
+            System.out.println();
+        }
+        System.out.println(generateEdge(130));
     }
-    System.out.println(generateEdge(130));
-
-  }
-
-  private static void showClustersStatus(List<Cluster> clusters)
-  {
-    System.out.println("Cluster Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
-    System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20),
-        inMiddle("App", 20), inMiddle("Tasks", 10), generateEdge(80));
-    System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ",
-        inMiddle("Number", 8), inMiddle("Task id", 10), inMiddle("Host", 50),
-        inMiddle("Port", 8));
-    System.out.println(generateEdge(130));
-
-    for (Cluster cluster : clusters)
-    {
-      System.out.format("%-20s%-20s%-10s%-10s",
-          inMiddle(cluster.clusterName, 20), inMiddle(cluster.app.name, 20),
-          inMiddle("" + cluster.taskNumber, 8),
-          inMiddle("" + cluster.nodes.size(), 8));
-      boolean first = true;
-      for (ClusterNode node : cluster.nodes)
-      {
-        if (first)
-        {
-          first = false;
-        } else
-        {
-          System.out.format("%n%-60s", " ");
+
+    private static void showStreamsStatus(List<Stream> streams) {
+        System.out.println("Stream Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers",
55),
+                inMiddle("Consumers", 55));
+        System.out.println(generateEdge(130));
+
+        for (Stream stream : streams) {
+            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
         }
-        System.out.format("%-10s%-50s%-10s",
-            inMiddle("" + node.getTaskId(), 10),
-            inMiddle(node.getMachineName(), 50),
-            inMiddle(node.getPort() + "", 10));
-      }
-      System.out.println();
+        System.out.println(generateEdge(130));
+
     }
-    System.out.println(generateEdge(130));
-  }
-
-  private static void showStreamsStatus(List<Stream> streams)
-  {
-    System.out.println("Stream Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20),
-        inMiddle("Producers", 55), inMiddle("Consumers", 55));
-    System.out.println(generateEdge(130));
-
-    for (Stream stream : streams)
-    {
-      System.out
-          .format(
-              "%-20s%-55s%-55s%n",
-              inMiddle(stream.streamName, 20),
-              inMiddle(getFormatString(stream.producers, stream.clusterAppMap),
-                  55),
-              inMiddle(getFormatString(stream.consumers, stream.clusterAppMap),
-                  55));
+
+    private static String inMiddle(String content, int width) {
+        int i = (width - content.length()) / 2;
+        return String.format("%" + i + "s%s", " ", content);
     }
-    System.out.println(generateEdge(130));
-
-  }
-
-  private static String inMiddle(String content, int width)
-  {
-    int i = (width - content.length()) / 2;
-    return String.format("%" + i + "s%s", " ", content);
-  }
-
-  private static String generateEdge(int length)
-  {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < length; i++)
-    {
-      sb.append("-");
+
+    private static String generateEdge(int length) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            sb.append("-");
+        }
+        return sb.toString();
     }
-    return sb.toString();
-  }
-
-  /**
-   * show as cluster1(app1), cluster2(app2)
-   * 
-   * @param clusters
-   *          cluster list
-   * @param clusterAppMap
-   *          <cluster,app>
-   * @return
-   */
-  private static String getFormatString(Collection<String> clusters,
-      Map<String, String> clusterAppMap)
-  {
-    if (clusters == null || clusters.size() == 0)
-    {
-      return NONE;
-    } else
-    {
-      // show as: cluster1(app1), cluster2(app2)
-      StringBuilder sb = new StringBuilder();
-      for (String cluster : clusters)
-      {
-        String app = clusterAppMap.get(cluster);
-        sb.append(cluster);
-        if (!NONE.equals(app))
-        {
-          sb.append("(").append(app).append(")");
+
+    /**
+     * show as cluster1(app1), cluster2(app2)
+     * 
+     * @param clusters
+     *            cluster list
+     * @param clusterAppMap
+     *            <cluster,app>
+     * @return
+     */
+    private static String getFormatString(Collection<String> clusters, Map<String,
String> clusterAppMap) {
+        if (clusters == null || clusters.size() == 0) {
+            return NONE;
+        } else {
+            // show as: cluster1(app1), cluster2(app2)
+            StringBuilder sb = new StringBuilder();
+            for (String cluster : clusters) {
+                String app = clusterAppMap.get(cluster);
+                sb.append(cluster);
+                if (!NONE.equals(app)) {
+                    sb.append("(").append(app).append(")");
+                }
+                sb.append(" ");
+            }
+            return sb.toString();
         }
-        sb.append(" ");
-      }
-      return sb.toString();
     }
-  }
 
-  static class Stream
-  {
+    static class Stream {
 
-    private final ZkClient zkClient;
-    private final String consumerPath;
-    private final String producerPath;
+        private final ZkClient zkClient;
+        private final String consumerPath;
+        private final String producerPath;
 
-    String streamName;
-    Set<String> producers = new HashSet<String>();// cluster name
-    Set<String> consumers = new HashSet<String>();// cluster name
+        String streamName;
+        Set<String> producers = new HashSet<String>();// cluster name
+        Set<String> consumers = new HashSet<String>();// cluster name
 
-    Map<String, String> clusterAppMap = Maps.newHashMap();
+        Map<String, String> clusterAppMap = Maps.newHashMap();
 
-    public Stream(String streamName, ZkClient zkClient) throws Exception
-    {
-      this.streamName = streamName;
-      this.zkClient = zkClient;
-      this.consumerPath = "/s4/streams/" + streamName + "/consumers";
-      this.producerPath = "/s4/streams/" + streamName + "/producers";
-      readStreamFromZk();
-    }
+        public Stream(String streamName, ZkClient zkClient) throws Exception {
+            this.streamName = streamName;
+            this.zkClient = zkClient;
+            this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+            this.producerPath = "/s4/streams/" + streamName + "/producers";
+            readStreamFromZk();
+        }
 
-    private void readStreamFromZk() throws Exception
-    {
-      List<String> consumerNodes = zkClient.getChildren(consumerPath);
-      for (String node : consumerNodes)
-      {
-        ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
-        consumers.add(consumer.getSimpleField("clusterName"));
-      }
-
-      List<String> producerNodes = zkClient.getChildren(producerPath);
-      for (String node : producerNodes)
-      {
-        ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
-        producers.add(consumer.getSimpleField("clusterName"));
-      }
-
-      getAppNames();
-    }
+        private void readStreamFromZk() throws Exception {
+            List<String> consumerNodes = zkClient.getChildren(consumerPath);
+            for (String node : consumerNodes) {
+                ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+                consumers.add(consumer.getSimpleField("clusterName"));
+            }
 
-    private void getAppNames()
-    {
-      Set<String> clusters = new HashSet<String>(consumers);
-      clusters.addAll(producers);
-      for (String cluster : clusters)
-      {
-        clusterAppMap.put(cluster, getApp(cluster, zkClient));
-      }
-    }
+            List<String> producerNodes = zkClient.getChildren(producerPath);
+            for (String node : producerNodes) {
+                ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+                producers.add(consumer.getSimpleField("clusterName"));
+            }
 
-    public boolean containsCluster(String cluster)
-    {
-      if (producers.contains(cluster) || consumers.contains(cluster))
-      {
-        return true;
-      }
-      return false;
-    }
+            getAppNames();
+        }
+
+        private void getAppNames() {
+            Set<String> clusters = new HashSet<String>(consumers);
+            clusters.addAll(producers);
+            for (String cluster : clusters) {
+                clusterAppMap.put(cluster, getApp(cluster, zkClient));
+            }
+        }
 
-    private static String getApp(String clusterName, ZkClient zkClient)
-    {
-      String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-      if (zkClient.exists(appPath))
-      {
-        ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName
-            + "/app/s4App");
-        return appRecord.getSimpleField("name");
-      }
-      return NONE;
+        public boolean containsCluster(String cluster) {
+            if (producers.contains(cluster) || consumers.contains(cluster)) {
+                return true;
+            }
+            return false;
+        }
+
+        private static String getApp(String clusterName, ZkClient zkClient) {
+            String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            if (zkClient.exists(appPath)) {
+                ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+                return appRecord.getSimpleField("name");
+            }
+            return NONE;
+        }
     }
-  }
-
-  static class App
-  {
-    private String name = NONE;
-    private String cluster;
-    private String uri = NONE;
-  }
-
-  static class Cluster
-  {
-    private final ZkClient zkClient;
-    private final String taskPath;
-    private final String processPath;
-    private final String appPath;
-
-    String clusterName;
-    int taskNumber;
-    App app;
-
-    List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
-    public Cluster(String clusterName, ZkClient zkClient) throws Exception
-    {
-      this.clusterName = clusterName;
-      this.zkClient = zkClient;
-      this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
-      this.processPath = "/s4/clusters/" + clusterName + "/process";
-      this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-      readClusterFromZk();
+
+    static class App {
+        private String name = NONE;
+        private String cluster;
+        private String uri = NONE;
     }
 
-    public void readClusterFromZk() throws Exception
-    {
-      List<String> processes;
-      List<String> tasks;
-
-      tasks = zkClient.getChildren(taskPath);
-      processes = zkClient.getChildren(processPath);
-
-      taskNumber = tasks.size();
-
-      for (int i = 0; i < processes.size(); i++)
-      {
-        ZNRecord process = zkClient.readData(
-            processPath + "/" + processes.get(i), true);
-        if (process != null)
-        {
-          int partition = Integer.parseInt(process.getSimpleField("partition"));
-          String host = process.getSimpleField("host");
-          int port = Integer.parseInt(process.getSimpleField("port"));
-          String taskId = process.getSimpleField("taskId");
-          ClusterNode node = new ClusterNode(partition, port, host, taskId);
-          nodes.add(node);
+    static class Cluster {
+        private final ZkClient zkClient;
+        private final String taskPath;
+        private final String processPath;
+        private final String appPath;
+
+        String clusterName;
+        int taskNumber;
+        App app;
+
+        List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+        public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+            this.clusterName = clusterName;
+            this.zkClient = zkClient;
+            this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+            this.processPath = "/s4/clusters/" + clusterName + "/process";
+            this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            readClusterFromZk();
         }
-      }
-
-      app = new App();
-      app.cluster = clusterName;
-      try
-      {
-        ZNRecord appRecord = zkClient.readData(appPath);
-        app.name = appRecord.getSimpleField("name");
-        app.uri = appRecord.getSimpleField("s4r_uri");
-      } catch (ZkNoNodeException e)
-      {
-        logger.warn(appPath + " doesn't exist");
-      }
-    }
 
-  }
+        public void readClusterFromZk() throws Exception {
+            List<String> processes;
+            List<String> tasks;
+
+            tasks = zkClient.getChildren(taskPath);
+            processes = zkClient.getChildren(processPath);
+
+            taskNumber = tasks.size();
+
+            for (int i = 0; i < processes.size(); i++) {
+                ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i),
true);
+                if (process != null) {
+                    int partition = Integer.parseInt(process.getSimpleField("partition"));
+                    String host = process.getSimpleField("host");
+                    int port = Integer.parseInt(process.getSimpleField("port"));
+                    String taskId = process.getSimpleField("taskId");
+                    ClusterNode node = new ClusterNode(partition, port, host, taskId);
+                    nodes.add(node);
+                }
+            }
+
+            app = new App();
+            app.cluster = clusterName;
+            try {
+                ZNRecord appRecord = zkClient.readData(appPath);
+                app.name = appRecord.getSimpleField("name");
+                app.uri = appRecord.getSimpleField("s4r_uri");
+            } catch (ZkNoNodeException e) {
+                logger.warn(appPath + " doesn't exist");
+            }
+        }
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 824aa03..ab0ba3e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -219,7 +219,7 @@ public class Status extends S4ArgsBase {
 
     /**
      * show as cluster1(app1), cluster2(app2)
-     *
+     * 
      * @param clusters
      *            cluster list
      * @param clusterAppMap

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9661f270/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 192869a..3940c6d 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,20 +38,11 @@ public class Tools {
 
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
-    enum Task { 
-        deployApp(DeployApp.class),
-        deploy(Deploy.class), 
-        node(Main.class), 
-        addNodes(AddNodes.class), 
-        zkServer(ZKServer.class), 
-        newCluster(CreateCluster.class), 
-        adapter(null), 
-        genericAdapter(GenericEventAdapter.class),
-        newApp(CreateApp.class), 
-        s4r(Package.class), 
-        status(S4Status.class),
-        createTask(CreateTask.class), 
-        rebalanceTask(RebalanceTask.class);
+    enum Task {
+        deployApp(DeployApp.class), deploy(Deploy.class), node(Main.class), addNodes(AddNodes.class),
zkServer(
+                ZKServer.class), newCluster(CreateCluster.class), adapter(null), genericAdapter(
+                GenericEventAdapter.class), newApp(CreateApp.class), s4r(Package.class),
status(S4Status.class), createTask(
+                CreateTask.class), rebalanceTask(RebalanceTask.class);
 
         Class<?> target;
 


Mime
View raw message