incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dfe...@apache.org
Subject [1/3] git commit: Remove NodeGroups, reuse existing s4 cluster ids
Date Fri, 22 Feb 2013 17:56:14 GMT
Remove NodeGroups, reuse existing s4 cluster ids


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/36acbce8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/36acbce8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/36acbce8

Branch: refs/heads/S4-110-new
Commit: 36acbce8c7b55d2b33791bcb3219f724ca5bbbf5
Parents: 0b93cae
Author: Daniel Gómez Ferro <danielgf@yahoo-inc.com>
Authored: Fri Feb 22 12:31:56 2013 +0100
Committer: Daniel Gómez Ferro <danielgf@yahoo-inc.com>
Committed: Fri Feb 22 12:31:56 2013 +0100

----------------------------------------------------------------------
 .../org/apache/s4/comm/helix/S4HelixConstants.java |    6 +++
 .../apache/s4/comm/topology/ClusterFromHelix.java  |    4 +-
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |    7 ++-
 .../java/org/apache/s4/tools/helix/AddNodes.java   |   16 ++++----
 .../org/apache/s4/tools/helix/ClusterStatus.java   |    4 +-
 .../org/apache/s4/tools/helix/CreateCluster.java   |   31 ++++++++-------
 .../java/org/apache/s4/tools/helix/CreateTask.java |   20 ++++-----
 .../java/org/apache/s4/tools/helix/DeployApp.java  |   16 +++----
 .../org/apache/s4/tools/helix/RebalanceTask.java   |   14 +++----
 9 files changed, 62 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
