SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only
(Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/268a4755
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/268a4755
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/268a4755
Branch: refs/heads/sqoop2
Commit: 268a47552f5758c2d9caa5e859bd3a631baaedc9
Parents: d6319e3
Author: Abraham Elmahrek <abraham@elmahrek.com>
Authored: Mon Nov 3 21:23:18 2014 -0800
Committer: Abraham Elmahrek <abraham@elmahrek.com>
Committed: Mon Nov 3 21:24:19 2014 -0800
----------------------------------------------------------------------
.../org/apache/sqoop/client/SqoopClient.java | 42 +-
.../apache/sqoop/client/SubmissionCallback.java | 3 +-
.../client/request/JobResourceRequest.java | 59 ++-
.../client/request/SqoopResourceRequests.java | 21 +-
.../request/SubmissionResourceRequest.java | 48 +--
.../java/org/apache/sqoop/json/JobBean.java | 44 +-
.../java/org/apache/sqoop/json/JobsBean.java | 59 +++
.../java/org/apache/sqoop/json/JsonBean.java | 1 +
.../org/apache/sqoop/json/SubmissionBean.java | 120 +++---
.../org/apache/sqoop/json/SubmissionsBean.java | 59 +++
.../org/apache/sqoop/model/MSubmission.java | 3 -
.../java/org/apache/sqoop/json/TestJobBean.java | 6 +-
.../apache/sqoop/json/TestSubmissionBean.java | 8 +-
.../org/apache/sqoop/driver/DriverError.java | 3 +
.../org/apache/sqoop/driver/JobManager.java | 28 +-
.../org/apache/sqoop/driver/JobRequest.java | 2 +-
.../apache/sqoop/repository/JdbcRepository.java | 14 +
.../sqoop/repository/JdbcRepositoryHandler.java | 9 +
.../org/apache/sqoop/repository/Repository.java | 8 +
.../derby/DerbyRepositoryHandler.java | 27 ++
...erbySchemaInsertUpdateDeleteSelectQuery.java | 4 +
.../apache/sqoop/handler/JobRequestHandler.java | 408 ++++++++++++-------
.../sqoop/handler/SubmissionRequestHandler.java | 151 ++-----
.../org/apache/sqoop/server/v1/JobServlet.java | 46 +++
.../org/apache/sqoop/server/v1/JobsServlet.java | 48 +++
.../sqoop/server/v1/SubmissionServlet.java | 52 ---
.../sqoop/server/v1/SubmissionsServlet.java | 49 +++
server/src/main/webapp/WEB-INF/web.xml | 22 +-
.../sqoop/shell/ShowJobStatusFunction.java | 59 +++
.../apache/sqoop/shell/StartJobFunction.java | 4 +-
.../org/apache/sqoop/shell/StatusCommand.java | 2 +-
.../apache/sqoop/shell/StatusJobFunction.java | 59 ---
.../org/apache/sqoop/shell/StopJobFunction.java | 2 +-
.../apache/sqoop/shell/UpdateJobFunction.java | 2 +-
.../apache/sqoop/shell/UpdateLinkFunction.java | 2 +-
.../sqoop/test/testcases/ConnectorTestCase.java | 2 +-
.../jdbc/generic/FromRDBMSToHDFSTest.java | 4 +-
.../sqoop/tools/tool/RepositoryDumpTool.java | 4 +-
38 files changed, 905 insertions(+), 579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 33a0c3c..e139132 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -81,6 +81,8 @@ public class SqoopClient {
/**
* Status flags used when updating the submission callback status
*/
+ //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): Why do wee need a duplicate status enum in client when shell is using the server status?
+ // NOTE: the getStatus method is on the job resource and this needs to be revisited
private enum SubmissionStatus {
SUBMITTED,
UPDATED,
@@ -438,8 +440,8 @@ public class SqoopClient {
* @param jobId Job id
* @return
*/
- public MSubmission startSubmission(long jobId) {
- return resourceRequests.createSubmission(jobId).getSubmissions().get(0);
+ public MSubmission startJob(long jobId) {
+ return resourceRequests.startJob(jobId).getSubmissions().get(0);
}
/**
@@ -452,24 +454,27 @@ public class SqoopClient {
* @return MSubmission - Final status of job submission
* @throws InterruptedException
*/
- public MSubmission startSubmission(long jobId, SubmissionCallback callback, long pollTime)
+ public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime)
throws InterruptedException {
if(pollTime <= 0) {
throw new SqoopException(ClientError.CLIENT_0002);
}
+ //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start/first terminology difference
+ // What does first even mean in s distributed client/server model?
boolean first = true;
- MSubmission submission = resourceRequests.createSubmission(jobId).getSubmissions().get(0);
+ MSubmission submission = resourceRequests.startJob(jobId).getSubmissions().get(0);
+ // what happens when the server fails, do we just say finished?
while(submission.getStatus().isRunning()) {
if(first) {
- submissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
+ invokeSubmissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
first = false;
} else {
- submissionCallback(callback, submission, SubmissionStatus.UPDATED);
+ invokeSubmissionCallback(callback, submission, SubmissionStatus.UPDATED);
}
Thread.sleep(pollTime);
- submission = getSubmissionStatus(jobId);
+ submission = getJobStatus(jobId);
}
- submissionCallback(callback, submission, SubmissionStatus.FINISHED);
+ invokeSubmissionCallback(callback, submission, SubmissionStatus.FINISHED);
return submission;
}
@@ -480,7 +485,7 @@ public class SqoopClient {
* @param submission
* @param status
*/
- private void submissionCallback(SubmissionCallback callback, MSubmission submission,
+ private void invokeSubmissionCallback(SubmissionCallback callback, MSubmission submission,
SubmissionStatus status) {
if (callback == null) {
return;
@@ -494,17 +499,19 @@ public class SqoopClient {
break;
case FINISHED:
callback.finished(submission);
+ default:
+ break;
}
}
/**
- * Stop job with given id.
+ * stop job with given id.
*
* @param jid Job id
* @return
*/
- public MSubmission stopSubmission(long jid) {
- return resourceRequests.deleteSubmission(jid).getSubmissions().get(0);
+ public MSubmission stopJob(long jid) {
+ return resourceRequests.stopJob(jid).getSubmissions().get(0);
}
/**
@@ -513,8 +520,8 @@ public class SqoopClient {
* @param jid Job id
* @return
*/
- public MSubmission getSubmissionStatus(long jid) {
- return resourceRequests.readSubmission(jid).getSubmissions().get(0);
+ public MSubmission getJobStatus(long jid) {
+ return resourceRequests.getJobStatus(jid).getSubmissions().get(0);
}
/**
@@ -523,7 +530,7 @@ public class SqoopClient {
* @return
*/
public List<MSubmission> getSubmissions() {
- return resourceRequests.readHistory(null).getSubmissions();
+ return resourceRequests.readSubmission(null).getSubmissions();
}
/**
@@ -533,7 +540,7 @@ public class SqoopClient {
* @return
*/
public List<MSubmission> getSubmissionsForJob(long jobId) {
- return resourceRequests.readHistory(jobId).getSubmissions();
+ return resourceRequests.readSubmission(jobId).getSubmissions();
}
private Status applyLinkValidations(ValidationResultBean bean, MLink link) {
@@ -541,12 +548,13 @@ public class SqoopClient {
// Apply validation results
ConfigUtils.applyValidation(link.getConnectorLinkConfig().getConfigs(), linkConfig);
Long id = bean.getId();
- if(id != null) {
+ if (id != null) {
link.setPersistenceId(id);
}
return Status.getWorstStatus(linkConfig.getStatus());
}
+
private Status applyJobValidations(ValidationResultBean bean, MJob job) {
ConfigValidationResult fromConfig = bean.getValidationResults()[0];
ConfigValidationResult toConfig = bean.getValidationResults()[1];
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
index de7211a..e2e4860 100644
--- a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
+++ b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
@@ -20,8 +20,9 @@ package org.apache.sqoop.client;
import org.apache.sqoop.model.MSubmission;
/**
- * Callback interface for Synchronous job submission
+ * Callback interface for synchronous job submission
*/
+//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start consistent usage
public interface SubmissionCallback {
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
index 83c08b3..55c8db2 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java
@@ -18,14 +18,16 @@
package org.apache.sqoop.client.request;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobsBean;
+import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.json.ValidationResultBean;
import org.apache.sqoop.model.MJob;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
/**
- * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations
- * are normally supported.
+ * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations are
+ * normally supported.
*/
public class JobResourceRequest extends ResourceRequest {
@@ -33,18 +35,25 @@ public class JobResourceRequest extends ResourceRequest {
private static final String ENABLE = "/enable";
private static final String DISABLE = "/disable";
+ private static final String START = "/start";
+ private static final String STOP = "/stop";
+ private static final String STATUS = "/status";
- public JobBean read(String serverUrl, Long linkId) {
+ public JobBean read(String serverUrl, Long jobId) {
String response;
- if (linkId == null) {
+ if (jobId == null) {
response = super.get(serverUrl + RESOURCE + "all");
} else {
- response = super.get(serverUrl + RESOURCE + linkId);
+ response = super.get(serverUrl + RESOURCE + jobId);
}
JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
- JobBean jobBean = new JobBean();
- jobBean.restore(jsonObject);
- return jobBean;
+ // defaults to all
+ JobBean bean = new JobsBean();
+ if (jobId != null) {
+ bean = new JobBean();
+ }
+ bean.restore(jsonObject);
+ return bean;
}
public ValidationResultBean create(String serverUrl, MJob job) {
@@ -61,21 +70,43 @@ public class JobResourceRequest extends ResourceRequest {
JobBean jobBean = new JobBean(job);
// Extract all config inputs including sensitive inputs
JSONObject jobJson = jobBean.extract(false);
- String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), jobJson.toJSONString());
+ String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
+ jobJson.toJSONString());
ValidationResultBean validationBean = new ValidationResultBean();
validationBean.restore((JSONObject) JSONValue.parse(response));
return validationBean;
}
- public void delete(String serverUrl, Long id) {
- super.delete(serverUrl + RESOURCE + id);
+ public void delete(String serverUrl, Long jobId) {
+ super.delete(serverUrl + RESOURCE + jobId);
}
- public void enable(String serverUrl, Long id, Boolean enabled) {
+ public void enable(String serverUrl, Long jobId, Boolean enabled) {
if (enabled) {
- super.put(serverUrl + RESOURCE + id + ENABLE, null);
+ super.put(serverUrl + RESOURCE + jobId + ENABLE, null);
} else {
- super.put(serverUrl + RESOURCE + id + DISABLE, null);
+ super.put(serverUrl + RESOURCE + jobId + DISABLE, null);
}
}
+
+ public SubmissionBean start(String serverUrl, Long jobId) {
+ String response = super.put(serverUrl + RESOURCE + jobId + START, null);
+ return createJobSubmissionResponse(response);
+ }
+
+ public SubmissionBean stop(String serverUrl, Long jobId) {
+ String response = super.put(serverUrl + RESOURCE + jobId + STOP, null);
+ return createJobSubmissionResponse(response);
+ }
+
+ public SubmissionBean status(String serverUrl, Long jobId) {
+ String response = super.get(serverUrl + RESOURCE + jobId + STATUS);
+ return createJobSubmissionResponse(response);
+ }
+
+ private SubmissionBean createJobSubmissionResponse(String response) {
+ SubmissionBean submissionBean = new SubmissionBean();
+ submissionBean.restore((JSONObject) JSONValue.parse(response));
+ return submissionBean;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
index 4a56bb7..fe528f2 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java
@@ -17,14 +17,15 @@
*/
package org.apache.sqoop.client.request;
-import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.json.ValidationResultBean;
-import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
/**
* Unified class for all request objects.
@@ -131,19 +132,19 @@ public class SqoopResourceRequests {
getJobResourceRequest().delete(serverUrl, jid);
}
- public SubmissionBean readHistory(Long jid) {
- return getSubmissionResourceRequest().readHistory(serverUrl, jid);
+ public SubmissionBean getJobStatus(Long jid) {
+ return getJobResourceRequest().status(serverUrl, jid);
}
- public SubmissionBean readSubmission(Long jid) {
- return getSubmissionResourceRequest().read(serverUrl, jid);
+ public SubmissionBean startJob(Long jid) {
+ return getJobResourceRequest().start(serverUrl, jid);
}
- public SubmissionBean createSubmission(Long jid) {
- return getSubmissionResourceRequest().create(serverUrl, jid);
+ public SubmissionBean stopJob(Long jid) {
+ return getJobResourceRequest().stop(serverUrl, jid);
}
- public SubmissionBean deleteSubmission(Long jid) {
- return getSubmissionResourceRequest().delete(serverUrl, jid);
+ public SubmissionsBean readSubmission(Long jid) {
+ return getSubmissionResourceRequest().read(serverUrl, jid);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
index 5055783..e3b70bc 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java
@@ -17,7 +17,7 @@
*/
package org.apache.sqoop.client.request;
-import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -27,54 +27,18 @@ import org.json.simple.JSONValue;
*/
public class SubmissionResourceRequest extends ResourceRequest {
- public static final String RESOURCE = "v1/submission/";
+ public static final String RESOURCE = "v1/submissions/";
- public static final String ACTION = RESOURCE + "action/";
-
- public static final String HISTORY = RESOURCE + "history/";
-
- public SubmissionBean readHistory(String serverUrl, Long jid) {
+ public SubmissionsBean read(String serverUrl, Long jid) {
String response;
if (jid == null) {
- response = super.get(serverUrl + HISTORY + "all");
+ response = super.get(serverUrl + RESOURCE);
} else {
- response = super.get(serverUrl + HISTORY + jid);
+ response = super.get(serverUrl + RESOURCE + jid);
}
-
- JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
-
- SubmissionBean submissionBean = new SubmissionBean();
- submissionBean.restore(jsonObject);
-
- return submissionBean;
- }
-
- public SubmissionBean read(String serverUrl, Long jid) {
- String response = super.get(serverUrl + ACTION + jid);
-
JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
-
- SubmissionBean submissionBean = new SubmissionBean();
+ SubmissionsBean submissionBean = new SubmissionsBean();
submissionBean.restore(jsonObject);
-
- return submissionBean;
- }
-
- public SubmissionBean create(String serverUrl, Long jid) {
- String response = super.post(serverUrl + ACTION + jid, null);
-
- SubmissionBean submissionBean = new SubmissionBean();
- submissionBean.restore((JSONObject) JSONValue.parse(response));
-
- return submissionBean;
- }
-
- public SubmissionBean delete(String serverUrl, Long id) {
- String response = super.delete(serverUrl + ACTION + id);
-
- SubmissionBean submissionBean = new SubmissionBean();
- submissionBean.restore((JSONObject) JSONValue.parse(response));
-
return submissionBean;
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JobBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java
index 082d591..efc2efc 100644
--- a/common/src/main/java/org/apache/sqoop/json/JobBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java
@@ -19,8 +19,6 @@ package org.apache.sqoop.json;
import static org.apache.sqoop.json.util.ConfigInputSerialization.extractConfigList;
import static org.apache.sqoop.json.util.ConfigInputSerialization.restoreConfigList;
-import static org.apache.sqoop.json.util.ConfigBundleSerialization.extractConfigParamBundle;
-import static org.apache.sqoop.json.util.ConfigBundleSerialization.restoreConfigParamBundle;
import java.util.ArrayList;
import java.util.Date;
@@ -39,7 +37,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
/**
- * Json representation of the job config
+ * Json representation of the job
*/
public class JobBean implements JsonBean {
@@ -50,6 +48,7 @@ public class JobBean implements JsonBean {
static final String FROM_CONFIG = "from-config";
static final String TO_CONFIG = "to-config";
static final String DRIVER_CONFIG = "driver-config";
+ private static final String JOB = "job";
// Required
private List<MJob> jobs;
@@ -102,9 +101,16 @@ public class JobBean implements JsonBean {
@Override
@SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) {
- JSONArray array = new JSONArray();
+ JSONArray jobArray = extractJobs(skipSensitive);
+ JSONObject job = new JSONObject();
+ job.put(JOB, jobArray);
+ return job;
+ }
- for(MJob job : jobs) {
+ @SuppressWarnings("unchecked")
+ protected JSONArray extractJobs(boolean skipSensitive) {
+ JSONArray jobArray = new JSONArray();
+ for (MJob job : jobs) {
JSONObject object = new JSONObject();
object.put(ID, job.getPersistenceId());
@@ -115,6 +121,7 @@ public class JobBean implements JsonBean {
object.put(UPDATE_USER, job.getLastUpdateUser());
object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
// job link associated connectors
+ // TODO(SQOOP-1634): fix not to require the connectorIds in the post data
object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM));
object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO));
// job associated links
@@ -122,25 +129,30 @@ public class JobBean implements JsonBean {
object.put(TO_LINK_ID, job.getLinkId(Direction.TO));
// job configs
MFromConfig fromConfigList = job.getFromJobConfig();
- object.put(FROM_CONFIG, extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive));
+ object.put(FROM_CONFIG,
+ extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive));
MToConfig toConfigList = job.getToJobConfig();
- object.put(TO_CONFIG, extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive));
+ object.put(TO_CONFIG,
+ extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive));
MDriverConfig driverConfigList = job.getDriverConfig();
- object.put(DRIVER_CONFIG, extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(), skipSensitive));
+ object.put(
+ DRIVER_CONFIG,
+ extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(),
+ skipSensitive));
- array.add(object);
+ jobArray.add(object);
}
-
- JSONObject all = new JSONObject();
- all.put(ALL, array);
- return all;
+ return jobArray;
}
@Override
public void restore(JSONObject jsonObject) {
- jobs = new ArrayList<MJob>();
+ JSONArray array = (JSONArray) jsonObject.get(JOB);
+ restoreJobs(array);
+ }
- JSONArray array = (JSONArray) jsonObject.get(ALL);
+ protected void restoreJobs(JSONArray array) {
+ jobs = new ArrayList<MJob>();
for (Object obj : array) {
JSONObject object = (JSONObject) obj;
@@ -178,4 +190,4 @@ public class JobBean implements JsonBean {
jobs.add(job);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JobsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobsBean.java b/common/src/main/java/org/apache/sqoop/json/JobsBean.java
new file mode 100644
index 0000000..3c454ea
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/JobsBean.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import java.util.List;
+
+import org.apache.sqoop.model.MJob;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Json representation of the jobs
+ */
+public class JobsBean extends JobBean {
+
+ private static final String JOBS = "jobs";
+
+ public JobsBean(MJob job) {
+ super(job);
+ }
+
+ public JobsBean(List<MJob> jobs) {
+ super(jobs);
+ }
+
+ // For "restore"
+ public JobsBean() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public JSONObject extract(boolean skipSensitive) {
+ JSONArray jobArray = super.extractJobs(skipSensitive);
+ JSONObject jobs = new JSONObject();
+ jobs.put(JOBS, jobArray);
+ return jobs;
+ }
+
+ @Override
+ public void restore(JSONObject jsonObject) {
+ JSONArray array = (JSONArray) jsonObject.get(JOBS);
+ restoreJobs(array);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/JsonBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JsonBean.java b/common/src/main/java/org/apache/sqoop/json/JsonBean.java
index ba86511..164b604 100644
--- a/common/src/main/java/org/apache/sqoop/json/JsonBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JsonBean.java
@@ -25,6 +25,7 @@ public interface JsonBean {
static final String CONFIGURABLE_VERSION = "version";
static final String ALL_CONFIGS = "all-configs";
+ @Deprecated // should not be used anymore in the rest api
static final String ALL = "all";
static final String ID = "id";
static final String NAME = "name";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
index 4b80338..b7bdaad 100644
--- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
@@ -17,13 +17,8 @@
*/
package org.apache.sqoop.json;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counter;
-import org.apache.sqoop.submission.counter.CounterGroup;
-import org.apache.sqoop.submission.counter.Counters;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
+import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
+import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
import java.util.ArrayList;
import java.util.Date;
@@ -31,15 +26,20 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
-import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
/**
*
*/
public class SubmissionBean implements JsonBean {
- private static final String ALL = "all";
+ private static final String SUBMISSION = "submission";
private static final String JOB = "job";
private static final String CREATION_USER = "creation-user";
private static final String CREATION_DATE = "creation-date";
@@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
private static final String EXCEPTION_TRACE = "exception-trace";
private static final String PROGRESS = "progress";
private static final String COUNTERS = "counters";
- private static final String FROM_SCHEMA = "schema-from";
- private static final String TO_SCHEMA = "schema-to";
+ private static final String FROM_SCHEMA = "from-schema";
+ private static final String TO_SCHEMA = "to-schema";
private List<MSubmission> submissions;
@@ -80,79 +80,83 @@ public class SubmissionBean implements JsonBean {
@Override
@SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) {
- JSONArray array = new JSONArray();
+ JSONArray submissionArray = extractSubmissions();
+ JSONObject submission = new JSONObject();
+ submission.put(SUBMISSION, submissionArray);
+ return submission;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected JSONArray extractSubmissions() {
+ JSONArray submissionsArray = new JSONArray();
- for(MSubmission submission : this.submissions) {
+ for (MSubmission submission : this.submissions) {
JSONObject object = new JSONObject();
object.put(JOB, submission.getJobId());
object.put(STATUS, submission.getStatus().name());
object.put(PROGRESS, submission.getProgress());
- if(submission.getCreationUser() != null) {
+ if (submission.getCreationUser() != null) {
object.put(CREATION_USER, submission.getCreationUser());
}
- if(submission.getCreationDate() != null) {
+ if (submission.getCreationDate() != null) {
object.put(CREATION_DATE, submission.getCreationDate().getTime());
}
- if(submission.getLastUpdateUser() != null) {
+ if (submission.getLastUpdateUser() != null) {
object.put(LAST_UPDATE_USER, submission.getLastUpdateUser());
}
- if(submission.getLastUpdateDate() != null) {
+ if (submission.getLastUpdateDate() != null) {
object.put(LAST_UPDATE_DATE, submission.getLastUpdateDate().getTime());
}
- if(submission.getExternalId() != null) {
+ if (submission.getExternalId() != null) {
object.put(EXTERNAL_ID, submission.getExternalId());
}
- if(submission.getExternalLink() != null) {
+ if (submission.getExternalLink() != null) {
object.put(EXTERNAL_LINK, submission.getExternalLink());
}
- if(submission.getExceptionInfo() != null) {
+ if (submission.getExceptionInfo() != null) {
object.put(EXCEPTION, submission.getExceptionInfo());
}
- if(submission.getExceptionStackTrace() != null) {
+ if (submission.getExceptionStackTrace() != null) {
object.put(EXCEPTION_TRACE, submission.getExceptionStackTrace());
}
- if(submission.getCounters() != null) {
+ if (submission.getCounters() != null) {
object.put(COUNTERS, extractCounters(submission.getCounters()));
}
- if(submission.getFromSchema() != null) {
+ if (submission.getFromSchema() != null) {
object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
}
- if(submission.getToSchema() != null) {
+ if (submission.getToSchema() != null) {
object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
}
-
- array.add(object);
+ submissionsArray.add(object);
}
-
- JSONObject all = new JSONObject();
- all.put(ALL, array);
-
- return all;
+ return submissionsArray;
}
@SuppressWarnings("unchecked")
- public JSONObject extractCounters(Counters counters) {
- JSONObject ret = new JSONObject();
- for(CounterGroup group : counters) {
+ private JSONObject extractCounters(Counters counters) {
+ JSONObject counterArray = new JSONObject();
+ for (CounterGroup group : counters) {
JSONObject counterGroup = new JSONObject();
- for(Counter counter : group) {
+ for (Counter counter : group) {
counterGroup.put(counter.getName(), counter.getValue());
}
-
- ret.put(group.getName(), counterGroup);
+ counterArray.put(group.getName(), counterGroup);
}
- return ret;
+ return counterArray;
}
@Override
public void restore(JSONObject json) {
- this.submissions = new ArrayList<MSubmission>();
-
- JSONArray array = (JSONArray) json.get(ALL);
+ JSONArray submissionArray = (JSONArray) json.get(SUBMISSION);
+ restoreSubmissions(submissionArray);
+ }
+ protected void restoreSubmissions(JSONArray array) {
+ this.submissions = new ArrayList<MSubmission>();
for (Object obj : array) {
JSONObject object = (JSONObject) obj;
MSubmission submission = new MSubmission();
@@ -161,38 +165,38 @@ public class SubmissionBean implements JsonBean {
submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS)));
submission.setProgress((Double) object.get(PROGRESS));
- if(object.containsKey(CREATION_USER)) {
+ if (object.containsKey(CREATION_USER)) {
submission.setCreationUser((String) object.get(CREATION_USER));
}
- if(object.containsKey(CREATION_DATE)) {
+ if (object.containsKey(CREATION_DATE)) {
submission.setCreationDate(new Date((Long) object.get(CREATION_DATE)));
}
- if(object.containsKey(LAST_UPDATE_USER)) {
+ if (object.containsKey(LAST_UPDATE_USER)) {
submission.setLastUpdateUser((String) object.get(LAST_UPDATE_USER));
}
- if(object.containsKey(LAST_UPDATE_DATE)) {
+ if (object.containsKey(LAST_UPDATE_DATE)) {
submission.setLastUpdateDate(new Date((Long) object.get(LAST_UPDATE_DATE)));
}
- if(object.containsKey(EXTERNAL_ID)) {
+ if (object.containsKey(EXTERNAL_ID)) {
submission.setExternalId((String) object.get(EXTERNAL_ID));
}
- if(object.containsKey(EXTERNAL_LINK)) {
+ if (object.containsKey(EXTERNAL_LINK)) {
submission.setExternalLink((String) object.get(EXTERNAL_LINK));
}
- if(object.containsKey(EXCEPTION)) {
+ if (object.containsKey(EXCEPTION)) {
submission.setExceptionInfo((String) object.get(EXCEPTION));
}
- if(object.containsKey(EXCEPTION_TRACE)) {
+ if (object.containsKey(EXCEPTION_TRACE)) {
submission.setExceptionStackTrace((String) object.get(EXCEPTION_TRACE));
}
- if(object.containsKey(COUNTERS)) {
+ if (object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
}
- if(object.containsKey(FROM_SCHEMA)) {
+ if (object.containsKey(FROM_SCHEMA)) {
submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA)));
}
- if(object.containsKey(TO_SCHEMA)) {
+ if (object.containsKey(TO_SCHEMA)) {
submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA)));
}
@@ -200,24 +204,20 @@ public class SubmissionBean implements JsonBean {
}
}
+ @SuppressWarnings("unchecked")
public Counters restoreCounters(JSONObject object) {
Set<Map.Entry<String, JSONObject>> groupSet = object.entrySet();
Counters counters = new Counters();
- for(Map.Entry<String, JSONObject> groupEntry: groupSet) {
-
+ for (Map.Entry<String, JSONObject> groupEntry : groupSet) {
CounterGroup group = new CounterGroup(groupEntry.getKey());
-
Set<Map.Entry<String, Long>> counterSet = groupEntry.getValue().entrySet();
-
- for(Map.Entry<String, Long> counterEntry: counterSet) {
+ for (Map.Entry<String, Long> counterEntry : counterSet) {
Counter counter = new Counter(counterEntry.getKey(), counterEntry.getValue());
group.addCounter(counter);
}
-
counters.addCounterGroup(group);
}
-
return counters;
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
new file mode 100644
index 0000000..74b6179
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import java.util.List;
+
+import org.apache.sqoop.model.MSubmission;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class SubmissionsBean extends SubmissionBean {
+
+ private static final String SUBMISSIONS = "submissions";
+
+ // For "extract"
+ public SubmissionsBean(MSubmission submission) {
+ super(submission);
+ }
+
+ public SubmissionsBean(List<MSubmission> submissions) {
+ super(submissions);
+
+ }
+
+ // For "restore"
+ public SubmissionsBean() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public JSONObject extract(boolean skipSensitive) {
+ JSONArray submissionsArray = super.extractSubmissions();
+ JSONObject submissions = new JSONObject();
+ submissions.put(SUBMISSIONS, submissionsArray);
+ return submissions;
+ }
+
+ @Override
+ public void restore(JSONObject json) {
+ JSONArray submissionsArray = (JSONArray) json.get(SUBMISSIONS);
+ restoreSubmissions(submissionsArray);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/main/java/org/apache/sqoop/model/MSubmission.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
index 7290df5..2648712 100644
--- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java
+++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
@@ -36,9 +36,6 @@ public class MSubmission extends MAccountableEntity {
/**
* Job id that this submission object belongs.
*
- * By transitivity of metadata structure you can get also connection and
- * connector ids.
- *
* This property is required and will be always present.
*/
private long jobId;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
index 1fc8dbd..923ad47 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
@@ -55,13 +55,13 @@ public class TestJobBean {
// Serialize it to JSON object
JobBean jobBean = new JobBean(job);
- JSONObject jobJson = jobBean.extract(false);
+ JSONObject json = jobBean.extract(false);
// "Move" it across network in text form
- String jobJsonString = jobJson.toJSONString();
+ String jobJsonString = json.toJSONString();
// Retrieved transferred object
- JSONObject parsedJobJson = (JSONObject)JSONValue.parseWithException(jobJsonString);
+ JSONObject parsedJobJson = (JSONObject)JSONValue.parse(jobJsonString);
JobBean parsedJobBean = new JobBean();
parsedJobBean.restore(parsedJobJson);
MJob target = parsedJobBean.getJobs().get(0);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
index e4d50bf..c5b8781 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
@@ -457,13 +457,13 @@ public class TestSubmissionBean {
* @return
*/
private MSubmission transfer(MSubmission submission) {
- SubmissionBean bean = new SubmissionBean(submission);
+ SubmissionsBean bean = new SubmissionsBean(submission);
JSONObject json = bean.extract(false);
String string = json.toString();
JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
- SubmissionBean retrievedBean = new SubmissionBean();
+ SubmissionsBean retrievedBean = new SubmissionsBean();
retrievedBean.restore(retrievedJson);
return retrievedBean.getSubmissions().get(0);
@@ -476,13 +476,13 @@ public class TestSubmissionBean {
* @return
*/
private List<MSubmission> transfer(List<MSubmission> submissions) {
- SubmissionBean bean = new SubmissionBean(submissions);
+ SubmissionsBean bean = new SubmissionsBean(submissions);
JSONObject json = bean.extract(false);
String string = json.toString();
JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
- SubmissionBean retrievedBean = new SubmissionBean();
+ SubmissionsBean retrievedBean = new SubmissionsBean();
retrievedBean.restore(retrievedJson);
return retrievedBean.getSubmissions();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/DriverError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverError.java b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
index ddee282..25a1b70 100644
--- a/core/src/main/java/org/apache/sqoop/driver/DriverError.java
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
@@ -22,6 +22,8 @@ import org.apache.sqoop.common.ErrorCode;
/**
*
*/
+//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): why is this called Driver Error since it is used in JobManager?
+
public enum DriverError implements ErrorCode {
DRIVER_0001("Invalid submission engine"),
@@ -40,6 +42,7 @@ public enum DriverError implements ErrorCode {
DRIVER_0008("Invalid combination of submission and execution engines"),
+ //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start terminology difference
DRIVER_0009("Job has been disabled. Cannot submit this job."),
DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index ba56c77..36ba1cd 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -269,11 +269,11 @@ public class JobManager implements Reconfigurable {
LOG.info("Submission manager initialized: OK");
}
- public MSubmission submit(long jobId, HttpEventContext ctx) {
+ public MSubmission start(long jobId, HttpEventContext ctx) {
MSubmission mSubmission = createJobSubmission(ctx, jobId);
JobRequest jobRequest = createJobRequest(jobId, mSubmission);
- // Bootstrap job to execute
+ // Bootstrap job to execute in the configured execution engine
prepareJob(jobRequest);
// Make sure that this job id is not currently running and submit the job
// only if it's not.
@@ -283,14 +283,17 @@ public class JobManager implements Reconfigurable {
if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
}
- // TODO(jarcec): We might need to catch all exceptions here to ensure
- // that Destroyer will be executed in all cases.
// NOTE: the following is a blocking call
boolean success = submissionEngine.submit(jobRequest);
if (!success) {
- destroySubmission(jobRequest);
+ // TODO(jarcec): We might need to catch all exceptions here to ensure
+ // that Destroyer will be executed in all cases.
+ invokeDestroyerOnJobFailure(jobRequest);
mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
}
+ // persist submission record to repository.
+ // on failure we persist the FAILURE status, on success it is the SUCCESS
+ // status ( which is the default one)
RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
}
return mSubmission;
@@ -435,6 +438,7 @@ public class JobManager implements Reconfigurable {
return job;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private void initializeConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
@@ -444,6 +448,7 @@ public class JobManager implements Reconfigurable {
jobRequest.getJobConfig(direction));
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
@@ -453,6 +458,7 @@ public class JobManager implements Reconfigurable {
jobRequest.getJobConfig(direction));
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
@@ -462,6 +468,7 @@ public class JobManager implements Reconfigurable {
jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
}
+ @SuppressWarnings({ "rawtypes" })
private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
Class<? extends Initializer> initializerClass = transferable.getInitializer();
@@ -494,7 +501,8 @@ public class JobManager implements Reconfigurable {
* Callback that will be called only if we failed to submit the job to the
* remote cluster.
*/
- void destroySubmission(JobRequest request) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ void invokeDestroyerOnJobFailure(JobRequest request) {
Transferable from = request.getFrom();
Transferable to = request.getTo();
@@ -520,7 +528,6 @@ public class JobManager implements Reconfigurable {
request.getConnectorContext(Direction.TO), false, request.getSummary()
.getToSchema());
- // destroy submission from connector perspective
fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
request.getJobConfig(Direction.FROM));
toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
@@ -534,7 +541,7 @@ public class JobManager implements Reconfigurable {
if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
- + " is not running");
+ + " is not running hence cannot stop");
}
submissionEngine.stop(mSubmission.getExternalId());
@@ -554,8 +561,7 @@ public class JobManager implements Reconfigurable {
if (mSubmission == null) {
return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
}
-
- // If the submission is in running state, let's update it
+ // If the submission isin running state, let's update it
if (mSubmission.getStatus().isRunning()) {
update(mSubmission);
}
@@ -696,4 +702,4 @@ public class JobManager implements Reconfigurable {
LOG.info("Ending submission manager update thread");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index 2666320..eed79a5 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -91,7 +91,7 @@ public class JobRequest {
MutableMapContext toConnectorContext;
/**
- * Framework context (submission specific configuration)
+ * Driver context (submission specific configuration)
*/
MutableMapContext driverContext;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 976223d..f41e60e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -500,6 +500,20 @@ public class JdbcRepository extends Repository {
});
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public MJob findJob(final String name) {
+ return (MJob) doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) {
+ return handler.findJob(name, conn);
+ }
+ });
+ }
+
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 1e22759..4fe1500 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -335,6 +335,15 @@ public abstract class JdbcRepositoryHandler {
public abstract MJob findJob(long jobId, Connection conn);
/**
+ * Find job with given name in repository.
+ *
+ * @param name unique name for the job
+ * @param conn Connection to the repository
+ * @return job for a given name that is present in the repository or null if not present
+ */
+ public abstract MJob findJob(String name, Connection conn);
+
+ /**
* Get all job objects.
*
* @param conn Connection to the repository
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 61d6b9b..76e124e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -254,6 +254,14 @@ public abstract class Repository {
public abstract MJob findJob(long id);
/**
+ * Find job object with given name.
+ *
+ * @param name unique name for the job
+ * @return job with given name loaded from repository or null if not present
+ */
+ public abstract MJob findJob(String name);
+
+ /**
* Get all job objects.
*
* @return List of all jobs in the repository
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index b324f4f..059a827 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -1593,6 +1593,33 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* {@inheritDoc}
*/
@Override
+ public MJob findJob(String name, Connection conn) {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_NAME);
+ stmt.setString(1, name);
+
+ List<MJob> jobs = loadJobs(stmt, conn);
+
+ if (jobs.size() != 1) {
+ return null;
+ }
+
+ // Return the first and only one link object
+ return jobs.get(0);
+
+ } catch (SQLException ex) {
+ logException(ex, name);
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+ } finally {
+ closeStatements(stmt);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public List<MJob> findJobs(Connection conn) {
PreparedStatement stmt = null;
try {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
index c894d06..6c5fad7 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java
@@ -435,6 +435,10 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
+// DML: Select one specific job
+ public static final String STMT_SELECT_JOB_SINGLE_BY_NAME =
+ STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_NAME + " = ?";
+
// DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
STMT_SELECT_JOB
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 5547988..8130805 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.handler;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -28,16 +29,22 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.JobManager;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobsBean;
import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.json.ValidationResultBean;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MPersistableEntity;
+import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError;
@@ -46,41 +53,38 @@ import org.apache.sqoop.validation.Status;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-/**
- * Job request handler is supporting following resources:
- *
- * GET /v1/job/:jid
- * Return details about one particular job with id :jid or about all of
- * them if :jid equals to "all".
- *
- * POST /v1/job
- * Create new job
- *
- * PUT /v1/job/:jid
- * Update job with id :jid.
- *
- * PUT /v1/job/:jid/enable
- * Enable job with id :jid
- *
- * PUT /v1/job/:jid/disable
- * Disable job with id :jid
- *
- * DELETE /v1/job/:jid
- * Remove job with id :jid
- *
- * Planned resources:
- *
- * GET /v1/job
- * Get brief list of all jobs present in the system. This resource is not yet
- * implemented.
- */
public class JobRequestHandler implements RequestHandler {
- private static final Logger LOG =
- Logger.getLogger(JobRequestHandler.class);
+ /** enum for representing the actions supported on the job resource*/
+ enum JobAction {
+ ENABLE("enable"),
+ DISABLE("disable"),
+ START("start"),
+ STOP("stop"),
+ ;
+ JobAction(String name) {
+ this.name = name;
+ }
+
+ String name;
+
+ public static JobAction fromString(String name) {
+ if (name != null) {
+ for (JobAction action : JobAction.values()) {
+ if (name.equalsIgnoreCase(action.name)) {
+ return action;
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ private static final Logger LOG = Logger.getLogger(JobRequestHandler.class);
- private static final String ENABLE = "enable";
- private static final String DISABLE = "disable";
+ static final String JOBS_PATH = "jobs";
+ static final String JOB_PATH = "job";
+ static final String STATUS = "status";
public JobRequestHandler() {
LOG.info("JobRequestHandler initialized");
@@ -89,208 +93,306 @@ public class JobRequestHandler implements RequestHandler {
@Override
public JsonBean handleEvent(RequestContext ctx) {
switch (ctx.getMethod()) {
- case GET:
- return getJobs(ctx);
- case POST:
- return createUpdateJob(ctx, false);
- case PUT:
- if (ctx.getLastURLElement().equals(ENABLE)) {
- return enableJob(ctx, true);
- } else if (ctx.getLastURLElement().equals(DISABLE)) {
- return enableJob(ctx, false);
- } else {
- return createUpdateJob(ctx, true);
- }
- case DELETE:
- return deleteJob(ctx);
+ case GET:
+ if (STATUS.equals(ctx.getLastURLElement())) {
+ return getJobStatus(ctx);
+ }
+ return getJobs(ctx);
+ case POST:
+ return createUpdateJob(ctx, true);
+ case PUT:
+ JobAction action = JobAction.fromString(ctx.getLastURLElement());
+ switch (action) {
+ case ENABLE:
+ return enableJob(ctx, true);
+ case DISABLE:
+ return enableJob(ctx, false);
+ case START:
+ return startJob(ctx);
+ case STOP:
+ return stopJob(ctx);
+ default:
+ return createUpdateJob(ctx, false);
+ }
+ case DELETE:
+ return deleteJob(ctx);
}
return null;
}
/**
- * Delete job from repository.
+ * Delete job from repository.
*
- * @param ctx Context object
+ * @param ctx
+ * Context object
* @return Empty bean
*/
private JsonBean deleteJob(RequestContext ctx) {
- String sxid = ctx.getLastURLElement();
- long jid = Long.valueOf(sxid);
-
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "delete", "job", sxid);
Repository repository = RepositoryManager.getInstance().getRepository();
- repository.deleteJob(jid);
+ String jobIdentifier = ctx.getLastURLElement();
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
+ repository.deleteJob(jobId);
return JsonBean.EMPTY_BEAN;
}
/**
* Update or create job in repository.
*
- * @param ctx Context object
+ * @param ctx
+ * Context object
* @return Validation bean object
*/
- private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
+ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
+
+ Repository repository = RepositoryManager.getInstance().getRepository();
String username = ctx.getUserName();
JobBean bean = new JobBean();
try {
- JSONObject json =
- (JSONObject) JSONValue.parse(ctx.getRequest().getReader());
+ JSONObject json = (JSONObject) JSONValue.parse(ctx.getRequest().getReader());
bean.restore(json);
} catch (IOException e) {
- throw new SqoopException(ServerError.SERVER_0003,
- "Can't read request content", e);
+ throw new SqoopException(ServerError.SERVER_0003, "Can't read request content", e);
}
// Get job object
List<MJob> jobs = bean.getJobs();
- if(jobs.size() != 1) {
- throw new SqoopException(ServerError.SERVER_0003,
- "Expected one job but got " + jobs.size());
+ if (jobs.size() != 1) {
+ throw new SqoopException(ServerError.SERVER_0003, "Expected one job but got " + jobs.size());
}
// Job object
- MJob job = jobs.get(0);
+ MJob postedJob = jobs.get(0);
// Verify that user is not trying to spoof us
MFromConfig fromConfig = ConnectorManager.getInstance()
- .getConnectorConfigurable(job.getConnectorId(Direction.FROM))
- .getFromConfig();
+ .getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig();
MToConfig toConfig = ConnectorManager.getInstance()
- .getConnectorConfigurable(job.getConnectorId(Direction.TO))
- .getToConfig();
+ .getConnectorConfigurable(postedJob.getConnectorId(Direction.TO)).getToConfig();
MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
- if(!fromConfig.equals(job.getJobConfig(Direction.FROM))
- || !driverConfig.equals(job.getDriverConfig())
- || !toConfig.equals(job.getJobConfig(Direction.TO))) {
- throw new SqoopException(ServerError.SERVER_0003,
- "Detected incorrect config structure");
+ if (!fromConfig.equals(postedJob.getJobConfig(Direction.FROM))
+ || !driverConfig.equals(postedJob.getDriverConfig())
+ || !toConfig.equals(postedJob.getJobConfig(Direction.TO))) {
+ throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure");
+ }
+
+ // if update get the job id from the request URI
+ if (!create) {
+ String jobIdentifier = ctx.getLastURLElement();
+ // support jobName or jobId for the api
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+ if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
+ MJob existingJob = repository.findJob(jobId);
+ postedJob.setPersistenceId(existingJob.getPersistenceId());
+ }
}
// Corresponding connectors for this
- SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM));
- SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO));
+ SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
+ postedJob.getConnectorId(Direction.FROM));
+ SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
+ postedJob.getConnectorId(Direction.TO));
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
- throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName()
- + " does not support FROM direction.");
+ throw new SqoopException(ServerError.SERVER_0004, "Connector "
+ + fromConnector.getClass().getCanonicalName() + " does not support FROM direction.");
}
if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
- throw new SqoopException(ServerError.SERVER_0004, "Connector " + toConnector.getClass().getCanonicalName()
- + " does not support TO direction.");
+ throw new SqoopException(ServerError.SERVER_0004, "Connector "
+ + toConnector.getClass().getCanonicalName() + " does not support TO direction.");
}
// Validate user supplied data
ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs(
- job.getJobConfig(Direction.FROM).getConfigs(),
- fromConnector.getJobConfigurationClass(Direction.FROM)
- );
+ postedJob.getJobConfig(Direction.FROM).getConfigs(),
+ fromConnector.getJobConfigurationClass(Direction.FROM));
ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs(
- job.getJobConfig(Direction.TO).getConfigs(),
- toConnector.getJobConfigurationClass(Direction.TO)
- );
- ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(
- job.getDriverConfig().getConfigs(),
- Driver.getInstance().getDriverJobConfigurationClass()
- );
-
- Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(), toConfigValidator.getStatus(), driverConfigValidator.getStatus());
-
+ postedJob.getJobConfig(Direction.TO).getConfigs(),
+ toConnector.getJobConfigurationClass(Direction.TO));
+ ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob
+ .getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass());
+ Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(),
+ toConfigValidator.getStatus(), driverConfigValidator.getStatus());
// Return back validations in all cases
- ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, toConfigValidator);
+ ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator,
+ toConfigValidator);
// If we're good enough let's perform the action
- if(finalStatus.canProceed()) {
- if(update) {
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "update", "job", String.valueOf(job.getPersistenceId()));
-
- job.setLastUpdateUser(username);
- RepositoryManager.getInstance().getRepository().updateJob(job);
+ if (finalStatus.canProceed()) {
+ if (create) {
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "create", "job",
+ String.valueOf(postedJob.getPersistenceId()));
+
+ postedJob.setCreationUser(username);
+ postedJob.setLastUpdateUser(username);
+ repository.createJob(postedJob);
+ validationResultBean.setId(postedJob.getPersistenceId());
} else {
- job.setCreationUser(username);
- job.setLastUpdateUser(username);
- RepositoryManager.getInstance().getRepository().createJob(job);
- validationResultBean.setId(job.getPersistenceId());
-
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "create", "job", String.valueOf(job.getPersistenceId()));
- }
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "update", "job",
+ String.valueOf(postedJob.getPersistenceId()));
+ postedJob.setLastUpdateUser(username);
+ repository.updateJob(postedJob);
+ }
}
-
return validationResultBean;
}
private JsonBean getJobs(RequestContext ctx) {
- String sjid = ctx.getLastURLElement();
- JobBean bean;
-
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "get", "job", sjid);
-
+ String identifier = ctx.getLastURLElement();
+ JobBean jobBean;
Locale locale = ctx.getAcceptLanguageHeader();
Repository repository = RepositoryManager.getInstance().getRepository();
- if (sjid.equals(JsonBean.ALL)) {
-
- List<MJob> jobs = repository.findJobs();
- bean = new JobBean(jobs);
-
- // Add associated resources into the bean
- for( MJob job : jobs) {
- long fromConnectorId = job.getConnectorId(Direction.FROM);
- long toConnectorId = job.getConnectorId(Direction.TO);
- if(!bean.hasConnectorConfigBundle(fromConnectorId)) {
- bean.addConnectorConfigBundle(fromConnectorId,
- ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale));
- }
- if(!bean.hasConnectorConfigBundle(toConnectorId)) {
- bean.addConnectorConfigBundle(toConnectorId,
- ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
- }
+ // jobs by connector
+ if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
+ identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier);
+ if (repository.findConnector(identifier) != null) {
+ long connectorId = repository.findConnector(identifier).getPersistenceId();
+ jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
+ } else {
+ // this means name nor Id existed
+ throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
+ + " name for jobs given");
}
+ } else
+ // all jobs in the system
+ if (ctx.getPath().contains(JOBS_PATH)
+ || (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) {
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
+ jobBean = createJobsBean(repository.findJobs(), locale);
+ }
+ // job by Id
+ else {
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "get", "job", identifier);
+
+ long jobId = getJobIdFromIdentifier(identifier, repository);
+ List<MJob> jobList = new ArrayList<MJob>();
+ // a list of single element
+ jobList.add(repository.findJob(jobId));
+ jobBean = createJobBean(jobList, locale);
+ }
+ return jobBean;
+ }
+
+ private long getJobIdFromIdentifier(String identifier, Repository repository) {
+ // support jobName or jobId for the api
+ // NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames
+ long jobId;
+ if (repository.findJob(identifier) != null) {
+ jobId = repository.findJob(identifier).getPersistenceId();
} else {
- long jid = Long.valueOf(sjid);
+ try {
+ jobId = Long.valueOf(identifier);
+ } catch (NumberFormatException ex) {
+ // this means name nor Id existed and we want to throw a user friendly message than a number format exception
+ throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+ + " requested");
+ }
+ }
+ return jobId;
+ }
+
+ private JobBean createJobBean(List<MJob> jobs, Locale locale) {
+ JobBean jobBean = new JobBean(jobs);
+ addJob(jobs, locale, jobBean);
+ return jobBean;
+ }
- MJob job = repository.findJob(jid);
+ private JobsBean createJobsBean(List<MJob> jobs, Locale locale) {
+ JobsBean jobsBean = new JobsBean(jobs);
+ addJob(jobs, locale, jobsBean);
+ return jobsBean;
+ }
+
+ private void addJob(List<MJob> jobs, Locale locale, JobBean bean) {
+ // Add associated resources into the bean
+ for (MJob job : jobs) {
long fromConnectorId = job.getConnectorId(Direction.FROM);
long toConnectorId = job.getConnectorId(Direction.TO);
- bean = new JobBean(job);
- if(!bean.hasConnectorConfigBundle(fromConnectorId)) {
- bean.addConnectorConfigBundle(fromConnectorId,
- ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale));
+ // replace it only if it does not already exist
+ if (!bean.hasConnectorConfigBundle(fromConnectorId)) {
+ bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance()
+ .getResourceBundle(fromConnectorId, locale));
}
- if(!bean.hasConnectorConfigBundle(toConnectorId)) {
- bean.addConnectorConfigBundle(toConnectorId,
- ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
+ if (!bean.hasConnectorConfigBundle(toConnectorId)) {
+ bean.addConnectorConfigBundle(toConnectorId, ConnectorManager.getInstance()
+ .getResourceBundle(toConnectorId, locale));
}
}
-
- // set driver config bundle
- bean.setDriverConfigBundle(Driver.getInstance().getBundle(locale));
- return bean;
}
private JsonBean enableJob(RequestContext ctx, boolean enabled) {
+ Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
- String sjid = elements[elements.length - 2];
- long xid = Long.valueOf(sjid);
+ String jobIdentifier = elements[elements.length - 2];
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+ repository.enableJob(jobId, enabled);
+ return JsonBean.EMPTY_BEAN;
+ }
+ private JsonBean startJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
- repository.enableJob(xid, enabled);
- return JsonBean.EMPTY_BEAN;
+ String[] elements = ctx.getUrlElements();
+ String jobIdentifier = elements[elements.length - 2];
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId));
+ // TODO(SQOOP-1638): This should be outsourced somewhere more suitable than
+ // here
+ if (JobManager.getInstance().getNotificationBaseUrl() == null) {
+ String url = ctx.getRequest().getRequestURL().toString();
+ JobManager.getInstance().setNotificationBaseUrl(
+ url.split("v1")[0] + "/v1/job/status/notification/");
+ }
+
+ MSubmission submission = JobManager.getInstance()
+ .start(jobId, prepareRequestEventContext(ctx));
+ return new SubmissionBean(submission);
}
+
+ private JsonBean stopJob(RequestContext ctx) {
+ Repository repository = RepositoryManager.getInstance().getRepository();
+ String[] elements = ctx.getUrlElements();
+ String jobIdentifier = elements[elements.length - 2];
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
+ MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx));
+ return new SubmissionBean(submission);
+ }
+
+ private JsonBean getJobStatus(RequestContext ctx) {
+ Repository repository = RepositoryManager.getInstance().getRepository();
+ String[] elements = ctx.getUrlElements();
+ String jobIdentifier = elements[elements.length - 2];
+ long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
+ MSubmission submission = JobManager.getInstance().status(jobId);
+ return new SubmissionBean(submission);
+ }
+
+ private HttpEventContext prepareRequestEventContext(RequestContext ctx) {
+ HttpEventContext httpEventContext = new HttpEventContext();
+ httpEventContext.setUsername(ctx.getUserName());
+ return httpEventContext;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
index 8555b0c..cfbb524 100644
--- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -22,37 +22,19 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.sqoop.audit.AuditLoggerManager;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.driver.JobManager;
import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestContext.Method;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError;
-/**
- * Submission request handler is supporting following resources:
- *
- * GET /v1/submission/action/:jid
- * Get status of last submission for job with id :jid
- *
- * POST /v1/submission/action/:jid
- * Create new submission for job with id :jid
- *
- * DELETE /v1/submission/action/:jid
- * Stop last submission for job with id :jid
- *
- * GET /v1/submission/notification/:jid
- * Notification endpoint to get job status outside normal interval
- *
- * Possible additions in the future: /v1/submission/history/* for history.
- */
public class SubmissionRequestHandler implements RequestHandler {
- private static final Logger LOG =
- Logger.getLogger(SubmissionRequestHandler.class);
+ private static final Logger LOG = Logger.getLogger(SubmissionRequestHandler.class);
public SubmissionRequestHandler() {
LOG.info("SubmissionRequestHandler initialized");
@@ -60,110 +42,45 @@ public class SubmissionRequestHandler implements RequestHandler {
@Override
public JsonBean handleEvent(RequestContext ctx) {
- String[] urlElements = ctx.getUrlElements();
- if (urlElements.length < 2) {
- throw new SqoopException(ServerError.SERVER_0003,
- "Invalid URL, too few arguments for this servlet.");
- }
- // Let's check
- int length = urlElements.length;
- String action = urlElements[length - 2];
-
- if(action.equals("action")) {
- return handleActionEvent(ctx, urlElements[length - 1]);
+ // submission only support GET requests
+ if (ctx.getMethod() != Method.GET) {
+ throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:"
+ + ctx.getMethod());
}
-
- if(action.equals("notification")) {
- return handleNotification(ctx, urlElements[length - 1]);
- }
-
- if(action.equals("history")) {
- return handleHistoryEvent(ctx, urlElements[length - 1]);
- }
-
- throw new SqoopException(ServerError.SERVER_0003,
- "Do not know what to do.");
- }
-
- private JsonBean handleNotification(RequestContext ctx, String sjid) {
- LOG.debug("Received notification request for job " + sjid);
- JobManager.getInstance().status(Long.parseLong(sjid));
- return JsonBean.EMPTY_BEAN;
- }
-
- private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
- long jid = Long.parseLong(sjid);
-
- String username = ctx.getUserName();
- HttpEventContext ectx = new HttpEventContext();
- ectx.setUsername(username);
-
- switch (ctx.getMethod()) {
- case GET:
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "status", "submission", String.valueOf(jid));
-
- return submissionStatus(jid);
- case POST:
- // TODO: This should be outsourced somewhere more suitable than here
- if(JobManager.getInstance().getNotificationBaseUrl() == null) {
- String url = ctx.getRequest().getRequestURL().toString();
- JobManager.getInstance().setNotificationBaseUrl(
- url.split("v1")[0] + "/v1/submission/notification/");
- }
-
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "submit", "submission", String.valueOf(jid));
-
- return submissionSubmit(jid, ectx);
- case DELETE:
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "stop", "submission", String.valueOf(jid));
-
- return submissionStop(jid, ectx);
- }
-
- return null;
- }
-
- private JsonBean handleHistoryEvent(RequestContext ctx, String sjid) {
- AuditLoggerManager.getInstance()
- .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
- "get", "submission", sjid);
-
- if (sjid.equals("all")) {
- return getSubmissions();
+ String identifier = ctx.getLastURLElement();
+ Repository repository = RepositoryManager.getInstance().getRepository();
+ // links by connector ordered by updated time
+ // hence the latest submission is on the top
+ if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
+ identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier);
+ if (repository.findJob(identifier) != null) {
+ long jobId = repository.findJob(identifier).getPersistenceId();
+ return getSubmissionsForJob(jobId);
+ } else {
+ // this means name nor Id existed
+ throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+ + " name given");
+ }
} else {
- return getSubmissionsForJob(Long.parseLong(sjid));
+ // all submissions in the system
+ AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
+ ctx.getRequest().getRemoteAddr(), "get", "submissions", "all");
+ return getSubmissions();
}
}
- private JsonBean submissionStop(long jid, HttpEventContext ctx) {
- MSubmission submission = JobManager.getInstance().stop(jid, ctx);
- return new SubmissionBean(submission);
- }
-
- private JsonBean submissionSubmit(long jid, HttpEventContext ctx) {
- MSubmission submission = JobManager.getInstance().submit(jid, ctx);
- return new SubmissionBean(submission);
- }
-
- private JsonBean submissionStatus(long jid) {
- MSubmission submission = JobManager.getInstance().status(jid);
- return new SubmissionBean(submission);
- }
-
private JsonBean getSubmissions() {
- List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissions();
- return new SubmissionBean(submissions);
+ List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
+ .findSubmissions();
+ return new SubmissionsBean(submissions);
}
private JsonBean getSubmissionsForJob(long jid) {
- List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissionsForJob(jid);
- return new SubmissionBean(submissions);
+ List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
+ .findSubmissionsForJob(jid);
+ return new SubmissionsBean(submissions);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
index d295237..0d15d0a 100644
--- a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
+++ b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java
@@ -23,7 +23,53 @@ import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.SqoopProtocolServlet;
+
/**
+ * Provides operations for job resource
+ *
+ * GET /v1/job/{jid}
+ * Return details about one particular job with id:jid or about all of
+ * them if jid equals to "all"
+ *
+ * POST /v1/job
+ * Create new job
+ * POST /v1/job/ with {from-link-id}, {to-link-id} and other job details in the post data
+ * Create job with from and to link
+ * PUT /v1/link/ with {from-link-id}, {to-link-id} and other job details in the post data
+ * Edit/Update job for the from and to link
+ *
+ * PUT /v1/job/{jid} and the job details in the post data
+ * Update job with id jid.
+ *
+ * PUT /v1/job/{jid}/enable
+ * Enable job with id jid
+ * PUT /v1/job/{jname}s/disable
+ * Enable job with name jname
+ *
+ * PUT /v1/job/{jid}/disable
+ * Disable job with id jid
+ * PUT /v1/job/{jname}/disable
+ * Disable job with name jname
+ *
+ * DELETE /v1/job/{jid}
+ * Remove job with id jid
+ * DELETE /v1/job/{jname}
+ * Remove job with name jname
+ *
+ * PUT /v1/job/{jid}/submit
+ * Submit job with id jid to create a submission record
+ * PUT /v1/job/{jname}/submit
+ * Submit job with name jname to create a submission record
+ *
+ * PUT /v1/job/{jid}/stop
+ * Abort/Stop last running associated submission with job id jid
+ * PUT /v1/job/{jname}/stop
+ * Abort/Stop last running associated submission with job name jname
+ *
+ * GET /v1/job/{jid}/status
+ * get status of running job with job id jid
+ * GET /v1/job/{jname}/status
+ * get status of running job with job name jname
*
*/
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/sqoop/blob/268a4755/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
new file mode 100644
index 0000000..5184a0b
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.server.v1;
+
+import org.apache.sqoop.handler.JobRequestHandler;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.SqoopProtocolServlet;
+
+
+/**
+ * Displays all or jobs per connector in sqoop
+ *
+ * GET /v1/jobs
+ * Return details about every jobs that exists in the sqoop system
+ * GET /v1/jobs?cname=
+ * Return details about job(s) for a given connector name {cname}
+*/
+@SuppressWarnings("serial")
+public class JobsServlet extends SqoopProtocolServlet {
+
+ private RequestHandler jobRequestHandler;
+
+ public JobsServlet() {
+ jobRequestHandler = new JobRequestHandler();
+ }
+
+ @Override
+ protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
+ return jobRequestHandler.handleEvent(ctx);
+ }
+}
|