Updated Branches:
  refs/heads/master 5cf0a0f55 -> 4cfc32e39

implementing orchestrator cpi

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4cfc32e3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4cfc32e3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4cfc32e3

Branch: refs/heads/master
Commit: 4cfc32e39b8954390acadcad5003309e8b4bbb0b
Parents: 5cf0a0f
Author: lginnali
Authored: Tue Feb 4 10:57:35 2014 -0500
Committer: lginnali
Committed: Tue Feb 4 10:57:35 2014 -0500

----------------------------------------------------------------------
 .../airavata/orchestrator/cpi/Orchestrator.java |  95 +++++++++++
 .../cpi/impl/SimpleOrchestratorImpl.java        | 163 +++++++++++++++++++
 2 files changed, 258 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
new file mode 100644
index 0000000..83178a1
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi;
+
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
+import org.apache.airavata.registry.api.JobRequest;
+
+/**
+ * The orchestrator functionality exposed to callers outside of the orchestrator module.
+ */
+public interface Orchestrator {
+
+    /**
+     * Initializes the Orchestrator. This is called on (re)start to perform the
+     * initialization tasks.
+     *
+     * @return {@code true} when initialization completed
+     * @throws OrchestratorException if initialization fails
+     */
+    boolean initialize() throws OrchestratorException;
+
+    /**
+     * Creates the database entry for a new experiment. This is the very first call for an
+     * experiment; the returned experiment ID gives the user full control over the experiment.
+     *
+     * @param request the experiment to create
+     * @return the ID of the created experiment
+     * @throws OrchestratorException if the experiment cannot be created
+     */
+    String createExperiment(ExperimentRequest request) throws OrchestratorException;
+
+    /**
+     * Launches an experiment. After creating the experiment the user holds its experiment ID,
+     * builds a {@link JobRequest} with the job input parameters and sends it here.
+     *
+     * @param request the job request carrying the experiment and its job inputs
+     * @return {@code true} if the request was accepted, {@code false} if it was rejected
+     * @throws OrchestratorException if the request cannot be processed
+     */
+    boolean launchExperiment(JobRequest request) throws OrchestratorException;
+
+    /**
+     * Cancels an experiment. If the job has already finished an exception is thrown. If the
+     * job has not been submitted yet only its status is changed to cancelled; if it is running
+     * it is killed on the resource and its status is changed to cancelled.
+     *
+     * @param experimentID the ID of the experiment to cancel
+     * @return {@code true} if the experiment was cancelled
+     * @throws OrchestratorException if the job has already finished or the cancellation fails
+     */
+    boolean terminateExperiment(String experimentID) throws OrchestratorException;
+
+    /**
+     * Starts the job submitter, which runs continuously (like a cron job), picks up the
+     * available jobs and submits them to GFAC.
+     *
+     * @throws OrchestratorException if the job submitter cannot be started
+     */
+    void startJobSubmitter() throws OrchestratorException;
+
+    /**
+     * Shuts the Orchestrator down gracefully; called during a graceful shutdown.
+     *
+     * @throws OrchestratorException if the shutdown fails
+     */
+    void shutdown() throws OrchestratorException;
+
+    /**
+     * Parses the current job data configured in the Registry and validates it.
+     *
+     * @param jobRequest the job request to validate
+     * @return {@code true} if the request has the minimum required parameters to submit the
+     *         job, {@code false} otherwise
+     */
+    boolean validateExperiment(JobRequest jobRequest);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
new file mode 100644
index 0000000..1616690
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ * See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.airavata.common.utils.AiravataJobState;
+import org.apache.airavata.orchestrator.core.AbstractOrchestrator;
+import org.apache.airavata.orchestrator.core.HangedJobWorker;
+import org.apache.airavata.orchestrator.core.NewJobWorker;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.registry.api.JobRequest;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Orchestrator implementation built on {@link AbstractOrchestrator}.
+ * <p>
+ * It runs in one of two modes, selected by the configured thread pool size:
+ * <ul>
+ * <li>threaded mode (pool size != 0): {@link #initialize()} creates an executor and starts the
+ * background job workers through {@link #startJobSubmitter()};</li>
+ * <li>non-threaded mode (pool size == 0): {@link #initialize()} instantiates the configured
+ * {@link JobSubmitter} and {@link #launchExperiment(JobRequest)} submits jobs through it
+ * directly.</li>
+ * </ul>
+ */
+public class SimpleOrchestratorImpl extends AbstractOrchestrator {
+    private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
+
+    // only created in threaded mode (thread pool size != 0); null otherwise
+    private ExecutorService executor;
+
+    // this is going to be null unless the thread count is 0
+    private JobSubmitter jobSubmitter = null;
+
+    /**
+     * Initializes the orchestrator. In threaded mode this creates the worker pool and starts the
+     * job workers; in non-threaded mode it loads, instantiates and initializes the configured
+     * JobSubmitter class.
+     *
+     * @return {@code true} once initialization completed
+     * @throws OrchestratorException if the JobSubmitter cannot be created or initialized in
+     *                               non-threaded mode, or if starting the workers fails
+     */
+    public boolean initialize() throws OrchestratorException {
+        super.initialize();
+        // we have a thread to run normal new jobs except to monitor hanged jobs
+        if (orchestratorConfiguration.getThreadPoolSize() != 0) {
+            executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
+            this.startJobSubmitter();
+        } else {
+            try {
+                String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getNewJobSubmitterClass();
+                // Typed (not raw) so that newInstance() returns a JobSubmitter; with the raw
+                // Class type it returns Object and the assignment below does not compile.
+                Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
+                jobSubmitter = aClass.newInstance();
+                jobSubmitter.initialize(this.orchestratorContext);
+            } catch (Exception e) {
+                String error = "Error creating JobSubmitter in non threaded mode ";
+                logger.error(error, e);
+                throw new OrchestratorException(error, e);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Shuts down the worker executor if one was created. The executor only exists in threaded
+     * mode; in non-threaded mode there is nothing to shut down.
+     *
+     * @throws OrchestratorException not thrown by this implementation
+     */
+    public void shutdown() throws OrchestratorException {
+        // executor is null in non-threaded mode (thread pool size == 0)
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Validates the job request and registers any host, service and application descriptors
+     * it carries that are not yet in the registry. It then marks the experiment ACCEPTED and,
+     * in non-threaded mode, submits the job directly through the JobSubmitter.
+     *
+     * @param request the job request to launch
+     * @return {@code true} if the experiment was accepted; {@code false} if the request was
+     *         invalid, no experiment ID could be derived from it, or a registry call failed
+     * @throws OrchestratorException if an application descriptor is given without both a
+     *                               service descriptor and a host descriptor
+     */
+    public boolean launchExperiment(JobRequest request) throws OrchestratorException {
+        // validate the jobRequest first
+        if (!OrchestratorUtils.validateJobRequest(request)) {
+            logger.error("Invalid Job request sent, Experiment creation failed");
+            return false;
+        }
+        String experimentID = OrchestratorUtils.getUniqueID(request);
+        // we give higher priority to userExperimentID
+        if (experimentID == null) {
+            logger.error("Invalid Experiment ID given in the request from " + request.getUserName());
+            return false;
+        }
+        //todo use a more concrete user type in to this
+        //FIXME: (MEP) Why don't we pass the JobRequest to the registry and let it do all of this? Or just store the JobRequest as an object directly in the registry?
+        try {
+            if (request.getHostDescription() != null) {
+                if (!airavataRegistry.isHostDescriptorExists(request.getHostDescription().getType().getHostName())) {
+                    airavataRegistry.addHostDescriptor(request.getHostDescription());
+                }
+            }
+            if (request.getServiceDescription() != null) {
+                if (!airavataRegistry.isServiceDescriptorExists(request.getServiceDescription().getType().getName())) {
+                    airavataRegistry.addServiceDescriptor(request.getServiceDescription());
+                }
+            }
+            if (request.getApplicationDescription() != null) {
+                // an application descriptor is keyed by service and host, so both must be present
+                if (request.getServiceDescription() != null && request.getHostDescription() != null) {
+                    if (!airavataRegistry.isApplicationDescriptorExists(request.getServiceDescription().getType().getName(),
+                            request.getHostDescription().getType().getHostName(), request.getApplicationDescription().getType().getApplicationName().getStringValue())) {
+                        airavataRegistry.addApplicationDescriptor(request.getServiceDescription(),
+                                request.getHostDescription(), request.getApplicationDescription());
+                    }
+                } else {
+                    String error = "Providing just Application Descriptor is not sufficient to save to Registry";
+                    logger.error(error);
+                    throw new OrchestratorException(error);
+                }
+            }
+            airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
+            // in non-threaded mode there are no workers polling for jobs, so submit right away
+            if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
+                jobSubmitter.directJobSubmit(request);
+            }
+
+            //todo save jobRequest data in to the database
+        } catch (RegistryException e) {
+            //todo put more meaningful error message
+            logger.error("Failed to create experiment for the request from " + request.getUserName(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Submits the job workers to the executor: one {@link NewJobWorker} plus
+     * (thread pool size - 1) {@link HangedJobWorker}s.
+     *
+     * @throws OrchestratorException if called in non-threaded mode, where no executor exists
+     */
+    public void startJobSubmitter() throws OrchestratorException {
+        if (executor == null) {
+            throw new OrchestratorException("Cannot start job submitter workers in non threaded mode (thread pool size is 0)");
+        }
+        //FIXME: (MEP) As discussed on the dev list, we need to improve this
+        // Both worker kinds run on the shared executor.
+        // NOTE(review): initialize() sizes the pool as threadPoolSize + 1, but only
+        // threadPoolSize workers are submitted here - confirm which count is intended.
+        NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
+        executor.execute(jobSubmitterWorker);
+
+        for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() - 1; i++) {
+            HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
+            executor.execute(hangedJobWorker);
+        }
+    }
+
+    /**
+     * Cancels an experiment based on its current job state.
+     * <ul>
+     * <li>RUNNING, PENDING, ACTIVE or SUBMITTED: marked CANCELLED (cancelling the job on the
+     * resource is still a todo).</li>
+     * <li>DONE: cannot be cancelled, so an exception is thrown.</li>
+     * <li>Any other state: marked CANCELLED, because the job has not reached the resource yet.</li>
+     * </ul>
+     *
+     * @param experimentID the experiment to cancel
+     * @return {@code true} once the experiment has been marked CANCELLED
+     * @throws OrchestratorException if the job is already DONE, or if reading or updating the
+     *                               job state in the registry fails
+     */
+    public boolean terminateExperiment(String experimentID) throws OrchestratorException {
+        try {
+            AiravataJobState state = orchestratorContext.getRegistry().getState(experimentID);
+            if (state.getJobState().equals(AiravataJobState.State.RUNNING) || state.getJobState().equals(AiravataJobState.State.PENDING) ||
+                    state.getJobState().equals(AiravataJobState.State.ACTIVE) || state.getJobState().equals(AiravataJobState.State.SUBMITTED)) {
+
+                //todo perform cancelling and last perform the database update
+
+                orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+            } else if (state.getJobState().equals(AiravataJobState.State.DONE)) {
+                String error = "Job is already Finished so cannot cancel the job " + experimentID;
+                logger.error(error);
+                // the exception must actually be thrown; otherwise a finished job reports success
+                throw new OrchestratorException(error);
+            } else {
+                // do nothing but simply change the job state to cancelled because job is not yet submitted to the resource
+                orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+            }
+
+        } catch (RegistryException e) {
+            String error = "Error reading the job state for the given Experiment ID: " + experimentID;
+            logger.error(error, e);
+            throw new OrchestratorException(error, e);
+        }
+        return true;
+    }
+}