sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [2/2] git commit: SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only
Date Tue, 04 Nov 2014 05:41:48 GMT
SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/268a4755
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/268a4755
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/268a4755

Branch: refs/heads/sqoop2
Commit: 268a47552f5758c2d9caa5e859bd3a631baaedc9
Parents: d6319e3
Author: Abraham Elmahrek <abraham@elmahrek.com>
Authored: Mon Nov 3 21:23:18 2014 -0800
Committer: Abraham Elmahrek <abraham@elmahrek.com>
Committed: Mon Nov 3 21:24:19 2014 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/client/SqoopClient.java    |  42 +-
 .../apache/sqoop/client/SubmissionCallback.java |   3 +-
 .../client/request/JobResourceRequest.java      |  59 ++-
 .../client/request/SqoopResourceRequests.java   |  21 +-
 .../request/SubmissionResourceRequest.java      |  48 +--
 .../java/org/apache/sqoop/json/JobBean.java     |  44 +-
 .../java/org/apache/sqoop/json/JobsBean.java    |  59 +++
 .../java/org/apache/sqoop/json/JsonBean.java    |   1 +
 .../org/apache/sqoop/json/SubmissionBean.java   | 120 +++---
 .../org/apache/sqoop/json/SubmissionsBean.java  |  59 +++
 .../org/apache/sqoop/model/MSubmission.java     |   3 -
 .../java/org/apache/sqoop/json/TestJobBean.java |   6 +-
 .../apache/sqoop/json/TestSubmissionBean.java   |   8 +-
 .../org/apache/sqoop/driver/DriverError.java    |   3 +
 .../org/apache/sqoop/driver/JobManager.java     |  28 +-
 .../org/apache/sqoop/driver/JobRequest.java     |   2 +-
 .../apache/sqoop/repository/JdbcRepository.java |  14 +
 .../sqoop/repository/JdbcRepositoryHandler.java |   9 +
 .../org/apache/sqoop/repository/Repository.java |   8 +
 .../derby/DerbyRepositoryHandler.java           |  27 ++
 ...erbySchemaInsertUpdateDeleteSelectQuery.java |   4 +
 .../apache/sqoop/handler/JobRequestHandler.java | 408 ++++++++++++-------
 .../sqoop/handler/SubmissionRequestHandler.java | 151 ++-----
 .../org/apache/sqoop/server/v1/JobServlet.java  |  46 +++
 .../org/apache/sqoop/server/v1/JobsServlet.java |  48 +++
 .../sqoop/server/v1/SubmissionServlet.java      |  52 ---
 .../sqoop/server/v1/SubmissionsServlet.java     |  49 +++
 server/src/main/webapp/WEB-INF/web.xml          |  22 +-
 .../sqoop/shell/ShowJobStatusFunction.java      |  59 +++
 .../apache/sqoop/shell/StartJobFunction.java    |   4 +-
 .../org/apache/sqoop/shell/StatusCommand.java   |   2 +-
 .../apache/sqoop/shell/StatusJobFunction.java   |  59 ---
 .../org/apache/sqoop/shell/StopJobFunction.java |   2 +-
 .../apache/sqoop/shell/UpdateJobFunction.java   |   2 +-
 .../apache/sqoop/shell/UpdateLinkFunction.java  |   2 +-
 .../sqoop/test/testcases/ConnectorTestCase.java |   2 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java       |   4 +-
 .../sqoop/tools/tool/RepositoryDumpTool.java    |   4 +-
 38 files changed, 905 insertions(+), 579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 33a0c3c..e139132 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -81,6 +81,8 @@ public class SqoopClient {
   /**
    * Status flags used when updating the submission callback status
    */
+  //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): Why do wee need a duplicate status enum in client when shell is using the server status?
+  // NOTE: the getStatus method is on the job resource and this needs to be revisited
   private enum SubmissionStatus {
     SUBMITTED,
     UPDATED,
@@ -438,8 +440,8 @@ public class SqoopClient {
    * @param jobId Job id
    * @return
    */
-  public MSubmission startSubmission(long jobId) {
-    return resourceRequests.createSubmission(jobId).getSubmissions().get(0);
+  public MSubmission startJob(long jobId) {
+    return resourceRequests.startJob(jobId).getSubmissions().get(0);
   }
 
   /**
@@ -452,24 +454,27 @@ public class SqoopClient {
    * @return MSubmission - Final status of job submission
    * @throws InterruptedException
    */
-  public MSubmission startSubmission(long jobId, SubmissionCallback callback, long pollTime)
+  public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime)
       throws InterruptedException {
     if(pollTime <= 0) {
       throw new SqoopException(ClientError.CLIENT_0002);
     }
+    //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start/first terminology difference
+    // What does first even mean in s distributed client/server model?
     boolean first = true;
-    MSubmission submission = resourceRequests.createSubmission(jobId).getSubmissions().get(0);
+    MSubmission submission = resourceRequests.startJob(jobId).getSubmissions().get(0);
+    // what happens when the server fails, do we just say finished?
     while(submission.getStatus().isRunning()) {
       if(first) {
-        submissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
+        invokeSubmissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
         first = false;
       } else {
-        submissionCallback(callback, submission, SubmissionStatus.UPDATED);
+        invokeSubmissionCallback(callback, submission, SubmissionStatus.UPDATED);
       }
       Thread.sleep(pollTime);
-      submission = getSubmissionStatus(jobId);
+      submission = getJobStatus(jobId);
     }
-    submissionCallback(callback, submission, SubmissionStatus.FINISHED);
+    invokeSubmissionCallback(callback, submission, SubmissionStatus.FINISHED);
     return submission;
   }
 
@@ -480,7 +485,7 @@ public class SqoopClient {
    * @param submission
    * @param status
    */
-  private void submissionCallback(SubmissionCallback callback, MSubmission submission,
+  private void invokeSubmissionCallback(SubmissionCallback callback, MSubmission submission,
       SubmissionStatus status) {
     if (callback == null) {
       return;
@@ -494,17 +499,19 @@ public class SqoopClient {
       break;
     case FINISHED:
       callback.finished(submission);
+    default:
+      break;
     }
   }
 
   /**
-   * Stop job with given id.
+   * stop job with given id.
    *
    * @param jid Job id
    * @return
    */
-  public MSubmission stopSubmission(long jid) {
-    return resourceRequests.deleteSubmission(jid).getSubmissions().get(0);
+  public MSubmission stopJob(long jid) {
+    return resourceRequests.stopJob(jid).getSubmissions().get(0);
   }
 
   /**
@@ -513,8 +520,8 @@ public class SqoopClient {
    * @param jid Job id
    * @return
    */
-  public MSubmission getSubmissionStatus(long jid) {
-    return resourceRequests.readSubmission(jid).getSubmissions().get(0);
+  public MSubmission getJobStatus(long jid) {
+    return resourceRequests.getJobStatus(jid).getSubmissions().get(0);
   }
 
   /**
@@ -523,7 +530,7 @@ public class SqoopClient {
    * @return
    */
   public List<MSubmission> getSubmissions() {
-    return resourceRequests.readHistory(null).getSubmissions();
+    return resourceRequests.readSubmission(null).getSubmissions();
   }
 
   /**
@@ -533,7 +540,7 @@ public class SqoopClient {
    * @return
    */
   public List<MSubmission> getSubmissionsForJob(long jobId) {
-    return resourceRequests.readHistory(jobId).getSubmissions();
+    return resourceRequests.readSubmission(jobId).getSubmissions();
   }
 
   private Status applyLinkValidations(ValidationResultBean bean, MLink link) {
@@ -541,12 +548,13 @@ public class SqoopClient {
     // Apply validation results
     ConfigUtils.applyValidation(link.getConnectorLinkConfig().getConfigs(), linkConfig);
     Long id = bean.getId();
-    if(id != null) {
+    if (id != null) {
       link.setPersistenceId(id);
     }
     return Status.getWorstStatus(linkConfig.getStatus());
   }
 
+
   private Status applyJobValidations(ValidationResultBean bean, MJob job) {
     ConfigValidationResult fromConfig = bean.getValidationResults()[0];
     ConfigValidationResult toConfig = bean.getValidationResults()[1];

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
index de7211a..e2e4860 100644
--- a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
+++ b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
@@ -20,8 +20,9 @@ package org.apache.sqoop.client;
 import org.apache.sqoop.model.MSubmission;
 
 /**
- * Callback interface for Synchronous job submission
+ * Callback interface for synchronous job submission
  */
+//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start consistent usage
 public interface SubmissionCallback {
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
index 83c08b3..55c8db2 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
@@ -18,14 +18,16 @@
 package org.apache.sqoop.client.request;
 
 import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobsBean;
+import org.apache.sqoop.json.SubmissionBean;
 import org.apache.sqoop.json.ValidationResultBean;
 import org.apache.sqoop.model.MJob;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
 /**
- * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations
- * are normally supported.
+ * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations are
+ * normally supported.
  */
 public class JobResourceRequest extends ResourceRequest {
 
@@ -33,18 +35,25 @@ public class JobResourceRequest extends ResourceRequest {
 
   private static final String ENABLE = "/enable";
   private static final String DISABLE = "/disable";
+  private static final String START = "/start";
+  private static final String STOP = "/stop";
+  private static final String STATUS = "/status";
 
-  public JobBean read(String serverUrl, Long linkId) {
+  public JobBean read(String serverUrl, Long jobId) {
     String response;
-    if (linkId == null) {
+    if (jobId == null) {
       response = super.get(serverUrl + RESOURCE + "all");
     } else {
-      response = super.get(serverUrl + RESOURCE + linkId);
+      response = super.get(serverUrl + RESOURCE + jobId);
     }
     JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
-    JobBean jobBean = new JobBean();
-    jobBean.restore(jsonObject);
-    return jobBean;
+    // defaults to all
+    JobBean bean = new JobsBean();
+    if (jobId != null) {
+      bean = new JobBean();
+    }
+    bean.restore(jsonObject);
+    return bean;
   }
 
   public ValidationResultBean create(String serverUrl, MJob job) {
@@ -61,21 +70,43 @@ public class JobResourceRequest extends ResourceRequest {
     JobBean jobBean = new JobBean(job);
     // Extract all config inputs including sensitive inputs
     JSONObject jobJson = jobBean.extract(false);
-    String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), jobJson.toJSONString());
+    String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
+        jobJson.toJSONString());
     ValidationResultBean validationBean = new ValidationResultBean();
     validationBean.restore((JSONObject) JSONValue.parse(response));
     return validationBean;
   }
 
-  public void delete(String serverUrl, Long id) {
-     super.delete(serverUrl + RESOURCE + id);
+  public void delete(String serverUrl, Long jobId) {
+    super.delete(serverUrl + RESOURCE + jobId);
   }
 
-  public void enable(String serverUrl, Long id, Boolean enabled) {
+  public void enable(String serverUrl, Long jobId, Boolean enabled) {
     if (enabled) {
-      super.put(serverUrl + RESOURCE + id + ENABLE, null);
+      super.put(serverUrl + RESOURCE + jobId + ENABLE, null);
     } else {
-      super.put(serverUrl + RESOURCE + id + DISABLE, null);
+      super.put(serverUrl + RESOURCE + jobId + DISABLE, null);
     }
   }
+
+  public SubmissionBean start(String serverUrl, Long jobId) {
+    String response = super.put(serverUrl + RESOURCE + jobId + START, null);
+    return createJobSubmissionResponse(response);
+  }
+
+  public SubmissionBean stop(String serverUrl, Long jobId) {
+    String response = super.put(serverUrl + RESOURCE + jobId + STOP, null);
+    return createJobSubmissionResponse(response);
+  }
+
+  public SubmissionBean status(String serverUrl, Long jobId) {
+    String response = super.get(serverUrl + RESOURCE + jobId + STATUS);
+    return createJobSubmissionResponse(response);
+  }
+
+  private SubmissionBean createJobSubmissionResponse(String response) {
+    SubmissionBean submissionBean = new SubmissionBean();
+    submissionBean.restore((JSONObject) JSONValue.parse(response));
+    return submissionBean;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
index 4a56bb7..fe528f2 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
@@ -17,14 +17,15 @@
  */
 package org.apache.sqoop.client.request;
 
-import org.apache.sqoop.json.LinkBean;
 import org.apache.sqoop.json.ConnectorBean;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.LinkBean;
 import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
 import org.apache.sqoop.json.ValidationResultBean;
-import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
 
 /**
  * Unified class for all request objects.
@@ -131,19 +132,19 @@ public class SqoopResourceRequests {
     getJobResourceRequest().delete(serverUrl, jid);
   }
 
-  public SubmissionBean readHistory(Long jid) {
-    return getSubmissionResourceRequest().readHistory(serverUrl, jid);
+  public SubmissionBean getJobStatus(Long jid) {
+    return getJobResourceRequest().status(serverUrl, jid);
   }
 
-  public SubmissionBean readSubmission(Long jid) {
-    return getSubmissionResourceRequest().read(serverUrl, jid);
+  public SubmissionBean startJob(Long jid) {
+    return getJobResourceRequest().start(serverUrl, jid);
   }
 
-  public SubmissionBean createSubmission(Long jid) {
-    return getSubmissionResourceRequest().create(serverUrl, jid);
+  public SubmissionBean stopJob(Long jid) {
+    return getJobResourceRequest().stop(serverUrl, jid);
   }
 
-  public SubmissionBean deleteSubmission(Long jid) {
-    return getSubmissionResourceRequest().delete(serverUrl, jid);
+  public SubmissionsBean readSubmission(Long jid) {
+    return getSubmissionResourceRequest().read(serverUrl, jid);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
index 5055783..e3b70bc 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.sqoop.client.request;
 
-import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -27,54 +27,18 @@ import org.json.simple.JSONValue;
  */
 public class SubmissionResourceRequest extends  ResourceRequest {
 
-  public static final String RESOURCE = "v1/submission/";
+  public static final String RESOURCE = "v1/submissions/";
 
-  public static final String ACTION = RESOURCE + "action/";
-
-  public static final String HISTORY = RESOURCE + "history/";
-
-  public SubmissionBean readHistory(String serverUrl, Long jid) {
+  public SubmissionsBean read(String serverUrl, Long jid) {
     String response;
     if (jid == null) {
-      response = super.get(serverUrl + HISTORY + "all");
+      response = super.get(serverUrl + RESOURCE);
     } else {
-      response = super.get(serverUrl + HISTORY + jid);
+      response = super.get(serverUrl + RESOURCE + jid);
     }
-
-    JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
-
-    SubmissionBean submissionBean = new SubmissionBean();
-    submissionBean.restore(jsonObject);
-
-    return submissionBean;
-  }
-
-  public SubmissionBean read(String serverUrl, Long jid) {
-    String response = super.get(serverUrl + ACTION + jid);
-
     JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
-
-    SubmissionBean submissionBean = new SubmissionBean();
+    SubmissionsBean submissionBean = new SubmissionsBean();
     submissionBean.restore(jsonObject);
-
-    return submissionBean;
-  }
-
-  public SubmissionBean create(String serverUrl, Long jid) {
-    String response = super.post(serverUrl + ACTION + jid, null);
-
-    SubmissionBean submissionBean = new SubmissionBean();
-    submissionBean.restore((JSONObject) JSONValue.parse(response));
-
-    return submissionBean;
-  }
-
-  public SubmissionBean delete(String serverUrl, Long id) {
-     String response = super.delete(serverUrl + ACTION + id);
-
-    SubmissionBean submissionBean = new SubmissionBean();
-    submissionBean.restore((JSONObject) JSONValue.parse(response));
-
     return submissionBean;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JobBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java
index 082d591..efc2efc 100644
--- a/common/src/main/java/org/apache/sqoop/json/JobBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java
@@ -19,8 +19,6 @@ package org.apache.sqoop.json;
 
 import static org.apache.sqoop.json.util.ConfigInputSerialization.extractConfigList;
 import static org.apache.sqoop.json.util.ConfigInputSerialization.restoreConfigList;
-import static org.apache.sqoop.json.util.ConfigBundleSerialization.extractConfigParamBundle;
-import static org.apache.sqoop.json.util.ConfigBundleSerialization.restoreConfigParamBundle;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -39,7 +37,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 /**
- * Json representation of the job config
+ * Json representation of the job
  */
 public class JobBean implements JsonBean {
 
@@ -50,6 +48,7 @@ public class JobBean implements JsonBean {
   static final String FROM_CONFIG = "from-config";
   static final String TO_CONFIG = "to-config";
   static final String DRIVER_CONFIG = "driver-config";
+  private static final String JOB = "job";
 
   // Required
   private List<MJob> jobs;
@@ -102,9 +101,16 @@ public class JobBean implements JsonBean {
   @Override
   @SuppressWarnings("unchecked")
   public JSONObject extract(boolean skipSensitive) {
-    JSONArray array = new JSONArray();
+    JSONArray jobArray = extractJobs(skipSensitive);
+    JSONObject job = new JSONObject();
+    job.put(JOB, jobArray);
+    return job;
+  }
 
-    for(MJob job : jobs) {
+  @SuppressWarnings("unchecked")
+  protected JSONArray extractJobs(boolean skipSensitive) {
+    JSONArray jobArray = new JSONArray();
+    for (MJob job : jobs) {
       JSONObject object = new JSONObject();
 
       object.put(ID, job.getPersistenceId());
@@ -115,6 +121,7 @@ public class JobBean implements JsonBean {
       object.put(UPDATE_USER, job.getLastUpdateUser());
       object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
       // job link associated connectors
+      // TODO(SQOOP-1634): fix not to require the connectorIds in the post data
       object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM));
       object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO));
       // job associated links
@@ -122,25 +129,30 @@ public class JobBean implements JsonBean {
       object.put(TO_LINK_ID, job.getLinkId(Direction.TO));
       // job configs
       MFromConfig fromConfigList = job.getFromJobConfig();
-      object.put(FROM_CONFIG, extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive));
+      object.put(FROM_CONFIG,
+          extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive));
       MToConfig toConfigList = job.getToJobConfig();
-      object.put(TO_CONFIG, extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive));
+      object.put(TO_CONFIG,
+          extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive));
       MDriverConfig driverConfigList = job.getDriverConfig();
-      object.put(DRIVER_CONFIG, extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(), skipSensitive));
+      object.put(
+          DRIVER_CONFIG,
+          extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(),
+              skipSensitive));
 
-    array.add(object);
+      jobArray.add(object);
     }
-
-    JSONObject all = new JSONObject();
-    all.put(ALL, array);
-    return all;
+    return jobArray;
   }
 
   @Override
   public void restore(JSONObject jsonObject) {
-    jobs = new ArrayList<MJob>();
+    JSONArray array = (JSONArray) jsonObject.get(JOB);
+    restoreJobs(array);
+  }
 
-    JSONArray array = (JSONArray) jsonObject.get(ALL);
+  protected void restoreJobs(JSONArray array) {
+    jobs = new ArrayList<MJob>();
 
     for (Object obj : array) {
       JSONObject object = (JSONObject) obj;
@@ -178,4 +190,4 @@ public class JobBean implements JsonBean {
       jobs.add(job);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JobsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobsBean.java b/common/src/main/java/org/apache/sqoop/json/JobsBean.java
new file mode 100644
index 0000000..3c454ea
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/JobsBean.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import java.util.List;
+
+import org.apache.sqoop.model.MJob;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Json representation of the jobs
+ */
+public class JobsBean extends JobBean {
+
+  private static final String JOBS = "jobs";
+
+  public JobsBean(MJob job) {
+    super(job);
+  }
+
+  public JobsBean(List<MJob> jobs) {
+    super(jobs);
+  }
+
+  // For "restore"
+  public JobsBean() {
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public JSONObject extract(boolean skipSensitive) {
+    JSONArray jobArray = super.extractJobs(skipSensitive);
+    JSONObject jobs = new JSONObject();
+    jobs.put(JOBS, jobArray);
+    return jobs;
+  }
+
+  @Override
+  public void restore(JSONObject jsonObject) {
+    JSONArray array = (JSONArray) jsonObject.get(JOBS);
+    restoreJobs(array);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JsonBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JsonBean.java b/common/src/main/java/org/apache/sqoop/json/JsonBean.java
index ba86511..164b604 100644
--- a/common/src/main/java/org/apache/sqoop/json/JsonBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JsonBean.java
@@ -25,6 +25,7 @@ public interface JsonBean {
   static final String CONFIGURABLE_VERSION = "version";
   static final String ALL_CONFIGS = "all-configs";
 
+  @Deprecated // should not be used anymore in the rest api
   static final String ALL = "all";
   static final String ID = "id";
   static final String NAME = "name";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
index 4b80338..b7bdaad 100644
--- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
@@ -17,13 +17,8 @@
  */
 package org.apache.sqoop.json;
 
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counter;
-import org.apache.sqoop.submission.counter.CounterGroup;
-import org.apache.sqoop.submission.counter.Counters;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
+import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
+import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -31,15 +26,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
-import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 
 /**
  *
  */
 public class SubmissionBean implements JsonBean {
 
-  private static final String ALL = "all";
+  private static final String SUBMISSION = "submission";
   private static final String JOB = "job";
   private static final String CREATION_USER = "creation-user";
   private static final String CREATION_DATE = "creation-date";
@@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
   private static final String EXCEPTION_TRACE = "exception-trace";
   private static final String PROGRESS = "progress";
   private static final String COUNTERS = "counters";
-  private static final String FROM_SCHEMA = "schema-from";
-  private static final String TO_SCHEMA = "schema-to";
+  private static final String FROM_SCHEMA = "from-schema";
+  private static final String TO_SCHEMA = "to-schema";
 
   private List<MSubmission> submissions;
 
@@ -80,79 +80,83 @@ public class SubmissionBean implements JsonBean {
   @Override
   @SuppressWarnings("unchecked")
   public JSONObject extract(boolean skipSensitive) {
-    JSONArray array = new JSONArray();
+    JSONArray submissionArray = extractSubmissions();
+    JSONObject submission = new JSONObject();
+    submission.put(SUBMISSION, submissionArray);
+    return submission;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected JSONArray extractSubmissions() {
+    JSONArray submissionsArray = new JSONArray();
 
-    for(MSubmission submission : this.submissions) {
+    for (MSubmission submission : this.submissions) {
       JSONObject object = new JSONObject();
 
       object.put(JOB, submission.getJobId());
       object.put(STATUS, submission.getStatus().name());
       object.put(PROGRESS, submission.getProgress());
 
-      if(submission.getCreationUser() != null) {
+      if (submission.getCreationUser() != null) {
         object.put(CREATION_USER, submission.getCreationUser());
       }
-      if(submission.getCreationDate() != null) {
+      if (submission.getCreationDate() != null) {
         object.put(CREATION_DATE, submission.getCreationDate().getTime());
       }
-      if(submission.getLastUpdateUser() != null) {
+      if (submission.getLastUpdateUser() != null) {
         object.put(LAST_UPDATE_USER, submission.getLastUpdateUser());
       }
-      if(submission.getLastUpdateDate() != null) {
+      if (submission.getLastUpdateDate() != null) {
         object.put(LAST_UPDATE_DATE, submission.getLastUpdateDate().getTime());
       }
-      if(submission.getExternalId() != null) {
+      if (submission.getExternalId() != null) {
         object.put(EXTERNAL_ID, submission.getExternalId());
       }
-      if(submission.getExternalLink() != null) {
+      if (submission.getExternalLink() != null) {
         object.put(EXTERNAL_LINK, submission.getExternalLink());
       }
-      if(submission.getExceptionInfo() != null) {
+      if (submission.getExceptionInfo() != null) {
         object.put(EXCEPTION, submission.getExceptionInfo());
       }
-      if(submission.getExceptionStackTrace() != null) {
+      if (submission.getExceptionStackTrace() != null) {
         object.put(EXCEPTION_TRACE, submission.getExceptionStackTrace());
       }
-      if(submission.getCounters() != null) {
+      if (submission.getCounters() != null) {
         object.put(COUNTERS, extractCounters(submission.getCounters()));
       }
-      if(submission.getFromSchema() != null)  {
+      if (submission.getFromSchema() != null) {
         object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
       }
-      if(submission.getToSchema() != null) {
+      if (submission.getToSchema() != null) {
         object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
       }
-
-      array.add(object);
+      submissionsArray.add(object);
     }
-
-    JSONObject all = new JSONObject();
-    all.put(ALL, array);
-
-    return all;
+    return submissionsArray;
   }
 
   @SuppressWarnings("unchecked")
-  public JSONObject extractCounters(Counters counters) {
-    JSONObject ret = new JSONObject();
-    for(CounterGroup group : counters) {
+  private JSONObject extractCounters(Counters counters) {
+    JSONObject counterArray = new JSONObject();
+    for (CounterGroup group : counters) {
       JSONObject counterGroup = new JSONObject();
 
-      for(Counter counter : group) {
+      for (Counter counter : group) {
         counterGroup.put(counter.getName(), counter.getValue());
       }
-
-      ret.put(group.getName(), counterGroup);
+      counterArray.put(group.getName(), counterGroup);
     }
-    return ret;
+    return counterArray;
   }
 
   @Override
   public void restore(JSONObject json) {
-    this.submissions = new ArrayList<MSubmission>();
-
-    JSONArray array = (JSONArray) json.get(ALL);
+    JSONArray submissionArray = (JSONArray) json.get(SUBMISSION);
+    restoreSubmissions(submissionArray);
+  }
 
+  protected void restoreSubmissions(JSONArray array) {
+    this.submissions = new ArrayList<MSubmission>();
     for (Object obj : array) {
       JSONObject object = (JSONObject) obj;
       MSubmission submission = new MSubmission();
@@ -161,38 +165,38 @@ public class SubmissionBean implements JsonBean {
       submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS)));
       submission.setProgress((Double) object.get(PROGRESS));
 
-      if(object.containsKey(CREATION_USER)) {
+      if (object.containsKey(CREATION_USER)) {
         submission.setCreationUser((String) object.get(CREATION_USER));
       }
-      if(object.containsKey(CREATION_DATE)) {
+      if (object.containsKey(CREATION_DATE)) {
         submission.setCreationDate(new Date((Long) object.get(CREATION_DATE)));
       }
-      if(object.containsKey(LAST_UPDATE_USER)) {
+      if (object.containsKey(LAST_UPDATE_USER)) {
         submission.setLastUpdateUser((String) object.get(LAST_UPDATE_USER));
       }
-      if(object.containsKey(LAST_UPDATE_DATE)) {
+      if (object.containsKey(LAST_UPDATE_DATE)) {
         submission.setLastUpdateDate(new Date((Long) object.get(LAST_UPDATE_DATE)));
       }
-      if(object.containsKey(EXTERNAL_ID)) {
+      if (object.containsKey(EXTERNAL_ID)) {
         submission.setExternalId((String) object.get(EXTERNAL_ID));
       }
-      if(object.containsKey(EXTERNAL_LINK)) {
+      if (object.containsKey(EXTERNAL_LINK)) {
         submission.setExternalLink((String) object.get(EXTERNAL_LINK));
       }
-      if(object.containsKey(EXCEPTION)) {
+      if (object.containsKey(EXCEPTION)) {
         submission.setExceptionInfo((String) object.get(EXCEPTION));
       }
-      if(object.containsKey(EXCEPTION_TRACE)) {
+      if (object.containsKey(EXCEPTION_TRACE)) {
         submission.setExceptionStackTrace((String) object.get(EXCEPTION_TRACE));
       }
-      if(object.containsKey(COUNTERS)) {
+      if (object.containsKey(COUNTERS)) {
         submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
       }
 
-      if(object.containsKey(FROM_SCHEMA)) {
+      if (object.containsKey(FROM_SCHEMA)) {
         submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA)));
       }
-      if(object.containsKey(TO_SCHEMA)) {
+      if (object.containsKey(TO_SCHEMA)) {
         submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA)));
       }
 
@@ -200,24 +204,20 @@ public class SubmissionBean implements JsonBean {
     }
   }
 
+  @SuppressWarnings("unchecked")
   public Counters restoreCounters(JSONObject object) {
     Set<Map.Entry<String, JSONObject>> groupSet = object.entrySet();
     Counters counters = new Counters();
 
-    for(Map.Entry<String, JSONObject> groupEntry: groupSet) {
-
+    for (Map.Entry<String, JSONObject> groupEntry : groupSet) {
       CounterGroup group = new CounterGroup(groupEntry.getKey());
-
       Set<Map.Entry<String, Long>> counterSet = groupEntry.getValue().entrySet();
-
-      for(Map.Entry<String, Long> counterEntry: counterSet) {
+      for (Map.Entry<String, Long> counterEntry : counterSet) {
         Counter counter = new Counter(counterEntry.getKey(), counterEntry.getValue());
         group.addCounter(counter);
       }
-
       counters.addCounterGroup(group);
     }
-
     return counters;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
new file mode 100644
index 0000000..74b6179
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import java.util.List;
+
+import org.apache.sqoop.model.MSubmission;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class SubmissionsBean extends SubmissionBean {
+
+  private static final String SUBMISSIONS = "submissions";
+
+  // For "extract"
+  public SubmissionsBean(MSubmission submission) {
+    super(submission);
+  }
+
+  public SubmissionsBean(List<MSubmission> submissions) {
+    super(submissions);
+
+  }
+
+  // For "restore"
+  public SubmissionsBean() {
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public JSONObject extract(boolean skipSensitive) {
+    JSONArray submissionsArray = super.extractSubmissions();
+    JSONObject submissions = new JSONObject();
+    submissions.put(SUBMISSIONS, submissionsArray);
+    return submissions;
+  }
+
+  @Override
+  public void restore(JSONObject json) {
+    JSONArray submissionsArray = (JSONArray) json.get(SUBMISSIONS);
+    restoreSubmissions(submissionsArray);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/model/MSubmission.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
index 7290df5..2648712 100644
--- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java
+++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
@@ -36,9 +36,6 @@ public class MSubmission extends MAccountableEntity {
   /**
    * Job id that this submission object belongs.
    *
-   * By transitivity of metadata structure you can get also connection and
-   * connector ids.
-   *
    * This property is required and will be always present.
    */
   private long jobId;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
index 1fc8dbd..923ad47 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
@@ -55,13 +55,13 @@ public class TestJobBean {
 
     // Serialize it to JSON object
     JobBean jobBean = new JobBean(job);
-    JSONObject jobJson = jobBean.extract(false);
+    JSONObject json = jobBean.extract(false);
 
     // "Move" it across network in text form
-    String jobJsonString = jobJson.toJSONString();
+    String jobJsonString = json.toJSONString();
 
     // Retrieved transferred object
-    JSONObject parsedJobJson = (JSONObject)JSONValue.parseWithException(jobJsonString);
+    JSONObject parsedJobJson = (JSONObject)JSONValue.parse(jobJsonString);
     JobBean parsedJobBean = new JobBean();
     parsedJobBean.restore(parsedJobJson);
     MJob target = parsedJobBean.getJobs().get(0);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
index e4d50bf..c5b8781 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
@@ -457,13 +457,13 @@ public class TestSubmissionBean {
    * @return
    */
   private MSubmission transfer(MSubmission submission) {
-    SubmissionBean bean = new SubmissionBean(submission);
+    SubmissionsBean bean = new SubmissionsBean(submission);
     JSONObject json = bean.extract(false);
 
     String string = json.toString();
 
     JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
-    SubmissionBean retrievedBean = new SubmissionBean();
+    SubmissionsBean retrievedBean = new SubmissionsBean();
     retrievedBean.restore(retrievedJson);
 
     return retrievedBean.getSubmissions().get(0);
@@ -476,13 +476,13 @@ public class TestSubmissionBean {
    * @return
    */
   private List<MSubmission> transfer(List<MSubmission> submissions) {
-    SubmissionBean bean = new SubmissionBean(submissions);
+    SubmissionsBean bean = new SubmissionsBean(submissions);
     JSONObject json = bean.extract(false);
 
     String string = json.toString();
 
     JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
-    SubmissionBean retrievedBean = new SubmissionBean();
+    SubmissionsBean retrievedBean = new SubmissionsBean();
     retrievedBean.restore(retrievedJson);
 
     return retrievedBean.getSubmissions();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/DriverError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverError.java b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
index ddee282..25a1b70 100644
--- a/core/src/main/java/org/apache/sqoop/driver/DriverError.java
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
@@ -22,6 +22,8 @@ import org.apache.sqoop.common.ErrorCode;
 /**
  *
  */
+//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): why is this called  Driver Error since it is used in JobManager?
+
 public enum DriverError implements ErrorCode {
 
   DRIVER_0001("Invalid submission engine"),
@@ -40,6 +42,7 @@ public enum DriverError implements ErrorCode {
 
   DRIVER_0008("Invalid combination of submission and execution engines"),
 
+  //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start terminology difference
   DRIVER_0009("Job has been disabled. Cannot submit this job."),
 
   DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index ba56c77..36ba1cd 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -269,11 +269,11 @@ public class JobManager implements Reconfigurable {
     LOG.info("Submission manager initialized: OK");
   }
 
-  public MSubmission submit(long jobId, HttpEventContext ctx) {
+  public MSubmission start(long jobId, HttpEventContext ctx) {
 
     MSubmission mSubmission = createJobSubmission(ctx, jobId);
     JobRequest jobRequest = createJobRequest(jobId, mSubmission);
-    // Bootstrap job to execute
+    // Bootstrap job to execute in the configured execution engine
     prepareJob(jobRequest);
     // Make sure that this job id is not currently running and submit the job
     // only if it's not.
@@ -283,14 +283,17 @@ public class JobManager implements Reconfigurable {
       if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
         throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
       }
-      // TODO(jarcec): We might need to catch all exceptions here to ensure
-      // that Destroyer will be executed in all cases.
       // NOTE: the following is a blocking call
       boolean success = submissionEngine.submit(jobRequest);
       if (!success) {
-        destroySubmission(jobRequest);
+        // TODO(jarcec): We might need to catch all exceptions here to ensure
+        // that Destroyer will be executed in all cases.
+        invokeDestroyerOnJobFailure(jobRequest);
         mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
       }
+      // persist submission record to repository.
+      // on failure we persist the FAILURE status, on success it is the SUCCESS
+      // status ( which is the default one)
       RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
     }
     return mSubmission;
@@ -435,6 +438,7 @@ public class JobManager implements Reconfigurable {
     return job;
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private void initializeConnector(JobRequest jobRequest, Direction direction) {
     Initializer initializer = getConnectorInitializer(jobRequest, direction);
     InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
@@ -444,6 +448,7 @@ public class JobManager implements Reconfigurable {
         jobRequest.getJobConfig(direction));
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
 
     Initializer initializer = getConnectorInitializer(jobRequest, direction);
@@ -453,6 +458,7 @@ public class JobManager implements Reconfigurable {
         jobRequest.getJobConfig(direction));
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
 
     Initializer initializer = getConnectorInitializer(jobRequest, direction);
@@ -462,6 +468,7 @@ public class JobManager implements Reconfigurable {
         jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
   }
 
+  @SuppressWarnings({ "rawtypes" })
   private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
     Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
     Class<? extends Initializer> initializerClass = transferable.getInitializer();
@@ -494,7 +501,8 @@ public class JobManager implements Reconfigurable {
    * Callback that will be called only if we failed to submit the job to the
    * remote cluster.
    */
-  void destroySubmission(JobRequest request) {
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  void invokeDestroyerOnJobFailure(JobRequest request) {
     Transferable from = request.getFrom();
     Transferable to = request.getTo();
 
@@ -520,7 +528,6 @@ public class JobManager implements Reconfigurable {
         request.getConnectorContext(Direction.TO), false, request.getSummary()
         .getToSchema());
 
-    // destroy submission from connector perspective
     fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
         request.getJobConfig(Direction.FROM));
     toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
@@ -534,7 +541,7 @@ public class JobManager implements Reconfigurable {
 
     if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
       throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
-          + " is not running");
+          + " is not running hence cannot stop");
     }
     submissionEngine.stop(mSubmission.getExternalId());
 
@@ -554,8 +561,7 @@ public class JobManager implements Reconfigurable {
     if (mSubmission == null) {
       return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
     }
-
-    // If the submission is in running state, let's update it
+    // If the submission isin running state, let's update it
     if (mSubmission.getStatus().isRunning()) {
       update(mSubmission);
     }
@@ -696,4 +702,4 @@ public class JobManager implements Reconfigurable {
       LOG.info("Ending submission manager update thread");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index 2666320..eed79a5 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -91,7 +91,7 @@ public class JobRequest {
   MutableMapContext toConnectorContext;
 
   /**
-   * Framework context (submission specific configuration)
+   * Driver context (submission specific configuration)
    */
   MutableMapContext driverContext;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 976223d..f41e60e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -500,6 +500,20 @@ public class JdbcRepository extends Repository {
     });
   }
 
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MJob findJob(final String name) {
+    return (MJob) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        return handler.findJob(name, conn);
+      }
+    });
+  }
+
   /**
    * {@inheritDoc}
    */

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 1e22759..4fe1500 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -335,6 +335,15 @@ public abstract class JdbcRepositoryHandler {
   public abstract MJob findJob(long jobId, Connection conn);
 
   /**
+   * Find job with given name in repository.
+   *
+   * @param name unique name for the job
+   * @param conn Connection to the repository
+   * @return job for a given name that is present in the repository or null if not present
+   */
+  public abstract MJob findJob(String name, Connection conn);
+
+  /**
    * Get all job objects.
    *
    * @param conn Connection to the repository

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 61d6b9b..76e124e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -254,6 +254,14 @@ public abstract class Repository {
   public abstract MJob findJob(long id);
 
   /**
+   * Find job object with given name.
+   *
+   * @param name unique name for the job
+   * @return job with given name loaded from repository or null if not present
+   */
+  public abstract MJob findJob(String name);
+
+  /**
    * Get all job objects.
    *
    * @return List of all jobs in the repository

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index b324f4f..059a827 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -1593,6 +1593,33 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
+  public MJob findJob(String name, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_NAME);
+      stmt.setString(1, name);
+
+      List<MJob> jobs = loadJobs(stmt, conn);
+
+      if (jobs.size() != 1) {
+        return null;
+      }
+
+      // Return the first and only one link object
+      return jobs.get(0);
+
+    } catch (SQLException ex) {
+      logException(ex, name);
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public List<MJob> findJobs(Connection conn) {
     PreparedStatement stmt = null;
     try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
index c894d06..6c5fad7 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
@@ -435,6 +435,10 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
    public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
        STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
 
+// DML: Select one specific job
+   public static final String STMT_SELECT_JOB_SINGLE_BY_NAME =
+       STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_NAME + " = ?";
+
    // DML: Select all jobs for a Connector
    public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
        STMT_SELECT_JOB

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 5547988..8130805 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.handler;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 
@@ -28,16 +29,22 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.JobManager;
 import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobsBean;
 import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.json.SubmissionBean;
 import org.apache.sqoop.json.ValidationResultBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MFromConfig;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MPersistableEntity;
+import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.model.MToConfig;
 import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.request.HttpEventContext;
 import org.apache.sqoop.server.RequestContext;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.common.ServerError;
@@ -46,41 +53,38 @@ import org.apache.sqoop.validation.Status;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
-/**
- * Job request handler is supporting following resources:
- *
- * GET /v1/job/:jid
- * Return details about one particular job with id :jid or about all of
- * them if :jid equals to "all".
- *
- * POST /v1/job
- * Create new job
- *
- * PUT /v1/job/:jid
- * Update job with id :jid.
- *
- * PUT /v1/job/:jid/enable
- * Enable job with id :jid
- *
- * PUT /v1/job/:jid/disable
- * Disable job with id :jid
- *
- * DELETE /v1/job/:jid
- * Remove job with id :jid
- *
- * Planned resources:
- *
- * GET /v1/job
- * Get brief list of all jobs present in the system. This resource is not yet
- * implemented.
- */
 public class JobRequestHandler implements RequestHandler {
 
-  private static final Logger LOG =
-      Logger.getLogger(JobRequestHandler.class);
+  /** enum for representing the actions supported on the job resource*/
+  enum JobAction {
+    ENABLE("enable"),
+    DISABLE("disable"),
+    START("start"),
+    STOP("stop"),
+    ;
+    JobAction(String name) {
+      this.name = name;
+    }
+
+    String name;
+
+    public static JobAction fromString(String name) {
+      if (name != null) {
+        for (JobAction action : JobAction.values()) {
+          if (name.equalsIgnoreCase(action.name)) {
+            return action;
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  private static final Logger LOG = Logger.getLogger(JobRequestHandler.class);
 
-  private static final String ENABLE = "enable";
-  private static final String DISABLE = "disable";
+  static final String JOBS_PATH = "jobs";
+  static final String JOB_PATH = "job";
+  static final String STATUS = "status";
 
   public JobRequestHandler() {
     LOG.info("JobRequestHandler initialized");
@@ -89,208 +93,306 @@ public class JobRequestHandler implements RequestHandler {
   @Override
   public JsonBean handleEvent(RequestContext ctx) {
     switch (ctx.getMethod()) {
-      case GET:
-        return getJobs(ctx);
-      case POST:
-          return createUpdateJob(ctx, false);
-      case PUT:
-        if (ctx.getLastURLElement().equals(ENABLE)) {
-          return enableJob(ctx, true);
-        } else if (ctx.getLastURLElement().equals(DISABLE)) {
-          return enableJob(ctx, false);
-        } else {
-          return createUpdateJob(ctx, true);
-        }
-      case DELETE:
-        return deleteJob(ctx);
+    case GET:
+      if (STATUS.equals(ctx.getLastURLElement())) {
+        return getJobStatus(ctx);
+      }
+      return getJobs(ctx);
+    case POST:
+      return createUpdateJob(ctx, true);
+    case PUT:
+      JobAction action = JobAction.fromString(ctx.getLastURLElement());
+      switch (action) {
+      case ENABLE:
+        return enableJob(ctx, true);
+      case DISABLE:
+        return enableJob(ctx, false);
+      case START:
+        return startJob(ctx);
+      case STOP:
+        return stopJob(ctx);
+      default:
+        return createUpdateJob(ctx, false);
+      }
+    case DELETE:
+      return deleteJob(ctx);
     }
 
     return null;
   }
 
   /**
-   * Delete job from  repository.
+   * Delete job from repository.
    *
-   * @param ctx Context object
+   * @param ctx
+   *          Context object
    * @return Empty bean
    */
   private JsonBean deleteJob(RequestContext ctx) {
-    String sxid = ctx.getLastURLElement();
-    long jid = Long.valueOf(sxid);
-
-    AuditLoggerManager.getInstance()
-        .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-        "delete", "job", sxid);
 
     Repository repository = RepositoryManager.getInstance().getRepository();
-    repository.deleteJob(jid);
 
+    String jobIdentifier = ctx.getLastURLElement();
+    long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+
+    AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+        ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
+    repository.deleteJob(jobId);
     return JsonBean.EMPTY_BEAN;
   }
 
   /**
    * Update or create job in repository.
    *
-   * @param ctx Context object
+   * @param ctx
+   *          Context object
    * @return Validation bean object
    */
-  private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
+  private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
+
+    Repository repository = RepositoryManager.getInstance().getRepository();
 
     String username = ctx.getUserName();
     JobBean bean = new JobBean();
 
     try {
-      JSONObject json =
-        (JSONObject) JSONValue.parse(ctx.getRequest().getReader());
+      JSONObject json = (JSONObject) JSONValue.parse(ctx.getRequest().getReader());
       bean.restore(json);
     } catch (IOException e) {
-      throw new SqoopException(ServerError.SERVER_0003,
-        "Can't read request content", e);
+      throw new SqoopException(ServerError.SERVER_0003, "Can't read request content", e);
     }
 
     // Get job object
     List<MJob> jobs = bean.getJobs();
 
-    if(jobs.size() != 1) {
-      throw new SqoopException(ServerError.SERVER_0003,
-        "Expected one job but got " + jobs.size());
+    if (jobs.size() != 1) {
+      throw new SqoopException(ServerError.SERVER_0003, "Expected one job but got " + jobs.size());
     }
 
     // Job object
-    MJob job = jobs.get(0);
+    MJob postedJob = jobs.get(0);
 
     // Verify that user is not trying to spoof us
     MFromConfig fromConfig = ConnectorManager.getInstance()
-        .getConnectorConfigurable(job.getConnectorId(Direction.FROM))
-        .getFromConfig();
+        .getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig();
     MToConfig toConfig = ConnectorManager.getInstance()
-        .getConnectorConfigurable(job.getConnectorId(Direction.TO))
-        .getToConfig();
+        .getConnectorConfigurable(postedJob.getConnectorId(Direction.TO)).getToConfig();
     MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
 
-    if(!fromConfig.equals(job.getJobConfig(Direction.FROM))
-      || !driverConfig.equals(job.getDriverConfig())
-      || !toConfig.equals(job.getJobConfig(Direction.TO))) {
-      throw new SqoopException(ServerError.SERVER_0003,
-        "Detected incorrect config structure");
+    if (!fromConfig.equals(postedJob.getJobConfig(Direction.FROM))
+        || !driverConfig.equals(postedJob.getDriverConfig())
+        || !toConfig.equals(postedJob.getJobConfig(Direction.TO))) {
+      throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure");
+    }
+
+    // if update get the job id from the request URI
+    if (!create) {
+      String jobIdentifier = ctx.getLastURLElement();
+      // support jobName or jobId for the api
+      long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+      if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
+        MJob existingJob = repository.findJob(jobId);
+        postedJob.setPersistenceId(existingJob.getPersistenceId());
+      }
     }
 
     // Corresponding connectors for this
-    SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM));
-    SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO));
+    SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
+        postedJob.getConnectorId(Direction.FROM));
+    SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
+        postedJob.getConnectorId(Direction.TO));
 
     if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
-      throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName()
-          + " does not support FROM direction.");
+      throw new SqoopException(ServerError.SERVER_0004, "Connector "
+          + fromConnector.getClass().getCanonicalName() + " does not support FROM direction.");
     }
 
     if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
-      throw new SqoopException(ServerError.SERVER_0004, "Connector " + toConnector.getClass().getCanonicalName()
-          + " does not support TO direction.");
+      throw new SqoopException(ServerError.SERVER_0004, "Connector "
+          + toConnector.getClass().getCanonicalName() + " does not support TO direction.");
     }
 
     // Validate user supplied data
     ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs(
-      job.getJobConfig(Direction.FROM).getConfigs(),
-      fromConnector.getJobConfigurationClass(Direction.FROM)
-    );
+        postedJob.getJobConfig(Direction.FROM).getConfigs(),
+        fromConnector.getJobConfigurationClass(Direction.FROM));
     ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs(
-      job.getJobConfig(Direction.TO).getConfigs(),
-      toConnector.getJobConfigurationClass(Direction.TO)
-    );
-    ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(
-      job.getDriverConfig().getConfigs(),
-      Driver.getInstance().getDriverJobConfigurationClass()
-    );
-
-    Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(), toConfigValidator.getStatus(), driverConfigValidator.getStatus());
-
+        postedJob.getJobConfig(Direction.TO).getConfigs(),
+        toConnector.getJobConfigurationClass(Direction.TO));
+    ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob
+        .getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass());
+    Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(),
+        toConfigValidator.getStatus(), driverConfigValidator.getStatus());
     // Return back validations in all cases
-    ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, toConfigValidator);
+    ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator,
+        toConfigValidator);
 
     // If we're good enough let's perform the action
-    if(finalStatus.canProceed()) {
-      if(update) {
-        AuditLoggerManager.getInstance()
-            .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-            "update", "job", String.valueOf(job.getPersistenceId()));
-
-        job.setLastUpdateUser(username);
-        RepositoryManager.getInstance().getRepository().updateJob(job);
+    if (finalStatus.canProceed()) {
+      if (create) {
+        AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+            ctx.getRequest().getRemoteAddr(), "create", "job",
+            String.valueOf(postedJob.getPersistenceId()));
+
+        postedJob.setCreationUser(username);
+        postedJob.setLastUpdateUser(username);
+        repository.createJob(postedJob);
+        validationResultBean.setId(postedJob.getPersistenceId());
       } else {
-        job.setCreationUser(username);
-        job.setLastUpdateUser(username);
-        RepositoryManager.getInstance().getRepository().createJob(job);
-        validationResultBean.setId(job.getPersistenceId());
-
-        AuditLoggerManager.getInstance()
-            .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-            "create", "job", String.valueOf(job.getPersistenceId()));
-      }
+        AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+            ctx.getRequest().getRemoteAddr(), "update", "job",
+            String.valueOf(postedJob.getPersistenceId()));
 
+        postedJob.setLastUpdateUser(username);
+        repository.updateJob(postedJob);
+      }
     }
-
     return validationResultBean;
   }
 
   private JsonBean getJobs(RequestContext ctx) {
-    String sjid = ctx.getLastURLElement();
-    JobBean bean;
-
-    AuditLoggerManager.getInstance()
-        .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-        "get", "job", sjid);
-
+    String identifier = ctx.getLastURLElement();
+    JobBean jobBean;
     Locale locale = ctx.getAcceptLanguageHeader();
     Repository repository = RepositoryManager.getInstance().getRepository();
 
-    if (sjid.equals(JsonBean.ALL)) {
-
-      List<MJob> jobs = repository.findJobs();
-      bean = new JobBean(jobs);
-
-      // Add associated resources into the bean
-      for( MJob job : jobs) {
-        long fromConnectorId = job.getConnectorId(Direction.FROM);
-        long toConnectorId = job.getConnectorId(Direction.TO);
-        if(!bean.hasConnectorConfigBundle(fromConnectorId)) {
-          bean.addConnectorConfigBundle(fromConnectorId,
-            ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale));
-        }
-        if(!bean.hasConnectorConfigBundle(toConnectorId)) {
-          bean.addConnectorConfigBundle(toConnectorId,
-              ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
-        }
+    // jobs by connector
+    if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
+      identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
+      AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+          ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier);
+      if (repository.findConnector(identifier) != null) {
+        long connectorId = repository.findConnector(identifier).getPersistenceId();
+        jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
+      } else {
+        // this means name nor Id existed
+        throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
+            + " name for jobs given");
       }
+    } else
+    // all jobs in the system
+    if (ctx.getPath().contains(JOBS_PATH)
+        || (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) {
+      AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+          ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
+      jobBean = createJobsBean(repository.findJobs(), locale);
+    }
+    // job by Id
+    else {
+      AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+          ctx.getRequest().getRemoteAddr(), "get", "job", identifier);
+
+      long jobId = getJobIdFromIdentifier(identifier, repository);
+      List<MJob> jobList = new ArrayList<MJob>();
+      // a list of single element
+      jobList.add(repository.findJob(jobId));
+      jobBean = createJobBean(jobList, locale);
+    }
+    return jobBean;
+  }
+
+  private long getJobIdFromIdentifier(String identifier, Repository repository) {
+    // support jobName or jobId for the api
+    // NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames
+    long jobId;
+    if (repository.findJob(identifier) != null) {
+      jobId = repository.findJob(identifier).getPersistenceId();
     } else {
-      long jid = Long.valueOf(sjid);
+      try {
+        jobId = Long.valueOf(identifier);
+      } catch (NumberFormatException ex) {
+        // this means name nor Id existed and we want to throw a user friendly message than a number format exception
+        throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+            + " requested");
+      }
+    }
+    return jobId;
+  }
+
+  private JobBean createJobBean(List<MJob> jobs, Locale locale) {
+    JobBean jobBean = new JobBean(jobs);
+    addJob(jobs, locale, jobBean);
+    return jobBean;
+  }
 
-      MJob job = repository.findJob(jid);
+  private JobsBean createJobsBean(List<MJob> jobs, Locale locale) {
+    JobsBean jobsBean = new JobsBean(jobs);
+    addJob(jobs, locale, jobsBean);
+    return jobsBean;
+  }
+
+  private void addJob(List<MJob> jobs, Locale locale, JobBean bean) {
+    // Add associated resources into the bean
+    for (MJob job : jobs) {
       long fromConnectorId = job.getConnectorId(Direction.FROM);
       long toConnectorId = job.getConnectorId(Direction.TO);
-      bean = new JobBean(job);
-      if(!bean.hasConnectorConfigBundle(fromConnectorId)) {
-        bean.addConnectorConfigBundle(fromConnectorId,
-            ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale));
+      // replace it only if it does not already exist
+      if (!bean.hasConnectorConfigBundle(fromConnectorId)) {
+        bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance()
+            .getResourceBundle(fromConnectorId, locale));
       }
-      if(!bean.hasConnectorConfigBundle(toConnectorId)) {
-        bean.addConnectorConfigBundle(toConnectorId,
-            ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
+      if (!bean.hasConnectorConfigBundle(toConnectorId)) {
+        bean.addConnectorConfigBundle(toConnectorId, ConnectorManager.getInstance()
+            .getResourceBundle(toConnectorId, locale));
       }
     }
-
-    // set driver config bundle
-    bean.setDriverConfigBundle(Driver.getInstance().getBundle(locale));
-    return bean;
   }
 
   private JsonBean enableJob(RequestContext ctx, boolean enabled) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
     String[] elements = ctx.getUrlElements();
-    String sjid = elements[elements.length - 2];
-    long xid = Long.valueOf(sjid);
+    String jobIdentifier = elements[elements.length - 2];
+    long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+    repository.enableJob(jobId, enabled);
+    return JsonBean.EMPTY_BEAN;
+  }
 
+  private JsonBean startJob(RequestContext ctx) {
     Repository repository = RepositoryManager.getInstance().getRepository();
-    repository.enableJob(xid, enabled);
-    return JsonBean.EMPTY_BEAN;
+    String[] elements = ctx.getUrlElements();
+    String jobIdentifier = elements[elements.length - 2];
+    long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+    AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+        ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId));
+    // TODO(SQOOP-1638): This should be outsourced somewhere more suitable than
+    // here
+    if (JobManager.getInstance().getNotificationBaseUrl() == null) {
+      String url = ctx.getRequest().getRequestURL().toString();
+      JobManager.getInstance().setNotificationBaseUrl(
+          url.split("v1")[0] + "/v1/job/status/notification/");
+    }
+
+    MSubmission submission = JobManager.getInstance()
+        .start(jobId, prepareRequestEventContext(ctx));
+    return new SubmissionBean(submission);
   }
+
+  private JsonBean stopJob(RequestContext ctx) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    String[] elements = ctx.getUrlElements();
+    String jobIdentifier = elements[elements.length - 2];
+    long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+    AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+        ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
+    MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx));
+    return new SubmissionBean(submission);
+  }
+
+  private JsonBean getJobStatus(RequestContext ctx) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    String[] elements = ctx.getUrlElements();
+    String jobIdentifier = elements[elements.length - 2];
+    long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+    AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+        ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
+    MSubmission submission = JobManager.getInstance().status(jobId);
+    return new SubmissionBean(submission);
+  }
+
+  private HttpEventContext prepareRequestEventContext(RequestContext ctx) {
+    HttpEventContext httpEventContext = new HttpEventContext();
+    httpEventContext.setUsername(ctx.getUserName());
+    return httpEventContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
index 8555b0c..cfbb524 100644
--- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -22,37 +22,19 @@ import java.util.List;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.audit.AuditLoggerManager;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.driver.JobManager;
 import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
 import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.request.HttpEventContext;
 import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestContext.Method;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.common.ServerError;
 
-/**
- * Submission request handler is supporting following resources:
- *
- * GET /v1/submission/action/:jid
- * Get status of last submission for job with id :jid
- *
- * POST /v1/submission/action/:jid
- * Create new submission for job with id :jid
- *
- * DELETE /v1/submission/action/:jid
- * Stop last submission for job with id :jid
- *
- * GET /v1/submission/notification/:jid
- * Notification endpoint to get job status outside normal interval
- *
- * Possible additions in the future: /v1/submission/history/* for history.
- */
 public class SubmissionRequestHandler implements RequestHandler {
 
-  private static final Logger LOG =
-    Logger.getLogger(SubmissionRequestHandler.class);
+  private static final Logger LOG = Logger.getLogger(SubmissionRequestHandler.class);
 
   public SubmissionRequestHandler() {
     LOG.info("SubmissionRequestHandler initialized");
@@ -60,110 +42,45 @@ public class SubmissionRequestHandler implements RequestHandler {
 
   @Override
   public JsonBean handleEvent(RequestContext ctx) {
-    String[] urlElements = ctx.getUrlElements();
-    if (urlElements.length < 2) {
-      throw new SqoopException(ServerError.SERVER_0003,
-        "Invalid URL, too few arguments for this servlet.");
-    }
 
-    // Let's check
-    int length = urlElements.length;
-    String action = urlElements[length - 2];
-
-    if(action.equals("action")) {
-      return handleActionEvent(ctx, urlElements[length - 1]);
+    // submission only support GET requests
+    if (ctx.getMethod() != Method.GET) {
+      throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:"
+          + ctx.getMethod());
     }
-
-    if(action.equals("notification")) {
-      return handleNotification(ctx, urlElements[length - 1]);
-    }
-
-    if(action.equals("history")) {
-      return handleHistoryEvent(ctx, urlElements[length - 1]);
-    }
-
-    throw new SqoopException(ServerError.SERVER_0003,
-      "Do not know what to do.");
-  }
-
-  private JsonBean handleNotification(RequestContext ctx, String sjid) {
-    LOG.debug("Received notification request for job " + sjid);
-    JobManager.getInstance().status(Long.parseLong(sjid));
-    return JsonBean.EMPTY_BEAN;
-  }
-
-  private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
-    long jid = Long.parseLong(sjid);
-
-    String username = ctx.getUserName();
-    HttpEventContext ectx = new HttpEventContext();
-    ectx.setUsername(username);
-
-    switch (ctx.getMethod()) {
-      case GET:
-        AuditLoggerManager.getInstance()
-            .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-            "status", "submission", String.valueOf(jid));
-
-        return submissionStatus(jid);
-      case POST:
-        // TODO: This should be outsourced somewhere more suitable than here
-        if(JobManager.getInstance().getNotificationBaseUrl() == null) {
-          String url = ctx.getRequest().getRequestURL().toString();
-          JobManager.getInstance().setNotificationBaseUrl(
-            url.split("v1")[0] + "/v1/submission/notification/");
-        }
-
-        AuditLoggerManager.getInstance()
-            .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-            "submit", "submission", String.valueOf(jid));
-
-        return submissionSubmit(jid, ectx);
-      case DELETE:
-        AuditLoggerManager.getInstance()
-            .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-            "stop", "submission", String.valueOf(jid));
-
-        return submissionStop(jid, ectx);
-    }
-
-    return null;
-  }
-
-  private JsonBean handleHistoryEvent(RequestContext ctx, String sjid) {
-    AuditLoggerManager.getInstance()
-        .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
-        "get", "submission", sjid);
-
-    if (sjid.equals("all")) {
-      return getSubmissions();
+    String identifier = ctx.getLastURLElement();
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    // links by connector ordered by updated time
+    // hence the latest submission is on the top
+    if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
+      identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
+      AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+          ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier);
+      if (repository.findJob(identifier) != null) {
+        long jobId = repository.findJob(identifier).getPersistenceId();
+        return getSubmissionsForJob(jobId);
+      } else {
+        // this means name nor Id existed
+        throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+            + " name given");
+      }
     } else {
-      return getSubmissionsForJob(Long.parseLong(sjid));
+      // all submissions in the system
+      AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+          ctx.getRequest().getRemoteAddr(), "get", "submissions", "all");
+      return getSubmissions();
     }
   }
 
-  private JsonBean submissionStop(long jid, HttpEventContext ctx) {
-    MSubmission submission = JobManager.getInstance().stop(jid, ctx);
-    return new SubmissionBean(submission);
-  }
-
-  private JsonBean submissionSubmit(long jid, HttpEventContext ctx) {
-    MSubmission submission = JobManager.getInstance().submit(jid, ctx);
-    return new SubmissionBean(submission);
-  }
-
-  private JsonBean submissionStatus(long jid) {
-    MSubmission submission = JobManager.getInstance().status(jid);
-    return new SubmissionBean(submission);
-  }
-
   private JsonBean getSubmissions() {
-    List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissions();
-    return new SubmissionBean(submissions);
+    List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
+        .findSubmissions();
+    return new SubmissionsBean(submissions);
   }
 
   private JsonBean getSubmissionsForJob(long jid) {
-    List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissionsForJob(jid);
-    return new SubmissionBean(submissions);
+    List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
+        .findSubmissionsForJob(jid);
+    return new SubmissionsBean(submissions);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
index d295237..0d15d0a 100644
--- a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
+++ b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
@@ -23,7 +23,53 @@ import org.apache.sqoop.server.RequestContext;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.SqoopProtocolServlet;
 
+
 /**
+ * Provides operations for job resource
+ *
+ * GET /v1/job/{jid}
+ * Return details about one particular job with id:jid or about all of
+ * them if jid equals to "all"
+ *
+ * POST /v1/job
+ * Create new job
+ * POST /v1/job/ with {from-link-id}, {to-link-id} and other job details in the post data
+ *  Create job with from and to link
+ * PUT /v1/link/ with {from-link-id}, {to-link-id} and other job details in the post data
+ *  Edit/Update job for the from and to link
+ *
+ * PUT /v1/job/{jid} and the job details in the post data
+ * Update job with id jid.
+ *
+ * PUT /v1/job/{jid}/enable
+ *  Enable job with id jid
+ * PUT /v1/job/{jname}s/disable
+ *  Enable job with name jname
+ *
+ * PUT /v1/job/{jid}/disable
+ *  Disable job with id jid
+ * PUT /v1/job/{jname}/disable
+ *  Disable job with name jname
+ *
+ * DELETE /v1/job/{jid}
+ *  Remove job with id jid
+ * DELETE /v1/job/{jname}
+ *  Remove job with name jname
+ *
+ * PUT /v1/job/{jid}/submit
+ *  Submit job with id jid to create a submission record
+ * PUT /v1/job/{jname}/submit
+ *  Submit job with name jname to create a submission record
+ *
+ * PUT /v1/job/{jid}/stop
+ *  Abort/Stop last running associated submission with job id jid
+ * PUT /v1/job/{jname}/stop
+ *  Abort/Stop last running associated submission with job name jname
+ *
+ * GET /v1/job/{jid}/status
+ *  get status of running job with job id jid
+ * GET /v1/job/{jname}/status
+ *  get status of running job with job name jname
  *
  */
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
new file mode 100644
index 0000000..5184a0b
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.server.v1;
+
+import org.apache.sqoop.handler.JobRequestHandler;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.SqoopProtocolServlet;
+
+
+/**
+ * Displays all or jobs per connector in sqoop
+ *
+ * GET /v1/jobs
+ *  Return details about every jobs that exists in the sqoop system
+ * GET /v1/jobs?cname=
+ *  Return details about job(s) for a given connector name {cname}
+*/
+@SuppressWarnings("serial")
+public class JobsServlet extends SqoopProtocolServlet {
+
+  private RequestHandler jobRequestHandler;
+
+  public JobsServlet() {
+    jobRequestHandler = new JobRequestHandler();
+  }
+
+  @Override
+  protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
+    return jobRequestHandler.handleEvent(ctx);
+  }
+}


Mime
View raw message