Updated Branches:
refs/heads/sqoop2 a69b1cc66 -> c4467c677
SQOOP-659: Design metadata upgrade procedure
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c4467c67
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c4467c67
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c4467c67
Branch: refs/heads/sqoop2
Commit: c4467c6770c99b4f7e231ee0af162fa825791656
Parents: a69b1cc
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Thu Apr 18 21:15:28 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Thu Apr 18 21:15:28 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/sqoop/model/MConnection.java | 12 +
.../src/main/java/org/apache/sqoop/model/MJob.java | 17 +
.../sqoop/connector/jdbc/GenericJdbcConnector.java | 6 +
.../jdbc/GenericJdbcConnectorMetadataUpgrader.java | 70 ++++
.../apache/sqoop/repository/JdbcRepository.java | 149 ++++++++-
.../sqoop/repository/JdbcRepositoryHandler.java | 118 +++++--
.../org/apache/sqoop/repository/Repository.java | 266 +++++++++++++-
.../sqoop/repository/derby/DerbyRepoError.java | 2 +
.../repository/derby/DerbyRepositoryHandler.java | 202 +++++++++---
.../sqoop/repository/derby/DerbySchemaQuery.java | 52 +++-
.../sqoop/connector/spi/MetadataUpgrader.java | 47 +++
.../apache/sqoop/connector/spi/SqoopConnector.java | 7 +
12 files changed, 841 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/common/src/main/java/org/apache/sqoop/model/MConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnection.java b/common/src/main/java/org/apache/sqoop/model/MConnection.java
index 36dca42..c31eafd 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnection.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnection.java
@@ -57,6 +57,18 @@ public class MConnection extends MAccountableEntity {
return connectorId;
}
+ public void setConnectorPart(MConnectionForms connectorPart) {
+ this.connectorPart = connectorPart;
+ }
+
+ public void setFrameworkPart(MConnectionForms frameworkPart) {
+ this.frameworkPart = frameworkPart;
+ }
+
+ public void setConnectorId(long connectorId) {
+ this.connectorId = connectorId;
+ }
+
public MConnectionForms getConnectorPart() {
return connectorPart;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index a53f04e..5b50bfd 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -98,10 +98,18 @@ public class MJob extends MAccountableEntity {
return connectionId;
}
+ public void setConnectionId(long connectionId) {
+ this.connectionId = connectionId;
+ }
+
public long getConnectorId() {
return connectorId;
}
+ public void setConnectorId(long connectorId) {
+ this.connectorId = connectorId;
+ }
+
public MJobForms getConnectorPart() {
return connectorPart;
}
@@ -110,6 +118,15 @@ public class MJob extends MAccountableEntity {
return frameworkPart;
}
+ public void setConnectorPart(MJobForms connectorPart) {
+ this.connectorPart = connectorPart;
+ }
+
+ public void setFrameworkPart(MJobForms frameworkPart) {
+ this.frameworkPart = frameworkPart;
+ }
+
+
public Type getType() {
return type;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index c315e48..11c10de 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -94,4 +95,9 @@ public class GenericJdbcConnector extends SqoopConnector {
return new GenericJdbcValidator();
}
+ @Override
+ public MetadataUpgrader getMetadataUpgrader() {
+ return new GenericJdbcConnectorMetadataUpgrader();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
new file mode 100644
index 0000000..cd461f4
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Status;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
+ /*
+ * For now, there is no real upgrade. So copy all data over,
+ * set the validation messages and error messages to be the same as for the
+ * inputs in the original one.
+ */
+
+ @Override
+ public void upgrade(MConnectionForms original,
+ MConnectionForms upgradeTarget) {
+ doUpgrade(original.getForms(), upgradeTarget.getForms());
+ }
+
+ @Override
+ public void upgrade(MJobForms original, MJobForms upgradeTarget) {
+ doUpgrade(original.getForms(), upgradeTarget.getForms());
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doUpgrade(List<MForm> original, List<MForm> target) {
+ // Easier to find the form in the original forms list if we use a map.
+ // Since the constructor of MJobForms takes a list,
+ // index is not guaranteed to be the same, so we need to look for
+ // equivalence
+ Map<String, MForm> formMap = new HashMap<String, MForm>();
+ for (MForm form : original) {
+ formMap.put(form.getName(), form);
+ }
+ for (MForm form : target) {
+ List<MInput<?>> inputs = form.getInputs();
+ MForm originalForm = formMap.get(form.getName());
+ for (MInput input : inputs) {
+ MInput originalInput = originalForm.getInput(input.getName());
+ input.setValue(originalInput.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 32df1e5..b2259ce 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -29,7 +29,7 @@ import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
-public class JdbcRepository implements Repository {
+public class JdbcRepository extends Repository {
private static final Logger LOG =
Logger.getLogger(JdbcRepository.class);
@@ -58,27 +58,42 @@ public class JdbcRepository implements Repository {
Object doIt(Connection conn) throws Exception;
}
+ private Object doWithConnection(DoWithConnection delegator) {
+ return doWithConnection(delegator, null);
+ }
+
/**
* Handle transaction and connection functionality and delegate action to
* given delegator.
*
* @param delegator Code for specific action
+ * @param tx The transaction to use for the operation. If a transaction is
+ * specified, this method will not commit, rollback or close it.
+ * If null, a new transaction will be created - which will be
+ * committed/closed/rolled back.
* @return Arbitrary value
*/
- private Object doWithConnection(DoWithConnection delegator) {
- JdbcRepositoryTransaction tx = null;
+ private Object doWithConnection(DoWithConnection delegator,
+ JdbcRepositoryTransaction tx) {
+ boolean shouldCloseTxn = false;
try {
// Get transaction and connection
- tx = getTransaction();
- tx.begin();
- Connection conn = tx.getConnection();
+ Connection conn;
+ if (tx == null) {
+ tx = getTransaction();
+ shouldCloseTxn = true;
+ tx.begin();
+ }
+ conn = tx.getConnection();
// Delegate the functionality to our delegator
Object returnValue = delegator.doIt(conn);
- // Commit transaction
- tx.commit();
+ if (shouldCloseTxn) {
+ // Commit transaction
+ tx.commit();
+ }
// Return value that the underlying code needs to return
return returnValue;
@@ -86,12 +101,12 @@ public class JdbcRepository implements Repository {
} catch (SqoopException ex) {
throw ex;
} catch (Exception ex) {
- if (tx != null) {
+ if (tx != null && shouldCloseTxn) {
tx.rollback();
}
throw new SqoopException(RepositoryError.JDBCREPO_0012, ex);
} finally {
- if (tx != null) {
+ if (tx != null && shouldCloseTxn) {
tx.close();
}
}
@@ -121,11 +136,20 @@ public class JdbcRepository implements Repository {
handler.registerConnector(mConnector, conn);
return mConnector;
} else {
+ // Same connector, check if the version is the same.
+ // For now, use the "string" versions itself - later we should
+ // probably include a build number or something that is
+ // monotonically increasing.
+ if (result.getUniqueName().equals(mConnector.getUniqueName()) &&
+ mConnector.getVersion().compareTo(result.getVersion()) > 0) {
+ upgradeConnector(result, mConnector);
+ return mConnector;
+ }
if (!result.equals(mConnector)) {
throw new SqoopException(RepositoryError.JDBCREPO_0013,
"Connector: " + mConnector.getUniqueName()
- + " given: " + mConnector
- + " found: " + result);
+ + " given: " + mConnector
+ + " found: " + result);
}
return result;
}
@@ -137,6 +161,19 @@ public class JdbcRepository implements Repository {
* {@inheritDoc}
*/
@Override
+ public MConnector findConnector(final String shortName) {
+ return (MConnector) doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ return handler.findConnector(shortName, conn);
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public MFramework registerFramework(final MFramework mFramework) {
return (MFramework) doWithConnection(new DoWithConnection() {
@Override
@@ -179,6 +216,15 @@ public class JdbcRepository implements Repository {
*/
@Override
public void updateConnection(final MConnection connection) {
+ updateConnection(connection, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updateConnection(final MConnection connection,
+ RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
@@ -193,7 +239,7 @@ public class JdbcRepository implements Repository {
handler.updateConnection(connection, conn);
return null;
}
- });
+ }, (JdbcRepositoryTransaction) tx);
}
/**
@@ -269,6 +315,14 @@ public class JdbcRepository implements Repository {
*/
@Override
public void updateJob(final MJob job) {
+ updateJob(job, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updateJob(final MJob job, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
@@ -283,7 +337,7 @@ public class JdbcRepository implements Repository {
handler.updateJob(job, conn);
return null;
}
- });
+ }, (JdbcRepositoryTransaction) tx);
}
/**
@@ -420,4 +474,71 @@ public class JdbcRepository implements Repository {
}
});
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<MConnection> findConnectionsForConnector(final long
+ connectorID) {
+ return (List<MConnection>) doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ return handler.findConnectionsForConnector(connectorID, conn);
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<MJob> findJobsForConnector(final long connectorID) {
+ return (List<MJob>) doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ return handler.findJobsForConnector(connectorID, conn);
+ }
+ });
+ }
+
+ @Override
+ protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ handler.deleteJobInputs(jobID, conn);
+ return null;
+ }
+ }, (JdbcRepositoryTransaction) tx);
+
+ }
+
+ @Override
+ protected void deleteConnectionInputs(final long connectionID,
+ RepositoryTransaction tx) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ handler.deleteConnectionInputs(connectionID, conn);
+ return null;
+ }
+ }, (JdbcRepositoryTransaction) tx);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void updateConnector(final MConnector newConnector,
+ RepositoryTransaction tx) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ handler.updateConnector(newConnector, conn);
+ return null;
+ }
+ }, (JdbcRepositoryTransaction) tx);
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index ca51313..1f88b6d 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -30,14 +30,14 @@ import org.apache.sqoop.model.MSubmission;
/**
* Set of methods required from each JDBC based repository.
*/
-public interface JdbcRepositoryHandler {
+public abstract class JdbcRepositoryHandler {
/**
* Initialize JDBC based repository.
*
* @param repoContext Context for this instance
*/
- void initialize(JdbcRepositoryContext repoContext);
+ public abstract void initialize(JdbcRepositoryContext repoContext);
/**
* Search for connector with given name in repository.
@@ -49,7 +49,7 @@ public interface JdbcRepositoryHandler {
* @return null if connector is not yet registered in repository or
* loaded representation.
*/
- MConnector findConnector(String shortName, Connection conn);
+ public abstract MConnector findConnector(String shortName, Connection conn);
/**
* Register given connector in repository.
@@ -60,7 +60,42 @@ public interface JdbcRepositoryHandler {
* @param mc Connector that should be registered.
* @param conn JDBC connection for querying repository.
*/
- void registerConnector(MConnector mc, Connection conn);
+ public abstract void registerConnector(MConnector mc, Connection conn);
+
+
+ /**
+ * Retrieve connections which use the given connector.
+ * @param connectorID Connector ID whose connections should be fetched
+ * @param conn JDBC connection for querying repository
+ * @return List of MConnections that use <code>connectorID</code>.
+ */
+ public abstract List<MConnection> findConnectionsForConnector(long
+ connectorID, Connection conn);
+
+ /**
+ * Retrieve jobs which use the given connection.
+ *
+ * @param connectorID Connector ID whose jobs should be fetched
+ * @param conn JDBC connection for querying repository
+ * @return List of MJobs that use <code>connectionID</code>.
+ */
+ public abstract List<MJob> findJobsForConnector(long connectorID,
+ Connection conn);
+
+ /**
+ * Update the connector with the new data supplied in the <tt>newConnector</tt>.
+ * Also Update all forms associated with this connector in the repository
+ * with the forms specified in <tt>mConnector</tt>. <tt>mConnector </tt> must
+ * minimally have the connectorID and all required forms (including ones
+ * which may not have changed). After this operation the repository is
+ * guaranteed to only have the new forms specified in this object.
+ *
+ * @param mConnector The new data to be inserted into the repository for
+ * this connector.
+ * @param conn JDBC connection for querying repository
+ */
+
+ public abstract void updateConnector(MConnector mConnector, Connection conn);
/**
* Search for framework metadata in the repository.
@@ -69,7 +104,7 @@ public interface JdbcRepositoryHandler {
* @return null if framework metadata are not yet present in repository or
* loaded representation.
*/
- MFramework findFramework(Connection conn);
+ public abstract MFramework findFramework(Connection conn);
/**
* Register framework metadata in repository.
@@ -80,26 +115,26 @@ public interface JdbcRepositoryHandler {
* @param mf Framework metadata that should be registered.
* @param conn JDBC connection for querying repository.
*/
- void registerFramework(MFramework mf, Connection conn);
+ public abstract void registerFramework(MFramework mf, Connection conn);
/**
* Check if schema is already present in the repository.
*
* @return true if schema is already present or false if it's not
*/
- boolean schemaExists();
+ public abstract boolean schemaExists();
/**
* Create required schema in repository.
*/
- void createSchema();
+ public abstract void createSchema();
/**
* Termination callback for repository.
*
* Should clean up all resources and commit all uncommitted data.
*/
- void shutdown();
+ public abstract void shutdown();
/**
* Specify query that Sqoop framework can use to validate connection to
@@ -108,7 +143,7 @@ public interface JdbcRepositoryHandler {
* @return Query or NULL in case that this repository do not support or do not
* want to validate live connections.
*/
- String validationQuery();
+ public abstract String validationQuery();
/**
* Save given connection to repository. This connection must not be already
@@ -117,7 +152,8 @@ public interface JdbcRepositoryHandler {
* @param connection Connection object to serialize into repository.
* @param conn Connection to metadata repository
*/
- void createConnection(MConnection connection, Connection conn);
+ public abstract void createConnection(MConnection connection,
+ Connection conn);
/**
* Update given connection representation in repository. This connection
@@ -127,7 +163,8 @@ public interface JdbcRepositoryHandler {
* @param connection Connection object that should be updated in repository.
* @param conn Connection to metadata repository
*/
- void updateConnection(MConnection connection, Connection conn);
+ public abstract void updateConnection(MConnection connection,
+ Connection conn);
/**
* Check if given connection exists in metastore.
@@ -136,7 +173,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return True if the connection exists
*/
- boolean existsConnection(long connetionId, Connection conn);
+ public abstract boolean existsConnection(long connetionId, Connection conn);
/**
* Check if given Connection id is referenced somewhere and thus can't
@@ -146,7 +183,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return
*/
- boolean inUseConnection(long connectionId, Connection conn);
+ public abstract boolean inUseConnection(long connectionId, Connection conn);
/**
* Delete connection with given id from metadata repository.
@@ -154,16 +191,24 @@ public interface JdbcRepositoryHandler {
* @param connectionId Connection object that should be removed from repository
* @param conn Connection to metadata repository
*/
- void deleteConnection(long connectionId, Connection conn);
+ public abstract void deleteConnection(long connectionId, Connection conn);
/**
+ * Delete the input values for the connection with given id from the
+ * repository.
+ * @param id Connection object whose inputs should be removed from repository
+ * @param conn Connection to metadata repository
+ */
+ public abstract void deleteConnectionInputs(long id, Connection conn);
+ /**
* Find connection with given id in repository.
*
* @param connectionId Connection id
* @param conn Connection to metadata repository
* @return Deserialized form of the connection that is saved in repository
*/
- MConnection findConnection(long connectionId, Connection conn);
+ public abstract MConnection findConnection(long connectionId,
+ Connection conn);
/**
* Get all connection objects.
@@ -171,7 +216,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List will all saved connection objects
*/
- List<MConnection> findConnections(Connection conn);
+ public abstract List<MConnection> findConnections(Connection conn);
/**
@@ -181,7 +226,7 @@ public interface JdbcRepositoryHandler {
* @param job Job object to serialize into repository.
* @param conn Connection to metadata repository
*/
- void createJob(MJob job, Connection conn);
+ public abstract void createJob(MJob job, Connection conn);
/**
* Update given job representation in repository. This job object must
@@ -191,7 +236,7 @@ public interface JdbcRepositoryHandler {
* @param job Job object that should be updated in repository.
* @param conn Connection to metadata repository
*/
- void updateJob(MJob job, Connection conn);
+ public abstract void updateJob(MJob job, Connection conn);
/**
* Check if given job exists in metastore.
@@ -200,7 +245,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return True if the job exists
*/
- boolean existsJob(long jobId, Connection conn);
+ public abstract boolean existsJob(long jobId, Connection conn);
/**
* Check if given job id is referenced somewhere and thus can't
@@ -210,15 +255,23 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return
*/
- boolean inUseJob(long jobId, Connection conn);
+ public abstract boolean inUseJob(long jobId, Connection conn);
+
/**
- * Delete job with given id from metadata repository.
+ * Delete the input values for the job with given id from the repository.
+ * @param id Job object whose inputs should be removed from repository
+ * @param conn Connection to metadata repository
+ */
+ public abstract void deleteJobInputs(long id, Connection conn);
+ /**
+ * Delete job with given id from metadata repository. This method will
+ * delete all inputs for this job also.
*
* @param jobId Job object that should be removed from repository
* @param conn Connection to metadata repository
*/
- void deleteJob(long jobId, Connection conn);
+ public abstract void deleteJob(long jobId, Connection conn);
/**
* Find job with given id in repository.
@@ -227,7 +280,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return Deserialized form of the job that is present in the repository
*/
- MJob findJob(long jobId, Connection conn);
+ public abstract MJob findJob(long jobId, Connection conn);
/**
* Get all job objects.
@@ -235,7 +288,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List will all saved job objects
*/
- List<MJob> findJobs(Connection conn);
+ public abstract List<MJob> findJobs(Connection conn);
/**
* Save given submission in repository.
@@ -243,7 +296,8 @@ public interface JdbcRepositoryHandler {
* @param submission Submission object
* @param conn Connection to metadata repository
*/
- void createSubmission(MSubmission submission, Connection conn);
+ public abstract void createSubmission(MSubmission submission,
+ Connection conn);
/**
* Check if submission with given id already exists in repository.
@@ -251,7 +305,7 @@ public interface JdbcRepositoryHandler {
* @param submissionId Submission internal id
* @param conn Connection to metadata repository
*/
- boolean existsSubmission(long submissionId, Connection conn);
+ public abstract boolean existsSubmission(long submissionId, Connection conn);
/**
* Update given submission in repository.
@@ -259,7 +313,8 @@ public interface JdbcRepositoryHandler {
* @param submission Submission object
* @param conn Connection to metadata repository
*/
- void updateSubmission(MSubmission submission, Connection conn);
+ public abstract void updateSubmission(MSubmission submission,
+ Connection conn);
/**
* Remove submissions older then threshold from repository.
@@ -267,7 +322,7 @@ public interface JdbcRepositoryHandler {
* @param threshold Threshold date
* @param conn Connection to metadata repository
*/
- void purgeSubmissions(Date threshold, Connection conn);
+ public abstract void purgeSubmissions(Date threshold, Connection conn);
/**
* Return list of unfinished submissions (as far as repository is concerned).
@@ -275,7 +330,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List of unfinished submissions.
*/
- List<MSubmission> findSubmissionsUnfinished(Connection conn);
+ public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn);
/**
* Find last submission for given jobId.
@@ -284,5 +339,6 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return Most recent submission
*/
- MSubmission findSubmissionLastForJob(long jobId, Connection conn);
+ public abstract MSubmission findSubmissionLastForJob(long jobId,
+ Connection conn);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index d6ec303..57c9be4 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -17,12 +17,27 @@
*/
package org.apache.sqoop.repository;
+import org.apache.sqoop.common.ErrorCode;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MEnumInput;
+import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MIntegerInput;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.ModelError;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -32,9 +47,9 @@ import java.util.List;
* Sqoop to store metadata, statistics and other state relevant to Sqoop
* Jobs in the system.
*/
-public interface Repository {
+public abstract class Repository {
- RepositoryTransaction getTransaction();
+ public abstract RepositoryTransaction getTransaction();
/**
* Registers given connector in the repository and return registered
@@ -44,7 +59,18 @@ public interface Repository {
* @param mConnector the connector metadata to be registered
* @return Registered connector structure
*/
- MConnector registerConnector(MConnector mConnector);
+ public abstract MConnector registerConnector(MConnector mConnector);
+
+ /**
+ * Search for connector with given name in repository.
+ *
+ * And return corresponding metadata structure.
+ *
+ * @param shortName Connector unique name
+ * @return null if connector is not yet registered in repository or
+ * loaded representation.
+ */
+ public abstract MConnector findConnector(String shortName);
/**
@@ -55,7 +81,7 @@ public interface Repository {
* @param mFramework framework metadata to be registered
* @return Registered connector structure
*/
- MFramework registerFramework(MFramework mFramework);
+ public abstract MFramework registerFramework(MFramework mFramework);
/**
* Save given connection to repository. This connection must not be already
@@ -63,7 +89,16 @@ public interface Repository {
*
* @param connection Connection object to serialize into repository.
*/
- void createConnection(MConnection connection);
+ public abstract void createConnection(MConnection connection);
+
+ /**
+ * Update given connection representation in repository. This connection
+ * object must already exists in the repository otherwise exception will be
+ * thrown.
+ *
+ * @param connection Connection object that should be updated in repository.
+ */
+ public abstract void updateConnection(MConnection connection);
/**
* Update given connection representation in repository. This connection
@@ -71,15 +106,20 @@ public interface Repository {
* thrown.
*
* @param connection Connection object that should be updated in repository.
+ * @param tx The repository transaction to use to push the data to the
+ * repository. If this is null, a new transaction will be created.
+ * method will not call begin, commit,
+ * rollback or close on this transaction.
*/
- void updateConnection(MConnection connection);
+ public abstract void updateConnection(final MConnection connection,
+ RepositoryTransaction tx);
/**
* Delete connection with given id from metadata repository.
*
* @param id Connection object that should be removed from repository
*/
- void deleteConnection(long id);
+ public abstract void deleteConnection(long id);
/**
* Find connection with given id in repository.
@@ -87,14 +127,14 @@ public interface Repository {
* @param id Connection id
* @return Deserialized form of the connection that is saved in repository
*/
- MConnection findConnection(long id);
+ public abstract MConnection findConnection(long id);
/**
* Get all connection objects.
*
* @return List will all saved connection objects
*/
- List<MConnection> findConnections();
+ public abstract List<MConnection> findConnections();
/**
* Save given job to repository. This job object must not be already present
@@ -102,7 +142,7 @@ public interface Repository {
*
* @param job Job object that should be saved to repository
*/
- void createJob(MJob job);
+ public abstract void createJob(MJob job);
/**
* Update given job metadata in repository. This object must already be saved
@@ -110,14 +150,26 @@ public interface Repository {
*
* @param job Job object that should be updated in the repository
*/
- void updateJob(MJob job);
+ public abstract void updateJob(MJob job);
+
+ /**
+ * Update given job metadata in repository. This object must already be saved
+ * in repository otherwise exception will be thrown.
+ *
+ * @param job Job object that should be updated in the repository
+ * @param tx The repository transaction to use to push the data to the
+ * repository. If this is null, a new transaction will be created.
+ * method will not call begin, commit,
+ * rollback or close on this transaction.
+ */
+ public abstract void updateJob(MJob job, RepositoryTransaction tx);
/**
* Delete job with given id from metadata repository.
*
* @param id Job id that should be removed
*/
- void deleteJob(long id);
+ public abstract void deleteJob(long id);
/**
* Find job object with given id.
@@ -125,42 +177,42 @@ public interface Repository {
* @param id Job id
* @return Deserialized form of job loaded from repository
*/
- MJob findJob(long id);
+ public abstract MJob findJob(long id);
/**
* Get all job objects.
*
* @return List of all jobs in the repository
*/
- List<MJob> findJobs();
+ public abstract List<MJob> findJobs();
/**
* Create new submission record in repository.
*
* @param submission Submission object that should be serialized to repository
*/
- void createSubmission(MSubmission submission);
+ public abstract void createSubmission(MSubmission submission);
/**
* Update already existing submission record in repository.
*
* @param submission Submission object that should be updated
*/
- void updateSubmission(MSubmission submission);
+ public abstract void updateSubmission(MSubmission submission);
/**
* Remove submissions older then given date from repository.
*
* @param threshold Threshold date
*/
- void purgeSubmissions(Date threshold);
+ public abstract void purgeSubmissions(Date threshold);
/**
* Return all unfinished submissions as far as repository is concerned.
*
* @return List of unfinished submissions
*/
- List<MSubmission> findSubmissionsUnfinished();
+ public abstract List<MSubmission> findSubmissionsUnfinished();
/**
* Find last submission for given jobId.
@@ -168,5 +220,181 @@ public interface Repository {
* @param jobId Job id
* @return Most recent submission
*/
- MSubmission findSubmissionLastForJob(long jobId);
+ public abstract MSubmission findSubmissionLastForJob(long jobId);
+
+ /**
+ * Retrieve connections which use the given connector.
+ * @param connectorID Connector ID whose connections should be fetched
+ * @return List of MConnections that use <code>connectorID</code>.
+ */
+ public abstract List<MConnection> findConnectionsForConnector(long
+ connectorID);
+
+ /**
+ * Retrieve jobs which use the given connection.
+ *
+ * @param connectorID Connector ID whose jobs should be fetched
+ * @return List of MJobs that use <code>connectionID</code>.
+ */
+ public abstract List<MJob> findJobsForConnector(long
+ connectorID);
+
+ /**
+ * Update the connector with the new data supplied in the
+ * <tt>newConnector</tt>. Also Update all forms associated with this
+ * connector in the repository with the forms specified in
+ * <tt>mConnector</tt>. <tt>mConnector </tt> must
+ * minimally have the connectorID and all required forms (including ones
+ * which may not have changed). After this operation the repository is
+ * guaranteed to only have the new forms specified in this object.
+ *
+ * @param newConnector The new data to be inserted into the repository for
+ * this connector.
+ * @param tx The repository transaction to use to push the data to the
+ * repository. If this is null, a new transaction will be created.
+ * method will not call begin, commit,
+ * rollback or close on this transaction.
+ */
+ protected abstract void updateConnector(MConnector newConnector,
+ RepositoryTransaction tx);
+
+ /**
+ * Delete all inputs for a job
+ * @param jobId The id of the job whose inputs are to be deleted.
+ * @param tx A transaction on the repository. This
+ * method will not call <code>begin, commit,
+ * rollback or close on this transaction.</code>
+ */
+ protected abstract void deleteJobInputs(long jobId, RepositoryTransaction tx);
+
+ /**
+ * Delete all inputs for a connection
+ * @param connectionID The id of the connection whose inputs are to be
+ * deleted.
+ * @param tx The repository transaction to use to push the data to the
+ * repository. If this is null, a new transaction will be created.
+ * method will not call begin, commit,
+ * rollback or close on this transaction.
+ */
+ protected abstract void deleteConnectionInputs(long connectionID,
+ RepositoryTransaction tx);
+
+ /**
+ * Upgrade the connector with the same {@linkplain MConnector#uniqueName}
+ * in the repository with values from <code>newConnector</code>.
+ * <p/>
+ * All connections and jobs associated with this connector will be upgraded
+ * automatically.
+ *
+ * @param oldConnector The old connector that should be upgraded.
+ * @param newConnector New properties for the Connector that should be
+ * upgraded.
+ */
+ public final void upgradeConnector(MConnector oldConnector,
+ MConnector newConnector) {
+ long connectorID = oldConnector.getPersistenceId();
+ newConnector.setPersistenceId(connectorID);
+ /* Algorithms:
+ * 1. Get an upgrader for the connector.
+ * 2. Get all connections associated with the connector.
+ * 3. Get all jobs associated with the connector.
+ * 4. Delete the inputs for all of the jobs and connections (in that order)
+ * 5. Remove all inputs and forms associated with the connector, and
+ * register the new forms and inputs.
+ * 6. Create new connections and jobs with connector part being the ones
+ * returned by the upgrader.
+ * 7. Insert the connection inputs followed by job inputs (using
+ * updateJob and updateConnection)
+ */
+ RepositoryTransaction tx = null;
+ try {
+ SqoopConnector connector = ConnectorManager.getInstance().getConnector(
+ connectorID);
+ MetadataUpgrader upgrader = connector.getMetadataUpgrader();
+ List<MConnection> connections = findConnectionsForConnector(
+ connectorID);
+ List<MJob> jobs = findJobsForConnector(connectorID);
+ // -- BEGIN TXN --
+ tx = getTransaction();
+ tx.begin();
+ for (MJob job : jobs) {
+ deleteJobInputs(job.getPersistenceId(), tx);
+ }
+ for (MConnection connection : connections) {
+ deleteConnectionInputs(connection.getPersistenceId(), tx);
+ }
+ updateConnector(newConnector, tx);
+ for (MConnection connection : connections) {
+ long connectionID = connection.getPersistenceId();
+ // Make a new copy of the forms from the connector,
+ // else the values will get set in the forms in the connector for
+ // each connection.
+ List<MForm> forms = cloneForms(newConnector.getConnectionForms()
+ .getForms());
+ MConnectionForms newConnectionForms = new MConnectionForms(forms);
+ upgrader.upgrade(connection.getConnectorPart(), newConnectionForms);
+ MConnection newConnection = new MConnection(connectorID,
+ newConnectionForms, connection.getFrameworkPart());
+ newConnection.setPersistenceId(connectionID);
+ updateConnection(newConnection, tx);
+ }
+ for (MJob job : jobs) {
+ // Make a new copy of the forms from the connector,
+ // else the values will get set in the forms in the connector for
+ // each connection.
+ List<MForm> forms = cloneForms(newConnector.getJobForms(job.getType())
+ .getForms());
+ MJobForms newJobForms = new MJobForms(job.getType(), forms);
+ upgrader.upgrade(job.getConnectorPart(), newJobForms);
+ MJob newJob = new MJob(connectorID, job.getConnectionId(),
+ job.getType(), newJobForms, job.getFrameworkPart());
+ updateJob(newJob, tx);
+ }
+ tx.commit();
+ } catch (Exception ex) {
+ if(tx != null) {
+ tx.rollback();
+ }
+ throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
+ } finally {
+ if(tx != null) {
+ tx.close();
+ }
+ }
+ }
+
+ /**
+ * Clones the forms, but does not set the actual data,
+ * validation message etc in the inputs, but only the persistence id of the
+ * inputs.
+ * @param mForms MForms which must be cloned
+ * @return Cloned MForms
+ * @throws Exception
+ */
+ private List<MForm> cloneForms(List<MForm> mForms) throws Exception {
+ List<MForm> forms = new ArrayList<MForm>();
+ for(MForm mForm : mForms) {
+ List<MInput<?>> inputs = new ArrayList<MInput<?>>();
+ for (MInput<?> input : mForm.getInputs()) {
+ MInput newInput;
+ if(input instanceof MEnumInput) {
+ newInput = new MEnumInput(input.getName(), input.isSensitive(),
+ ((MEnumInput) input).getValues());
+ } else if (input instanceof MMapInput) {
+ newInput = new MMapInput(input.getName(), input.isSensitive());
+ } else if(input instanceof MStringInput) {
+ newInput = new MStringInput(input.getName(), input.isSensitive(),
+ ((MStringInput) input).getMaxLength());
+ } else if (input instanceof MIntegerInput) {
+ newInput = new MIntegerInput(input.getName(), input.isSensitive());
+ } else {
+ throw new SqoopException(ModelError.MODEL_003);
+ }
+ newInput.setPersistenceId(input.getPersistenceId());
+ inputs.add(newInput);
+ }
+ forms.add(new MForm(mForm.getName(), inputs));
+ }
+ return forms;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 95f6570..327896c 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -161,6 +161,8 @@ public enum DerbyRepoError implements ErrorCode {
/** Can't retrieve unfinished submissions **/
DERBYREPO_0037("Can't retrieve unfinished submissions"),
+ DERBYREPO_0038("Update of connector failed"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 32cef8a..556241e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -67,7 +67,7 @@ import org.apache.sqoop.utils.StringUtils;
*
* Repository implementation for Derby database.
*/
-public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
+public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
private static final Logger LOG =
Logger.getLogger(DerbyRepositoryHandler.class);
@@ -86,15 +86,54 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
public void registerConnector(MConnector mc, Connection conn) {
if (mc.hasPersistenceId()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
- mc.getUniqueName());
+ mc.getUniqueName());
}
+ mc.setPersistenceId(getConnectorId(mc, conn));
+ insertFormsForConnector(mc, conn);
+ }
- PreparedStatement baseConnectorStmt = null;
+ /**
+ * Helper method to insert the forms from the MConnector into the
+ * repository. The job and connector forms within <code>mc</code> will get
+ * updated with the id of the forms when this function returns.
+ * @param mc The connector to use for updating forms
+ * @param conn JDBC connection to use for updating the forms
+ */
+ private void insertFormsForConnector (MConnector mc, Connection conn) {
+ long connectorId = mc.getPersistenceId();
PreparedStatement baseFormStmt = null;
PreparedStatement baseInputStmt = null;
+ try{
+ baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+
+ baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+
+ // Register connector forms
+ registerForms(connectorId, null, mc.getConnectionForms().getForms(),
+ MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
+
+ // Register all jobs
+ for (MJobForms jobForms : mc.getAllJobsForms().values()) {
+ registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+ }
+
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
+ mc.toString(), ex);
+ } finally {
+ closeStatements(baseFormStmt, baseInputStmt);
+ }
+
+ }
+
+ private long getConnectorId(MConnector mc, Connection conn) {
+ PreparedStatement baseConnectorStmt = null;
try {
baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
- Statement.RETURN_GENERATED_KEYS);
+ Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, mc.getUniqueName());
baseConnectorStmt.setString(2, mc.getClassName());
baseConnectorStmt.setString(3, mc.getVersion());
@@ -110,31 +149,12 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
-
- long connectorId = rsetConnectorId.getLong(1);
- mc.setPersistenceId(connectorId);
-
- baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
- Statement.RETURN_GENERATED_KEYS);
-
- baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
- Statement.RETURN_GENERATED_KEYS);
-
- // Register connector forms
- registerForms(connectorId, null, mc.getConnectionForms().getForms(),
- MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
-
- // Register all jobs
- for (MJobForms jobForms : mc.getAllJobsForms().values()) {
- registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
- MFormType.JOB.name(), baseFormStmt, baseInputStmt);
- }
-
+ return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
- mc.toString(), ex);
+ mc.toString(), ex);
} finally {
- closeStatements(baseConnectorStmt, baseFormStmt, baseInputStmt);
+ closeStatements(baseConnectorStmt);
}
}
@@ -227,7 +247,6 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
String sqoopSchemaId = rset.getString(1);
LOG.debug("SQOOP schema ID: " + sqoopSchemaId);
-
connection.commit();
} catch (SQLException ex) {
if (connection != null) {
@@ -550,21 +569,33 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
@Override
public void deleteConnection(long id, Connection conn) {
PreparedStatement dltConn = null;
- PreparedStatement dltConnInput = null;
+
try {
- dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
+ deleteConnectionInputs(id, conn);
dltConn = conn.prepareStatement(STMT_DELETE_CONNECTION);
-
- dltConnInput.setLong(1, id);
dltConn.setLong(1, id);
-
- dltConnInput.executeUpdate();
dltConn.executeUpdate();
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
+ } finally {
+ closeStatements(dltConn);
+ }
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void deleteConnectionInputs(long id, Connection conn) {
+ PreparedStatement dltConnInput = null;
+ try {
+ dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
+ dltConnInput.setLong(1, id);
+ dltConnInput.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
} finally {
- closeStatements(dltConn, dltConnInput);
+ closeStatements(dltConnInput);
}
}
@@ -613,6 +644,63 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ */
+ @Override
+ public List<MConnection> findConnectionsForConnector(long
+ connectorID, Connection conn) {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(STMT_SELECT_CONNECTION_FOR_CONNECTOR);
+ stmt.setLong(1, connectorID);
+
+ return loadConnections(stmt, conn);
+
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
+ } finally {
+ closeStatements(stmt);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updateConnector(MConnector mConnector, Connection conn) {
+ PreparedStatement updateConnectorStatement = null;
+ PreparedStatement deleteForm = null;
+ PreparedStatement deleteInput = null;
+ try {
+ updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
+ deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
+ deleteForm = conn.prepareStatement(STMT_DELETE_FORMS_FOR_CONNECTOR);
+ updateConnectorStatement.setString(1, mConnector.getUniqueName());
+ updateConnectorStatement.setString(2, mConnector.getClassName());
+ updateConnectorStatement.setString(3, mConnector.getVersion());
+ updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
+
+ if (updateConnectorStatement.executeUpdate() != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
+ }
+ deleteInput.setLong(1, mConnector.getPersistenceId());
+ deleteForm.setLong(1, mConnector.getPersistenceId());
+ deleteInput.executeUpdate();
+ deleteForm.executeUpdate();
+
+ } catch (SQLException e) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
+ } finally {
+ closeStatements(updateConnectorStatement, deleteForm, deleteInput);
+ }
+ insertFormsForConnector(mConnector, conn);
+
+ }
+
/**
* {@inheritDoc}
*/
@@ -746,21 +834,32 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
@Override
public void deleteJob(long id, Connection conn) {
PreparedStatement dlt = null;
- PreparedStatement dltInput = null;
try {
- dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+ deleteJobInputs(id, conn);
dlt = conn.prepareStatement(STMT_DELETE_JOB);
-
- dltInput.setLong(1, id);
dlt.setLong(1, id);
-
- dltInput.executeUpdate();
dlt.executeUpdate();
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
+ } finally {
+ closeStatements(dlt);
+ }
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void deleteJobInputs(long id, Connection conn) {
+ PreparedStatement dltInput = null;
+ try {
+ dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+ dltInput.setLong(1, id);
+ dltInput.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
} finally {
- closeStatements(dlt, dltInput);
+ closeStatements(dltInput);
}
}
@@ -813,6 +912,24 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
* {@inheritDoc}
*/
@Override
+ public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
+ stmt.setLong(1, connectorId);
+ return loadJobs(stmt, conn);
+
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+ } finally {
+ closeStatements(stmt);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void createSubmission(MSubmission submission, Connection conn) {
PreparedStatement stmt = null;
int result;
@@ -1297,7 +1414,8 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
/**
- * Register forms in derby database.
+ * Register forms in derby database. This method will insert the ids
+ * generated by the repository into the forms passed in itself.
*
* Use given prepared statements to create entire form structure in database.
*
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index ea458ac..4968c0d 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -447,6 +447,30 @@ public final class DerbySchemaQuery {
+ COLUMN_SQI_ENUMVALS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+ // Delete all forms for a given connector
+ public static final String STMT_DELETE_FORMS_FOR_CONNECTOR =
+ "DELETE FROM " + TABLE_SQ_FORM
+ + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
+
+ // Delete all inputs for a given connector
+ public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
+ "DELETE FROM " + TABLE_SQ_INPUT
+ + " WHERE "
+ + COLUMN_SQI_FORM
+ + " IN (SELECT "
+ + COLUMN_SQF_ID
+ + " FROM " + TABLE_SQ_FORM
+ + " WHERE "
+ + COLUMN_SQF_CONNECTOR + " = ?)";
+
+ // Update the connector
+ public static final String STMT_UPDATE_CONNECTOR =
+ "UPDATE " + TABLE_SQ_CONNECTOR
+ + " SET " + COLUMN_SQC_NAME + " = ?, "
+ + COLUMN_SQC_CLASS + " = ?, "
+ + COLUMN_SQC_VERSION + " = ? "
+ + " WHERE " + COLUMN_SQC_ID + " = ?";
+
// DML: Insert new connection
public static final String STMT_INSERT_CONNECTION =
"INSERT INTO " + TABLE_SQ_CONNECTION + " ("
@@ -502,6 +526,17 @@ public final class DerbySchemaQuery {
+ COLUMN_SQN_UPDATE_DATE
+ " FROM " + TABLE_SQ_CONNECTION;
+ // DML: Select all connections for a specific connector.
+ public static final String STMT_SELECT_CONNECTION_FOR_CONNECTOR =
+ "SELECT "
+ + COLUMN_SQN_ID + ", "
+ + COLUMN_SQN_NAME + ", "
+ + COLUMN_SQN_CONNECTOR + ", "
+ + COLUMN_SQN_CREATION_DATE + ", "
+ + COLUMN_SQN_UPDATE_DATE
+ + " FROM " + TABLE_SQ_CONNECTION
+ + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
+
// DML: Check if given connection exists
public static final String STMT_SELECT_CONNECTION_CHECK =
"SELECT count(*) FROM " + TABLE_SQ_CONNECTION
@@ -567,7 +602,7 @@ public final class DerbySchemaQuery {
+ COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
- + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select all jobs
@@ -584,6 +619,21 @@ public final class DerbySchemaQuery {
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
+ // DML: Select all jobs for a Connector
+ public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
+ "SELECT "
+ + COLUMN_SQN_CONNECTOR + ", "
+ + COLUMN_SQB_ID + ", "
+ + COLUMN_SQB_NAME + ", "
+ + COLUMN_SQB_CONNECTION + ", "
+ + COLUMN_SQB_TYPE + ", "
+ + COLUMN_SQB_CREATION_DATE + ", "
+ + COLUMN_SQB_UPDATE_DATE
+ + " FROM " + TABLE_SQ_JOB
+ + " LEFT JOIN " + TABLE_SQ_CONNECTION
+ + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ + " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
+
// DML: Insert new submission
public static final String STMT_INSERT_SUBMISSION =
"INSERT INTO " + TABLE_SQ_SUBMISSION + "("
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
new file mode 100644
index 0000000..d840a78
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.spi;
+
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MJobForms;
+
+public abstract class MetadataUpgrader {
+
+ /**
+ * Upgrade the original connection and fill into the upgradeTarget. Note
+ * that any metadata already in {@code upgradeTarget} maybe overwritten.
+ * @param original - original connection metadata as in the repository
+ * @param upgradeTarget - the instance that will be filled in with the
+ * upgraded metadata.
+ */
+ public abstract void upgrade(MConnectionForms original,
+ MConnectionForms upgradeTarget);
+ /**
+ * Upgrade the original job and fill into the upgradeTarget. Note
+ * that any metadata already in {@code upgradeTarget} maybe overwritten.
+ * This method must be called only after the connection metadata has
+ * already been upgraded.
+ * @param original - original connection metadata as in the repository
+ * @param upgradeTarget - the instance that will be filled in with the
+ * upgraded metadata.
+ */
+ public abstract void upgrade(MJobForms original, MJobForms upgradeTarget);
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 540303a..2becc56 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -72,4 +72,11 @@ public abstract class SqoopConnector {
*/
public abstract Validator getValidator();
+ /**
+ * Returns an {@linkplain MetadataUpgrader} object that can upgrade the
+ * connection and job metadata.
+ * @return MetadataUpgrader object
+ */
+ public abstract MetadataUpgrader getMetadataUpgrader();
+
}
|