new file mode 100644
index 0000000..9bf3fc6
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
@@ -0,0 +1,6 @@
+package org.apache.s4.comm.helix;
+
+public class S4HelixConstants {
+    public static final String HELIX_CLUSTER_NAME = "S4";
+    public static final String GLOBAL_TASK_NAME = "GLOBAL";
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index d14bded..8a77606 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
@@ -39,6 +40,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.s4.base.Destination;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.comm.tcp.TCPDestination;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,7 +149,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements
Cluster {
             if (externalViewList != null) {
                 for (ExternalView extView : externalViewList) {
                     String resource = extView.getId();
-                    ConfigScope resourceScope = builder.forCluster(clusterName)
+                    ConfigScope resourceScope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME)
                             .forResource(resource).build();
                     String resourceType = configAccessor.get(resourceScope,
                             "type");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3944971..3d58dbc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -13,6 +13,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.util.ArchiveFetchException;
@@ -91,7 +92,7 @@ public class S4HelixBootstrap implements Bootstrap {
         // start a HelixController to manage the cluster
         // TODO set this as optional (small clusters only)
         String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
-        HelixControllerMain.startHelixController(zookeeperAddress, clusterName, controllerName,
+        HelixControllerMain.startHelixController(zookeeperAddress, S4HelixConstants.HELIX_CLUSTER_NAME,
controllerName,
                 HelixControllerMain.STANDALONE);
         // this.parentInjector = parentInjector;
         S4HelixBootstrap.rootInjector = parentInjector;
@@ -103,8 +104,8 @@ public class S4HelixBootstrap implements Bootstrap {
     private void registerWithHelix() {
         HelixManager helixManager;
         try {
-            helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
InstanceType.PARTICIPANT,
-                    zookeeperAddress);
+            helixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME,
instanceName,
+                    InstanceType.PARTICIPANT, zookeeperAddress);
             helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
taskStateModelFactory);
             helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
appStateModelFactory);
             helixManager.connect();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
index 3a598f7..2e155ed 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
@@ -21,6 +21,7 @@ package org.apache.s4.tools.helix;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,13 +47,16 @@ public class AddNodes {
                 for (int i = 0; i < clusterArgs.nbNodes; i++) {
                     String host = "localhost";
                     if (split.length > 0 && split.length == clusterArgs.nbNodes)
{
-                        host = split[i].trim();
+                        String node = split[i].trim();
+                        if (!node.isEmpty()) {
+                            host = node;
+                        }
                     }
-                    InstanceConfig instanceConfig = new InstanceConfig("node_" + host + "_"
+ initialPort);
+                    InstanceConfig instanceConfig = new InstanceConfig(host + "_" + initialPort);
                     instanceConfig.setHostName(host);
                     instanceConfig.setPort("" + initialPort);
-                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
-                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.clusterName);
+                    helixAdmin.addInstance(S4HelixConstants.HELIX_CLUSTER_NAME, instanceConfig);
                     initialPort = initialPort + 1;
                 }
             }
@@ -80,10 +84,6 @@ public class AddNodes {
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening
port for nodes in this cluster. First node listens on the specified port, other nodes listen
on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-
-        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one
or more groups. This will be useful when you create task", required = false)
-        String nodeGroup = "default";
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
index 73d05c2..8dce98e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
@@ -118,7 +118,7 @@ public class ClusterStatus extends S4ArgsBase {
         System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Task Id", 20),
                 StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("Description",
90));
         System.out.println(StatusUtils.generateEdge(130));
-        System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 20), StatusUtils.inMiddle(cluster,
20),
+        System.out.format("%-30s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 30), StatusUtils.inMiddle(cluster,
20),
                 StatusUtils.inMiddle(streamName + " " + ((taskType == null) ? "(untyped)"
: taskType), 90));
         System.out.println(StatusUtils.generateEdge(130));
         HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
@@ -230,7 +230,7 @@ public class ClusterStatus extends S4ArgsBase {
             } else {
                 System.out.format("%n%-50s", " ");
             }
-            System.out.format("%-10s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(),
10),
+            System.out.format("%-30s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(),
30),
                     StatusUtils.inMiddle(config.getHostName(), 50), StatusUtils.inMiddle(config.getPort()
+ "", 10),
                     StatusUtils.inMiddle(liveInstances.contains(config.getInstanceName())
? "Y" : "N", 10));
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
index f71d221..d616196 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
@@ -18,10 +18,12 @@
 
 package org.apache.s4.tools.helix;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,14 +42,19 @@ public class CreateCluster {
         try {
             logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
clusterArgs.nbNodes);
             ZKHelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
-            helixAdmin.addCluster(clusterArgs.clusterName, false);
-            StateModelDefinition onlineofflinemodel = new StateModelDefinition(
-                    new StateModelConfigGenerator().generateConfigForOnlineOffline());
-            StateModelDefinition leaderstandbymodel = new StateModelDefinition(
-                    new StateModelConfigGenerator().generateConfigForLeaderStandby());
-
-            helixAdmin.addStateModelDef(clusterArgs.clusterName, "OnlineOffline", onlineofflinemodel);
-            helixAdmin.addStateModelDef(clusterArgs.clusterName, "LeaderStandby", leaderstandbymodel);
+            try {
+                helixAdmin.addCluster(S4HelixConstants.HELIX_CLUSTER_NAME, false);
+                StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+                        new StateModelConfigGenerator().generateConfigForOnlineOffline());
+                StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+                        new StateModelConfigGenerator().generateConfigForLeaderStandby());
+
+                helixAdmin.addStateModelDef(S4HelixConstants.HELIX_CLUSTER_NAME, "OnlineOffline",
onlineofflinemodel);
+                helixAdmin.addStateModelDef(S4HelixConstants.HELIX_CLUSTER_NAME, "LeaderStandby",
leaderstandbymodel);
+            } catch (HelixException he) {
+                // S4 configuration already exists, ignore
+            }
+
             if (clusterArgs.nbNodes > 0) {
                 String[] split = clusterArgs.nodes.split(",");
                 int initialPort = clusterArgs.firstListeningPort;
@@ -59,8 +66,8 @@ public class CreateCluster {
                     }
                     instanceConfig.setHostName(host);
                     instanceConfig.setPort("" + initialPort);
-                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
-                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.clusterName);
+                    helixAdmin.addInstance(S4HelixConstants.HELIX_CLUSTER_NAME, instanceConfig);
                     initialPort = initialPort + 1;
                 }
             }
@@ -88,10 +95,6 @@ public class CreateCluster {
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening
port for nodes in this cluster. First node listens on the specified port, other nodes listen
on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-
-        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required
= false, arity = 1)
-        String nodeGroup = "default";
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
index 428cddc..36a45bc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -11,6 +11,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 import org.slf4j.Logger;
@@ -28,32 +29,32 @@ public class CreateTask extends S4ArgsBase {
 
         Tools.parseArgs(taskArgs, args);
         String msg = String.format("Setting up new pe [{}] for stream(s) on nodes belonging
to node group {}",
-                taskArgs.taskId, taskArgs.streamName, taskArgs.nodeGroup);
+                taskArgs.taskId, taskArgs.streamName, taskArgs.clusterName);
         logger.info(msg);
         HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
         ConfigScopeBuilder builder = new ConfigScopeBuilder();
-        ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
+        ConfigScope scope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME).forResource(taskArgs.taskId).build();
         Map<String, String> properties = new HashMap<String, String>();
-        properties.put("GROUP", taskArgs.nodeGroup);
+        properties.put("GROUP", taskArgs.clusterName);
         properties.put("type", "Task");
         properties.put("streamName", taskArgs.streamName);
         admin.setConfig(scope, properties);
         // A task is modeled as a resource in Helix
