Updated Branches:
refs/heads/master 10b41d864 -> 45df543e9
more changes to orchestrator cpi creation, https://issues.apache.org/jira/browse/AIRAVATA-1011
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f2eaba05
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f2eaba05
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f2eaba05
Branch: refs/heads/master
Commit: f2eaba0510464a4d9b412123ea7643c43e648e1c
Parents: 0f06c3c
Author: lahiru <lahiru@apache.org>
Authored: Tue Feb 4 15:36:12 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Tue Feb 4 15:36:12 2014 -0500
----------------------------------------------------------------------
.../core/PullBasedOrchestrator.java | 161 -------------------
.../core/SimpleOrchestratorImpl.java | 59 -------
.../core/SimpleOrchestratorTest.java | 3 +-
.../core/TestWithStoredDescriptors.java | 3 +-
4 files changed, 4 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2eaba05/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
deleted file mode 100644
index e81c3b7..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.airavata.common.utils.AiravataJobState;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
-import org.apache.airavata.registry.api.JobRequest;
-import org.apache.airavata.registry.api.exception.RegistryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PullBasedOrchestrator extends AbstractOrchestrator {
- private final static Logger logger = LoggerFactory.getLogger(PullBasedOrchestrator.class);
- private ExecutorService executor;
-
-
- // this is going to be null unless the thread count is 0
- private JobSubmitter jobSubmitter = null;
-
- public boolean initialize() throws OrchestratorException {
- super.initialize();
- // we have a thread to run normal new jobs except to monitor hanged jobs
- if (orchestratorConfiguration.getThreadPoolSize() != 0) {
- executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
- this.startJobSubmitter();
- } else {
-
- try {
- String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterClass();
- Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
- jobSubmitter = aClass.newInstance();
- jobSubmitter.initialize(this.orchestratorContext);
- } catch (Exception e) {
- String error = "Error creating JobSubmitter in non threaded mode ";
- logger.error(error);
- throw new OrchestratorException(error, e);
- }
- }
- return true;
- }
-
-
- public void shutdown() throws OrchestratorException {
- executor.shutdown();
-
- }
-
- public boolean launchExperiment(JobRequest request) throws OrchestratorException {
- // validate the jobRequest first
- if (!OrchestratorUtils.validateJobRequest(request)) {
- logger.error("Invalid Job request sent, Experiment creation failed");
- return false;
- }
- String experimentID = OrchestratorUtils.getUniqueID(request);
- // we give higher priority to userExperimentID
- if(experimentID == null) {
- logger.error("Invalid Experiment ID given: " + request.getUserName());
- return false;
- }
- //todo use a more concrete user type in to this
- //FIXME: (MEP) Why don't we pass the JobRequest to the registry and let it do all of this? Or just store the JobRequest as an object directly in the registry?
- try {
- if (request.getHostDescription() != null) {
- if (!airavataRegistry.isHostDescriptorExists(request.getHostDescription().getType().getHostName())) {
- airavataRegistry.addHostDescriptor(request.getHostDescription());
- }
- }
- if (request.getServiceDescription() != null) {
- if (!airavataRegistry.isServiceDescriptorExists(request.getServiceDescription().getType().getName())) {
- airavataRegistry.addServiceDescriptor(request.getServiceDescription());
- }
- }
- if (request.getApplicationDescription() != null) {
- if (request.getServiceDescription() != null && request.getHostDescription() != null) {
- if(!airavataRegistry.isApplicationDescriptorExists(request.getServiceDescription().getType().getName(),
- request.getHostDescription().getType().getHostName(),request.getApplicationDescription().getType().getApplicationName().getStringValue())){
- airavataRegistry.addApplicationDescriptor(request.getServiceDescription(),
- request.getHostDescription(), request.getApplicationDescription());
- }
- } else {
- String error = "Providing just Application Descriptor is not sufficient to save to Registry";
- logger.error(error);
- throw new OrchestratorException(error);
- }
- }
- airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
- //FIXME: (MEP) Instead of embedding this special case, why not make a separate SimpleOrchestratorImpl class that just does this?
- if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
- jobSubmitter.directJobSubmit(request);
- }
-
- //todo save jobRequest data in to the database
- } catch (RegistryException e) {
- //todo put more meaningful error message
- logger.error("Failed to create experiment for the request from " + request.getUserName());
- return false;
- }
- return true;
- }
-
- public void startJobSubmitter() throws OrchestratorException {
- //FIXME: (MEP) Why create a new thread for jobSubmittedWorker but use the pool for HangedJobWorker?
- //FIXME: (MEP) As discussed on the dev list, we need to improve this
- NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
- executor.execute(jobSubmitterWorker);
-
- for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() - 1; i++) {
- HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
- executor.execute(hangedJobWorker);
- }
- }
-
- public boolean cancelExperiment(String experimentID) throws OrchestratorException {
- try {
- AiravataJobState state = orchestratorContext.getRegistry().getState(experimentID);
- if (state.getJobState().equals(AiravataJobState.State.RUNNING) || state.getJobState().equals(AiravataJobState.State.PENDING) ||
- state.getJobState().equals(AiravataJobState.State.ACTIVE) || state.getJobState().equals(AiravataJobState.State.SUBMITTED)) {
-
- //todo perform cancelling and last peform the database update
-
- orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
- } else if (state.getJobState().equals(AiravataJobState.State.DONE)) {
- String error = "Job is already Finished so cannot cancel the job " + experimentID;
- logger.error(error);
- new OrchestratorException(error);
- } else {
- // do nothing but simply change the job state to cancelled because job is not yet submitted to the resource
- orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
- }
-
- } catch (RegistryException e) {
- String error = "Error reading the job state for the given Experiment ID: " + experimentID;
- logger.error(error);
- throw new OrchestratorException(error, e);
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2eaba05/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorImpl.java
deleted file mode 100644
index bfd2097..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core;
-
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.registry.api.JobRequest;
-
-public class SimpleOrchestratorImpl extends AbstractOrchestrator{
-
- @Override
- public boolean initialize() throws OrchestratorException {
- super.initialize();
- return true;
- }
-
- @Override
- public boolean launchExperiment(JobRequest request) throws OrchestratorException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean cancelExperiment(String experimentID) throws OrchestratorException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void startJobSubmitter() throws OrchestratorException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void shutdown() throws OrchestratorException {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2eaba05/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorTest.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorTest.java b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorTest.java
index 4d31f1c..1a8b4c9 100644
--- a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorTest.java
+++ b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/SimpleOrchestratorTest.java
@@ -33,6 +33,7 @@ import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.ServiceDescription;
import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
+import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
import org.apache.airavata.registry.api.JobRequest;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.DataType;
@@ -54,7 +55,7 @@ public class SimpleOrchestratorTest extends AbstractOrchestratorTest {
public void setUp() throws Exception {
AiravataUtils.setExecutionAsServer();
super.setUp();
- orchestrator = new PullBasedOrchestrator();
+ orchestrator = new SimpleOrchestratorImpl();
orchestrator.initialize();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2eaba05/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/TestWithStoredDescriptors.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/TestWithStoredDescriptors.java b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/TestWithStoredDescriptors.java
index 999a3f4..a75d3e2 100644
--- a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/TestWithStoredDescriptors.java
+++ b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/TestWithStoredDescriptors.java
@@ -33,6 +33,7 @@ import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.ServiceDescription;
import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
+import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
import org.apache.airavata.registry.api.JobRequest;
import org.apache.airavata.schemas.gfac.*;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class TestWithStoredDescriptors extends AbstractOrchestratorTest {
public void setUp() throws Exception {
AiravataUtils.setExecutionAsServer();
super.setUp();
- orchestrator = new PullBasedOrchestrator();
+ orchestrator = new SimpleOrchestratorImpl();
orchestrator.initialize();
createJobRequestWithDocuments(getAiravataAPI());
}
|