Making minor changes to YarnProvisioner to maintain a fixed number of containers Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cb6aa4fa Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cb6aa4fa Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cb6aa4fa Branch: refs/heads/master Commit: cb6aa4fa0e82436f1d6714c3cdcf1435c510024a Parents: d1e7ca6 Author: Kishore Gopalakrishna Authored: Wed Feb 19 17:16:55 2014 -0800 Committer: Kishore Gopalakrishna Committed: Wed Feb 19 17:16:55 2014 -0800 ---------------------------------------------------------------------- .../controller/provisioner/ContainerSpec.java | 18 ++++-- .../stages/ContainerProvisioningStage.java | 2 +- .../provisioning/yarn/YarnProvisioner.java | 63 ++++++++++++++------ 3 files changed, 58 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java index b393a64..4d3a521 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java @@ -23,23 +23,31 @@ public class ContainerSpec { /** * Some unique id representing the container. */ - ContainerId containerId; + ContainerId _containerId; - String memory; + int _memory; public ContainerSpec(ContainerId containerId) { - this.containerId = containerId; + this._containerId = containerId; } public ContainerId getContainerId() { - return containerId; + return _containerId; } @Override public String toString() { - return containerId.toString(); + return _containerId.toString(); + } + + public void setMemory(int memory){ + _memory = memory; } + public int getMemory(){ + return _memory; + } + public static ContainerSpec from(String serialized) { return new ContainerSpec(ContainerId.from(serialized)); } http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index 2f97c5a..48166bf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -123,7 +123,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { // allocate new containers for (final ContainerSpec spec : response.getContainersToAcquire()) { // random participant id - final ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString()); + final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify()); // create a new Participant, attach the container spec InstanceConfig instanceConfig = new InstanceConfig(participantId); instanceConfig.setContainerSpec(spec); http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index 477023b..4fcc219 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -8,8 +8,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Vector; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -44,7 +46,9 @@ import org.apache.helix.HelixManager; import org.apache.helix.api.Cluster; import org.apache.helix.api.Participant; import org.apache.helix.api.config.ContainerConfig; +import org.apache.helix.api.config.ParticipantConfig; import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.ResourceId; import org.apache.helix.controller.provisioner.ContainerId; import org.apache.helix.controller.provisioner.ContainerProvider; @@ -54,6 +58,7 @@ import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; +import org.apache.helix.model.InstanceConfig; import com.google.common.collect.Lists; import com.google.common.base.Function; @@ -73,8 +78,9 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr Map allocatedContainersMap = new HashMap(); private HelixManager _helixManager; private ResourceConfig _resourceConfig; - public YarnProvisioner(){ - + + public YarnProvisioner() { + } @Override @@ -109,7 +115,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr } @Override - public ListenableFuture startContainer(final ContainerId containerId, Participant participant) { + public ListenableFuture startContainer(final ContainerId containerId, + Participant participant) { Container container = allocatedContainersMap.get(containerId); ContainerLaunchContext launchContext; try { @@ -128,11 +135,12 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr }, service); } - private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception { + private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, + Participant participant) throws Exception { ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class); -// Map envs = System.getenv(); + // Map envs = System.getenv(); String appName = applicationMasterConfig.getAppName(); int appId = applicationMasterConfig.getAppId(); String serviceName = _resourceConfig.getId().stringify(); @@ -166,7 +174,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr // resource the client intended to use with the application servicePackageResource.setTimestamp(destStatus.getModificationTime()); servicePackageResource.setSize(destStatus.getLen()); - LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName ); + LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName); localResources.put(serviceName, servicePackageResource); // Set local resource info into app master container launch context @@ -195,7 +203,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr classPathEnv.append(c.trim()); } classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); - LOG.info("Setting classpath for service:\n"+ classPathEnv.toString()); + LOG.info("Setting classpath for service:\n" + classPathEnv.toString()); env.put("CLASSPATH", classPathEnv.toString()); participantContainer.setEnvironment(env); @@ -214,8 +222,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr vargs.add("--zkAddress " + zkAddress); vargs.add("--cluster " + appName); vargs.add("--participantId " + participant.getId().stringify()); - vargs.add("--participantClass " + mainClass);; - + vargs.add("--participantClass " + mainClass); + ; vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr"); @@ -226,7 +234,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr command.append(str).append(" "); } - LOG.info("Completed setting up container launch command " + command.toString() + " with arguments \n" + vargs); + LOG.info("Completed setting up container launch command " + command.toString() + + " with arguments \n" + vargs); List commands = new ArrayList(); commands.add(command.toString()); participantContainer.setCommands(commands); @@ -260,13 +269,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr List containersToStart = Lists.newArrayList(); List containersToRelease = Lists.newArrayList(); List containersToStop = Lists.newArrayList(); - YarnProvisionerConfig provisionerConfig = (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId).getProvisionerConfig(); + YarnProvisionerConfig provisionerConfig = + (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId) + .getProvisionerConfig(); int targetNumContainers = provisionerConfig.getNumContainers(); - for (int i = 0; i < targetNumContainers - participants.size(); i++) { - containersToAcquire.add(new ContainerSpec(ContainerId.from("container" - + (targetNumContainers - i)))); - } - response.setContainersToAcquire(containersToAcquire); + + Set existingContainersIdSet = new HashSet(); + for (Participant participant : participants) { ContainerConfig containerConfig = participant.getContainerConfig(); @@ -278,17 +287,20 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr containersToStart.add(participant); break; case ACTIVE: - + existingContainersIdSet.add(containerConfig.getId()); break; case HALTED: // halted containers can be released - // containersToRelease.add(participant); + containersToRelease.add(participant); break; case ACQUIRING: + existingContainersIdSet.add(containerConfig.getId()); break; case CONNECTING: break; case FAILED: + //remove the failed instance + _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId())); break; case FINALIZED: break; @@ -306,6 +318,19 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr } } } + + for (int i = 0; i < targetNumContainers; i++) { + ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i)); + if(!existingContainersIdSet.contains(containerId)){ + ContainerSpec containerSpec = new ContainerSpec(containerId); + ParticipantId participantId = ParticipantId.from(containerId.stringify()); + ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); + containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024)); + containersToAcquire.add(containerSpec); + } + } + + response.setContainersToAcquire(containersToAcquire); response.setContainersToStart(containersToStart); response.setContainersToRelease(containersToRelease); response.setContainersToStop(containersToStop); @@ -326,7 +351,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr // Set up resource type requirements // For now, only memory is supported so we set memory requirements Resource capability = Records.newRecord(Resource.class); - int memory = 1024; + int memory = spec.getMemory(); capability.setMemory(memory); ContainerRequest request = new ContainerRequest(capability, null, null, pri);