-        admin.addResource(taskArgs.clusterName, taskArgs.taskId, taskArgs.numPartitions,
"LeaderStandby",
+        admin.addResource(S4HelixConstants.HELIX_CLUSTER_NAME, taskArgs.taskId, taskArgs.numPartitions,
"LeaderStandby",
                 IdealStateModeProperty.AUTO.toString());
         // This does the assignment of partition to nodes. It uses a modified
         // version of consistent hashing to distribute active partitions and standbys
         // equally among nodes.
         List<String> instancesInGroup = new ArrayList<String>();
-        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
         for (String instanceName : instancesInCluster) {
-            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName,
instanceName);
+            InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME,
instanceName);
             String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+            if (nodeGroup.equals(taskArgs.clusterName)) {
                 instancesInGroup.add(instanceName);
             }
         }
-        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1,
instancesInGroup);
+        admin.rebalance(S4HelixConstants.HELIX_CLUSTER_NAME, taskArgs.taskId, taskArgs.numStandBys
+ 1, instancesInGroup);
         logger.info("Finished setting up task:" + taskArgs.taskId + " on nodes " + instancesInGroup);
     }
 
@@ -78,8 +79,5 @@ public class CreateTask extends S4ArgsBase {
         @Parameter(names = { "-s", "-stream" }, description = "name of the stream the pe
listens to", required = true, arity = 1)
         String streamName;
 
-        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where
the task needs to be run", required = false, arity = 1)
-        String nodeGroup = "default";
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 4910bb1..686af79 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -16,6 +16,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.s4.comm.HelixBasedCommModule;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.core.HelixBasedCoreModule;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.tools.Deploy;
@@ -41,7 +42,7 @@ public class DeployApp extends S4ArgsBase {
 
         HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
         ConfigScopeBuilder builder = new ConfigScopeBuilder();
-        ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
+        ConfigScope scope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME).forResource(deployArgs.appName).build();
         Map<String, String> properties = new HashMap<String, String>();
 
         URI s4rURI = null;
@@ -81,7 +82,7 @@ public class DeployApp extends S4ArgsBase {
         properties.putAll(appConfig.asMap());
         admin.setConfig(scope, properties);
 
-        IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
+        IdealState is = admin.getResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME,
deployArgs.appName);
         if (is == null) {
             is = new IdealState(deployArgs.appName);
         }
@@ -89,11 +90,11 @@ public class DeployApp extends S4ArgsBase {
         is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
         is.setStateModelDefRef("OnlineOffline");
         List<String> instancesInGroup = new ArrayList<String>();
-        List<String> instancesInCluster = admin.getInstancesInCluster(deployArgs.clusterName);
+        List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
         for (String instanceName : instancesInCluster) {
-            InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName,
instanceName);
+            InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME,
instanceName);
             String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-            if (nodeGroup.equals(deployArgs.nodeGroup)) {
+            if (nodeGroup.equals(deployArgs.clusterName)) {
                 instancesInGroup.add(instanceName);
             }
         }
@@ -101,7 +102,7 @@ public class DeployApp extends S4ArgsBase {
             is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
         }
 
-        admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
+        admin.setResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME, deployArgs.appName,
is);
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription
= "Create a new stream processor")
@@ -125,9 +126,6 @@ public class DeployApp extends S4ArgsBase {
         @Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application
class (extending App or AdapterApp)", required = false)
         String appClass = "";
 
-        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where
the App needs to be deployed", required = false, arity = 1)
-        String nodeGroup = "default";
-
         @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated
list of inline configuration parameters, taking precedence over homonymous configuration parameters
from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter
= InlineConfigParameterConverter.class)
         List<String> extraNamedParameters = new ArrayList<String>();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
index c0aff7a..dcedf7c 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
@@ -7,6 +7,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
 
@@ -23,17 +24,17 @@ public class RebalanceTask extends S4ArgsBase {
         // This does the assignment of partition to nodes. It uses a modified
         // version of consistent hashing to distribute active partitions and standbys
         // equally among nodes.
-        IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName,
taskArgs.taskId);
+        IdealState currentAssignment = admin.getResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME,
taskArgs.taskId);
         List<String> instancesInGroup = new ArrayList<String>();
-        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
         for (String instanceName : instancesInCluster) {
-            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName,
instanceName);
+            InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME,
instanceName);
             String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+            if (nodeGroup.equals(taskArgs.clusterName)) {
                 instancesInGroup.add(instanceName);
             }
         }
-        admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
+        admin.rebalance(S4HelixConstants.HELIX_CLUSTER_NAME, currentAssignment, instancesInGroup);
     }
 
     @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription
= "Create a new stream processor")
@@ -48,8 +49,5 @@ public class RebalanceTask extends S4ArgsBase {
         @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes
a stream", required = true, arity = 1)
         String taskId;
 
-        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where
the task needs to be run", required = true, arity = 1)
-        String nodeGroup = "default";
-
     }
 }


Mime
View raw message