Author: blee
Date: Tue Nov 1 07:39:43 2011
New Revision: 1195857
URL: http://svn.apache.org/viewvc?rev=1195857&view=rev
Log:
SQOOP-375: Migrate metastore and metastore.hsqldb packages to new name space
Added:
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java (with props)
incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbMetaStore.java (with props)
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobData.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorage.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobData.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobData.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobData.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobData.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,47 +22,17 @@ import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.SqoopTool;
/**
- * Container for all job data that should be stored to a
- * permanent resource.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class JobData {
- private SqoopOptions opts;
- private SqoopTool tool;
+public class JobData
+ extends org.apache.sqoop.metastore.JobData {
public JobData() {
+ super();
}
public JobData(SqoopOptions options, SqoopTool sqoopTool) {
- this.opts = options;
- this.tool = sqoopTool;
- }
-
- /**
- * Gets the SqoopOptions.
- */
- public SqoopOptions getSqoopOptions() {
- return this.opts;
- }
-
- /**
- * Gets the SqoopTool.
- */
- public SqoopTool getSqoopTool() {
- return this.tool;
- }
-
- /**
- * Sets the SqoopOptions.
- */
- public void setSqoopOptions(SqoopOptions options) {
- this.opts = options;
- }
-
- /**
- * Sets the SqoopTool.
- */
- public void setSqoopTool(SqoopTool sqoopTool) {
- this.tool = sqoopTool;
+ super(options, sqoopTool);
}
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorage.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorage.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorage.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,76 +18,10 @@
package com.cloudera.sqoop.metastore;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configured;
-
/**
- * API that defines how jobs are saved, restored, and manipulated.
- *
- * <p>
- * JobStorage instances may be created and then not used; the
- * JobStorage factory may create additional JobStorage instances
- * that return false from accept() and then discard them. The close()
- * method will only be triggered for a JobStorage if the connect()
- * method is called. Connection should not be triggered by a call to
- * accept().</p>
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public abstract class JobStorage extends Configured implements Closeable {
-
- /**
- * Returns true if the JobStorage system can use the metadata in
- * the descriptor to connect to an underlying storage resource.
- */
- public abstract boolean canAccept(Map<String, String> descriptor);
-
-
- /**
- * Opens / connects to the underlying storage resource specified by the
- * descriptor.
- */
- public abstract void open(Map<String, String> descriptor)
- throws IOException;
-
- /**
- * Given a job name, reconstitute a JobData that contains all
- * configuration information required for the job. Returns null if the
- * job name does not match an available job.
- */
- public abstract JobData read(String jobName)
- throws IOException;
-
- /**
- * Forget about a saved job.
- */
- public abstract void delete(String jobName) throws IOException;
-
- /**
- * Given a job name and the data describing a configured job, record the job
- * information to the storage medium.
- */
- public abstract void create(String jobName, JobData data)
- throws IOException;
-
- /**
- * Given a job name and configured job data, update the underlying resource
- * to match the current job configuration.
- */
- public abstract void update(String jobName, JobData data)
- throws IOException;
-
- /**
- * Close any resources opened by the JobStorage system.
- */
- public void close() throws IOException {
- }
-
- /**
- * Enumerate all jobs held in the connected resource.
- */
- public abstract List<String> list() throws IOException;
+public abstract class JobStorage
+ extends org.apache.sqoop.metastore.JobStorage {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/JobStorageFactory.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,55 +18,20 @@
package com.cloudera.sqoop.metastore;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
/**
- * Factory that produces the correct JobStorage system to work with
- * a particular job descriptor.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class JobStorageFactory {
+public class JobStorageFactory
+ extends org.apache.sqoop.metastore.JobStorageFactory {
- private Configuration conf;
-
- /**
- * Configuration key describing the list of JobStorage implementations
- * to use to handle jobs.
- */
public static final String AVAILABLE_STORAGES_KEY =
- "sqoop.job.storage.implementations";
-
- /** The default list of available JobStorage implementations. */
- private static final String DEFAULT_AVAILABLE_STORAGES =
- "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
- + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
+ org.apache.sqoop.metastore.JobStorageFactory.AVAILABLE_STORAGES_KEY;
public JobStorageFactory(Configuration config) {
- this.conf = config;
-
- // Ensure that we always have an available storages list.
- if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) {
- this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES);
- }
+ super(config);
}
- /**
- * Given a storage descriptor, determine the correct JobStorage
- * implementation to use to connect to the storage resource and return an
- * instance of it -- or null if no JobStorage instance is appropriate.
- */
- public JobStorage getJobStorage(Map<String, String> descriptor) {
- List<JobStorage> storages = this.conf.getInstances(
- AVAILABLE_STORAGES_KEY, JobStorage.class);
- for (JobStorage stor : storages) {
- if (stor.canAccept(descriptor)) {
- return stor;
- }
- }
-
- return null;
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,97 +18,24 @@
package com.cloudera.sqoop.metastore.hsqldb;
-import java.io.File;
-import java.io.IOException;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
/**
- * JobStorage implementation that auto-configures an HSQLDB
- * local-file-based instance to hold jobs.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class AutoHsqldbStorage extends HsqldbJobStorage {
+public class AutoHsqldbStorage
+ extends org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage {
- public static final Log LOG = LogFactory.getLog(
- AutoHsqldbStorage.class.getName());
-
- /**
- * Configuration key specifying whether this storage agent is active.
- * Defaults to "on" to allow zero-conf local users.
- */
public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
- "sqoop.metastore.client.enable.autoconnect";
-
- /**
- * Configuration key specifying the connect string used by this
- * storage agent.
- */
+ org.apache.sqoop.metastore.hsqldb.
+ AutoHsqldbStorage.AUTO_STORAGE_IS_ACTIVE_KEY;
public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
- "sqoop.metastore.client.autoconnect.url";
-
- /**
- * Configuration key specifying the username to bind with.
- */
+ org.apache.sqoop.metastore.hsqldb.
+ AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY;
public static final String AUTO_STORAGE_USER_KEY =
- "sqoop.metastore.client.autoconnect.username";
-
-
- /** HSQLDB default user is named 'SA'. */
- private static final String DEFAULT_AUTO_USER = "SA";
-
- /**
- * Configuration key specifying the password to bind with.
- */
+ org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_USER_KEY;
public static final String AUTO_STORAGE_PASS_KEY =
- "sqoop.metastore.client.autoconnect.password";
-
- /** HSQLDB default user has an empty password. */
- public static final String DEFAULT_AUTO_PASSWORD = "";
-
- @Override
- /** {@inheritDoc} */
- public boolean canAccept(Map<String, String> descriptor) {
- Configuration conf = this.getConf();
- return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
- }
-
- /**
- * Determine the user's home directory and return a connect
- * string to HSQLDB that uses ~/.sqoop/ as the storage location
- * for the metastore database.
- */
- private String getHomeDirFileConnectStr() {
- String homeDir = System.getProperty("user.home");
-
- File homeDirObj = new File(homeDir);
- File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
- File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
-
- String dbFileStr = databaseFileObj.toString();
- return "jdbc:hsqldb:file:" + dbFileStr
- + ";hsqldb.write_delay=false;shutdown=true";
- }
-
- @Override
- /**
- * Set the connection information to use the auto-inferred connection
- * string.
- */
- public void open(Map<String, String> descriptor) throws IOException {
- Configuration conf = getConf();
- setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
- getHomeDirFileConnectStr()));
- setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
- setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
- DEFAULT_AUTO_PASSWORD));
- setConnectedDescriptor(descriptor);
+ org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY;
+ public static final String DEFAULT_AUTO_PASSWORD =
+ org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.DEFAULT_AUTO_PASSWORD;
- init();
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,793 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
package com.cloudera.sqoop.metastore.hsqldb;
-import java.io.IOException;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.metastore.JobData;
-import com.cloudera.sqoop.metastore.JobStorage;
-import com.cloudera.sqoop.tool.SqoopTool;
-
/**
- * JobStorage implementation that uses an HSQLDB-backed database to
- * hold job information.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class HsqldbJobStorage extends JobStorage {
-
- public static final Log LOG = LogFactory.getLog(
- HsqldbJobStorage.class.getName());
-
- /** descriptor key identifying the connect string for the metastore. */
- public static final String META_CONNECT_KEY = "metastore.connect.string";
-
- /** descriptor key identifying the username to use when connecting
- * to the metastore.
- */
- public static final String META_USERNAME_KEY = "metastore.username";
-
- /** descriptor key identifying the password to use when connecting
- * to the metastore.
- */
- public static final String META_PASSWORD_KEY = "metastore.password";
-
-
- /** Default name for the root metadata table in HSQLDB. */
- private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
+public class HsqldbJobStorage
+ extends org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage {
- /** Configuration key used to override root table name. */
+ public static final String META_CONNECT_KEY =
+ org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_CONNECT_KEY;
+ public static final String META_USERNAME_KEY =
+ org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_USERNAME_KEY;
+ public static final String META_PASSWORD_KEY =
+ org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_PASSWORD_KEY;
public static final String ROOT_TABLE_NAME_KEY =
- "sqoop.hsqldb.root.table.name";
-
- /** root metadata table key used to define the current schema version. */
- private static final String STORAGE_VERSION_KEY =
- "sqoop.hsqldb.job.storage.version";
-
- /** The current version number for the schema edition. */
- private static final int CUR_STORAGE_VERSION = 0;
-
- /** root metadata table key used to define the job table name. */
- private static final String SESSION_TABLE_KEY =
- "sqoop.hsqldb.job.info.table";
-
- /** Default value for SESSION_TABLE_KEY. */
- private static final String DEFAULT_SESSION_TABLE_NAME =
- "SQOOP_SESSIONS";
-
- /** Per-job key with propClass 'schema' that defines the set of
- * properties valid to be defined for propClass 'SqoopOptions'. */
- private static final String PROPERTY_SET_KEY =
- "sqoop.property.set.id";
-
- /** Current value for PROPERTY_SET_KEY. */
- private static final String CUR_PROPERTY_SET_ID = "0";
-
- // The following are values for propClass in the v0 schema which
- // describe different aspects of the stored metadata.
-
- /** Property class for properties about the stored data itself. */
- private static final String PROPERTY_CLASS_SCHEMA = "schema";
-
- /** Property class for properties that are loaded into SqoopOptions. */
- private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions";
-
- /** Property class for properties that are loaded into a Configuration. */
- private static final String PROPERTY_CLASS_CONFIG = "config";
-
- /**
- * Per-job key with propClass 'schema' that specifies the SqoopTool
- * to load.
- */
- private static final String SQOOP_TOOL_KEY = "sqoop.tool";
-
-
- private Map<String, String> connectedDescriptor;
- private String metastoreConnectStr;
- private String metastoreUser;
- private String metastorePassword;
- private Connection connection;
-
- protected Connection getConnection() {
- return this.connection;
- }
-
- // After connection to the database and initialization of the
- // schema, this holds the name of the job table.
- private String jobTableName;
-
- protected void setMetastoreConnectStr(String connectStr) {
- this.metastoreConnectStr = connectStr;
- }
-
- protected void setMetastoreUser(String user) {
- this.metastoreUser = user;
- }
-
- protected void setMetastorePassword(String pass) {
- this.metastorePassword = pass;
- }
-
- private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
-
- /**
- * Set the descriptor used to open() this storage.
- */
- protected void setConnectedDescriptor(Map<String, String> descriptor) {
- this.connectedDescriptor = descriptor;
- }
-
- @Override
- /**
- * Initialize the connection to the database.
- */
- public void open(Map<String, String> descriptor) throws IOException {
- setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
- setMetastoreUser(descriptor.get(META_USERNAME_KEY));
- setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
- setConnectedDescriptor(descriptor);
-
- init();
- }
-
- protected void init() throws IOException {
- try {
- // Load/initialize the JDBC driver.
- Class.forName(DB_DRIVER_CLASS);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
- }
-
- try {
- if (null == metastoreUser) {
- this.connection = DriverManager.getConnection(metastoreConnectStr);
- } else {
- this.connection = DriverManager.getConnection(metastoreConnectStr,
- metastoreUser, metastorePassword);
- }
-
- connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
- connection.setAutoCommit(false);
-
- // Initialize the root schema.
- if (!rootTableExists()) {
- createRootTable();
- }
-
- // Check the schema version.
- String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
- int actualStorageVer = -1;
- try {
- actualStorageVer = Integer.valueOf(curStorageVerStr);
- } catch (NumberFormatException nfe) {
- LOG.warn("Could not interpret as a number: " + curStorageVerStr);
- }
- if (actualStorageVer != CUR_STORAGE_VERSION) {
- LOG.error("Can not interpret metadata schema");
- LOG.error("The metadata schema version is " + curStorageVerStr);
- LOG.error("The highest version supported is " + CUR_STORAGE_VERSION);
- LOG.error("To use this version of Sqoop, "
- + "you must downgrade your metadata schema.");
- throw new IOException("Invalid metadata version.");
- }
-
- // Initialize the versioned schema.
- initV0Schema();
- } catch (SQLException sqle) {
- if (null != connection) {
- try {
- connection.rollback();
- } catch (SQLException e2) {
- LOG.warn("Error rolling back transaction in error handler: " + e2);
- }
- }
-
- throw new IOException("Exception creating SQL connection", sqle);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (null != this.connection) {
- try {
- LOG.debug("Flushing current transaction");
- this.connection.commit();
- } catch (SQLException sqlE) {
- throw new IOException("Exception committing connection", sqlE);
- }
-
- try {
- LOG.debug("Closing connection");
- this.connection.close();
- } catch (SQLException sqlE) {
- throw new IOException("Exception closing connection", sqlE);
- } finally {
- this.connection = null;
- }
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public boolean canAccept(Map<String, String> descriptor) {
- // We return true if the desciptor contains a connect string to find
- // the database.
- return descriptor.get(META_CONNECT_KEY) != null;
- }
-
- @Override
- /** {@inheritDoc} */
- public JobData read(String jobName) throws IOException {
- try {
- if (!jobExists(jobName)) {
- LOG.error("Cannot restore job: " + jobName);
- LOG.error("(No such job)");
- throw new IOException("Cannot restore missing job " + jobName);
- }
-
- LOG.debug("Restoring job: " + jobName);
- Properties schemaProps = getV0Properties(jobName,
- PROPERTY_CLASS_SCHEMA);
- Properties sqoopOptProps = getV0Properties(jobName,
- PROPERTY_CLASS_SQOOP_OPTIONS);
- Properties configProps = getV0Properties(jobName,
- PROPERTY_CLASS_CONFIG);
-
- // Check that we're not using a saved job from a previous
- // version whose functionality has been deprecated.
- String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
- LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
- LOG.debug("Stored property set: " + thisPropSetId);
- if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) {
- LOG.warn("The property set present in this database was written by");
- LOG.warn("an incompatible version of Sqoop. This may result in an");
- LOG.warn("incomplete operation.");
- // TODO(aaron): Should this fail out-right?
- }
-
- String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY);
- if (null == toolName) {
- // Don't know what tool to create.
- throw new IOException("Incomplete metadata; missing "
- + SQOOP_TOOL_KEY);
- }
-
- SqoopTool tool = SqoopTool.getTool(toolName);
- if (null == tool) {
- throw new IOException("Error in job metadata: invalid tool "
- + toolName);
- }
-
- Configuration conf = new Configuration();
- for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
- conf.set(entry.getKey().toString(), entry.getValue().toString());
- }
-
- SqoopOptions opts = new SqoopOptions();
- opts.setConf(conf);
- opts.loadProperties(sqoopOptProps);
-
- // Set the job connection information for this job.
- opts.setJobName(jobName);
- opts.setStorageDescriptor(connectedDescriptor);
-
- return new JobData(opts, tool);
- } catch (SQLException sqlE) {
- throw new IOException("Error communicating with database", sqlE);
- }
- }
-
- private boolean jobExists(String jobName) throws SQLException {
- PreparedStatement s = connection.prepareStatement(
- "SELECT COUNT(job_name) FROM " + this.jobTableName
- + " WHERE job_name = ? GROUP BY job_name");
- ResultSet rs = null;
- try {
- s.setString(1, jobName);
- rs = s.executeQuery();
- if (rs.next()) {
- return true; // We got a result, meaning the job exists.
- }
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing result set: " + sqlE);
- }
- }
-
- s.close();
- }
-
- return false; // No result.
- }
-
- @Override
- /** {@inheritDoc} */
- public void delete(String jobName) throws IOException {
- try {
- if (!jobExists(jobName)) {
- LOG.error("No such job: " + jobName);
- } else {
- LOG.debug("Deleting job: " + jobName);
- PreparedStatement s = connection.prepareStatement("DELETE FROM "
- + this.jobTableName + " WHERE job_name = ?");
- try {
- s.setString(1, jobName);
- s.executeUpdate();
- } finally {
- s.close();
- }
- connection.commit();
- }
- } catch (SQLException sqlEx) {
- try {
- connection.rollback();
- } catch (SQLException e2) {
- LOG.warn("Error rolling back transaction in error handler: " + e2);
- }
- throw new IOException("Error communicating with database", sqlEx);
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public void create(String jobName, JobData data)
- throws IOException {
- try {
- if (jobExists(jobName)) {
- LOG.error("Cannot create job " + jobName
- + ": it already exists");
- throw new IOException("Job " + jobName + " already exists");
- }
- } catch (SQLException sqlE) {
- throw new IOException("Error communicating with database", sqlE);
- }
-
- createInternal(jobName, data);
- }
-
- /**
- * Actually insert/update the resources for this job.
- */
- private void createInternal(String jobName, JobData data)
- throws IOException {
- try {
- LOG.debug("Creating job: " + jobName);
-
- // Save the name of the Sqoop tool.
- setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY,
- data.getSqoopTool().getToolName());
-
- // Save the property set id.
- setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
- CUR_PROPERTY_SET_ID);
-
- // Save all properties of the SqoopOptions.
- Properties props = data.getSqoopOptions().writeProperties();
- setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props);
-
- // And save all unique properties of the configuration.
- Configuration saveConf = data.getSqoopOptions().getConf();
- Configuration baseConf = new Configuration();
-
- for (Map.Entry<String, String> entry : saveConf) {
- String key = entry.getKey();
- String rawVal = saveConf.getRaw(key);
- String baseVal = baseConf.getRaw(key);
- if (baseVal != null && rawVal.equals(baseVal)) {
- continue; // Don't save this; it's set in the base configuration.
- }
-
- LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
- setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal);
- }
-
- connection.commit();
- } catch (SQLException sqlE) {
- try {
- connection.rollback();
- } catch (SQLException sqlE2) {
- LOG.warn("Exception rolling back transaction during error handling: "
- + sqlE2);
- }
- throw new IOException("Error communicating with database", sqlE);
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public void update(String jobName, JobData data)
- throws IOException {
- try {
- if (!jobExists(jobName)) {
- LOG.error("Cannot update job " + jobName + ": not found");
- throw new IOException("Job " + jobName + " does not exist");
- }
- } catch (SQLException sqlE) {
- throw new IOException("Error communicating with database", sqlE);
- }
-
- // Since we set properties with update-or-insert, this is the same
- // as create on this system.
- createInternal(jobName, data);
- }
-
- @Override
- /** {@inheritDoc} */
- public List<String> list() throws IOException {
- ResultSet rs = null;
- try {
- PreparedStatement s = connection.prepareStatement(
- "SELECT DISTINCT job_name FROM " + this.jobTableName);
- try {
- rs = s.executeQuery();
- ArrayList<String> jobs = new ArrayList<String>();
- while (rs.next()) {
- jobs.add(rs.getString(1));
- }
-
- return jobs;
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing resultset: " + sqlE);
- }
- }
-
- if (null != s) {
- s.close();
- }
- }
- } catch (SQLException sqlE) {
- throw new IOException("Error communicating with database", sqlE);
- }
- }
-
- // Determine the name to use for the root metadata table.
- private String getRootTableName() {
- Configuration conf = getConf();
- return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
- }
-
- private boolean tableExists(String table) throws SQLException {
- LOG.debug("Checking for table: " + table);
- DatabaseMetaData dbmd = connection.getMetaData();
- String [] tableTypes = { "TABLE" };
- ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
- if (null != rs) {
- try {
- while (rs.next()) {
- if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
- LOG.debug("Found table: " + table);
- return true;
- }
- }
- } finally {
- rs.close();
- }
- }
-
- LOG.debug("Could not find table.");
- return false;
- }
-
- private boolean rootTableExists() throws SQLException {
- String rootTableName = getRootTableName();
- return tableExists(rootTableName);
- }
-
- private void createRootTable() throws SQLException {
- String rootTableName = getRootTableName();
- LOG.debug("Creating root table: " + rootTableName);
-
- // TODO: Sanity-check the value of rootTableName to ensure it is
- // not a SQL-injection attack vector.
- Statement s = connection.createStatement();
- try {
- s.executeUpdate("CREATE TABLE " + rootTableName + " ("
- + "version INT, "
- + "propname VARCHAR(128) NOT NULL, "
- + "propval VARCHAR(256), "
- + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))");
- } finally {
- s.close();
- }
-
- setRootProperty(STORAGE_VERSION_KEY, null,
- Integer.toString(CUR_STORAGE_VERSION));
-
- LOG.debug("Saving root table.");
- connection.commit();
- }
-
- /**
- * Look up a value for the specified version (may be null) in the
- * root metadata table.
- */
- private String getRootProperty(String propertyName, Integer version)
- throws SQLException {
- LOG.debug("Looking up property " + propertyName + " for version "
- + version);
- PreparedStatement s = null;
- ResultSet rs = null;
-
- try {
- if (null == version) {
- s = connection.prepareStatement(
- "SELECT propval FROM " + getRootTableName()
- + " WHERE version IS NULL AND propname = ?");
- s.setString(1, propertyName);
- } else {
- s = connection.prepareStatement(
- "SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
- + " AND propname = ?");
- s.setInt(1, version);
- s.setString(2, propertyName);
- }
-
- rs = s.executeQuery();
- if (!rs.next()) {
- LOG.debug(" => (no result)");
- return null; // No such result.
- } else {
- String result = rs.getString(1); // Return the only result col.
- LOG.debug(" => " + result);
- return result;
- }
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing resultset: " + sqlE);
- }
- }
-
- if (null != s) {
- s.close();
- }
- }
- }
-
- /**
- * Set a value for the specified version (may be null) in the root
- * metadata table.
- */
- private void setRootProperty(String propertyName, Integer version,
- String val) throws SQLException {
- LOG.debug("Setting property " + propertyName + " for version "
- + version + " => " + val);
-
- PreparedStatement s;
- String curVal = getRootProperty(propertyName, version);
- if (null == curVal) {
- // INSERT the row.
- s = connection.prepareStatement("INSERT INTO " + getRootTableName()
- + " (propval, propname, version) VALUES ( ? , ? , ? )");
- } else if (version == null) {
- // UPDATE an existing row with a null version
- s = connection.prepareStatement("UPDATE " + getRootTableName()
- + " SET propval = ? WHERE propname = ? AND version IS NULL");
- } else {
- // UPDATE an existing row with non-null version.
- s = connection.prepareStatement("UPDATE " + getRootTableName()
- + " SET propval = ? WHERE propname = ? AND version = ?");
- }
-
- try {
- s.setString(1, val);
- s.setString(2, propertyName);
- if (null != version) {
- s.setInt(3, version);
- }
- s.executeUpdate();
- } finally {
- s.close();
- }
- }
-
- /**
- * Create the jobs table in the V0 schema.
- */
- private void createJobTable() throws SQLException {
- String curTableName = DEFAULT_SESSION_TABLE_NAME;
- int tableNum = -1;
- while (true) {
- if (tableExists(curTableName)) {
- tableNum++;
- curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum;
- } else {
- break;
- }
- }
-
- // curTableName contains a table name that does not exist.
- // Create this table.
- LOG.debug("Creating job storage table: " + curTableName);
- Statement s = connection.createStatement();
- try {
- s.executeUpdate("CREATE TABLE " + curTableName + " ("
- + "job_name VARCHAR(64) NOT NULL, "
- + "propname VARCHAR(128) NOT NULL, "
- + "propval VARCHAR(1024), "
- + "propclass VARCHAR(32) NOT NULL, "
- + "CONSTRAINT " + curTableName + "_unq UNIQUE "
- + "(job_name, propname, propclass))");
-
- // Then set a property in the root table pointing to it.
- setRootProperty(SESSION_TABLE_KEY, 0, curTableName);
- connection.commit();
- } finally {
- s.close();
- }
-
- this.jobTableName = curTableName;
- }
-
- /**
- * Given a root schema that exists,
- * initialize a version-0 key/value storage schema on top of it,
- * if it does not already exist.
- */
- private void initV0Schema() throws SQLException {
- this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
- if (null == this.jobTableName) {
- createJobTable();
- }
- if (!tableExists(this.jobTableName)) {
- LOG.debug("Could not find job table: " + jobTableName);
- createJobTable();
- }
- }
-
- /**
- * INSERT or UPDATE a single (job, propname, class) to point
- * to the specified property value.
- */
- private void setV0Property(String jobName, String propClass,
- String propName, String propVal) throws SQLException {
- LOG.debug("Job: " + jobName + "; Setting property "
- + propName + " with class " + propClass + " => " + propVal);
-
- PreparedStatement s = null;
- try {
- String curValue = getV0Property(jobName, propClass, propName);
- if (null == curValue) {
- // Property is not yet set.
- s = connection.prepareStatement("INSERT INTO " + this.jobTableName
- + " (propval, job_name, propclass, propname) "
- + "VALUES (?, ?, ?, ?)");
- } else {
- // Overwrite existing property.
- s = connection.prepareStatement("UPDATE " + this.jobTableName
- + " SET propval = ? WHERE job_name = ? AND propclass = ? "
- + "AND propname = ?");
- }
-
- s.setString(1, propVal);
- s.setString(2, jobName);
- s.setString(3, propClass);
- s.setString(4, propName);
-
- s.executeUpdate();
- } finally {
- if (null != s) {
- s.close();
- }
- }
- }
-
- /**
- * Return a string containing the value of a specified property,
- * or null if it is not set.
- */
- private String getV0Property(String jobName, String propClass,
- String propertyName) throws SQLException {
- LOG.debug("Job: " + jobName + "; Getting property "
- + propertyName + " with class " + propClass);
-
- ResultSet rs = null;
- PreparedStatement s = connection.prepareStatement(
- "SELECT propval FROM " + this.jobTableName
- + " WHERE job_name = ? AND propclass = ? AND propname = ?");
-
- try {
- s.setString(1, jobName);
- s.setString(2, propClass);
- s.setString(3, propertyName);
- rs = s.executeQuery();
-
- if (!rs.next()) {
- LOG.debug(" => (no result)");
- return null;
- }
-
- String result = rs.getString(1);
- LOG.debug(" => " + result);
- return result;
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing resultset: " + sqlE);
- }
- }
-
- s.close();
- }
- }
-
- /**
- * Get a java.util.Properties containing all propName -> propVal
- * bindings for a given (jobName, propClass).
- */
- private Properties getV0Properties(String jobName, String propClass)
- throws SQLException {
- LOG.debug("Job: " + jobName
- + "; Getting properties with class " + propClass);
-
- ResultSet rs = null;
- PreparedStatement s = connection.prepareStatement(
- "SELECT propname, propval FROM " + this.jobTableName
- + " WHERE job_name = ? AND propclass = ?");
- try {
- s.setString(1, jobName);
- s.setString(2, propClass);
- rs = s.executeQuery();
-
- Properties p = new Properties();
- while (rs.next()) {
- p.setProperty(rs.getString(1), rs.getString(2));
- }
-
- return p;
- } finally {
- if (null != rs) {
- try {
- rs.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing result set: " + sqlE);
- }
- }
-
- s.close();
- }
- }
-
- private void setV0Properties(String jobName, String propClass,
- Properties properties) throws SQLException {
- LOG.debug("Job: " + jobName
- + "; Setting bulk properties for class " + propClass);
+ org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.ROOT_TABLE_NAME_KEY;
- for (Map.Entry<Object, Object> entry : properties.entrySet()) {
- String key = entry.getKey().toString();
- String val = entry.getValue().toString();
- setV0Property(jobName, propClass, key, val);
- }
- }
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java?rev=1195857&r1=1195856&r2=1195857&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaStore.java Tue Nov 1 07:39:43 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,164 +19,24 @@
package com.cloudera.sqoop.metastore.hsqldb;
-import java.io.File;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-
-import org.hsqldb.Server;
-import org.hsqldb.ServerConstants;
-
-import com.cloudera.sqoop.SqoopOptions;
-
-import com.cloudera.sqoop.manager.HsqldbManager;
-
/**
- * Container for an HSQLDB-backed metastore.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class HsqldbMetaStore {
-
- public static final Log LOG = LogFactory.getLog(
- HsqldbMetaStore.class.getName());
+public class HsqldbMetaStore
+ extends org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore {
- /** Where on the local fs does the metastore put files? */
public static final String META_STORAGE_LOCATION_KEY =
- "sqoop.metastore.server.location";
-
- /**
- * What port does the metastore listen on?
- */
+ org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_STORAGE_LOCATION_KEY;
public static final String META_SERVER_PORT_KEY =
- "sqoop.metastore.server.port";
-
- /** Default to this port if unset. */
- public static final int DEFAULT_PORT = 16000;
-
- private int port;
- private String fileLocation;
- private Server server;
- private Configuration conf;
+ org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.META_SERVER_PORT_KEY;
+ public static final int DEFAULT_PORT =
+ org.apache.sqoop.metastore.hsqldb.HsqldbMetaStore.DEFAULT_PORT;
public HsqldbMetaStore(Configuration config) {
- this.conf = config;
- init();
+ super(config);
}
- /**
- * Determine the user's home directory and return a file path
- * under this root where the shared metastore can be placed.
- */
- private String getHomeDirFilePath() {
- String homeDir = System.getProperty("user.home");
-
- File homeDirObj = new File(homeDir);
- File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
- File databaseFileObj = new File(sqoopDataDirObj, "shared-metastore.db");
-
- return databaseFileObj.toString();
- }
-
- private void init() {
- if (null != server) {
- LOG.debug("init(): server already exists.");
- return;
- }
-
- fileLocation = conf.get(META_STORAGE_LOCATION_KEY, null);
- if (null == fileLocation) {
- fileLocation = getHomeDirFilePath();
- LOG.warn("The location for metastore data has not been explicitly set. "
- + "Placing shared metastore files in " + fileLocation);
- }
-
- this.port = conf.getInt(META_SERVER_PORT_KEY, DEFAULT_PORT);
- }
-
-
- public void start() {
- try {
- if (server != null) {
- server.checkRunning(false);
- }
- } catch (RuntimeException re) {
- LOG.info("Server is already started.");
- return;
- }
-
- server = new Server();
- server.setDatabasePath(0, "file:" + fileLocation);
- server.setDatabaseName(0, "sqoop");
- server.putPropertiesFromString("hsqldb.write_delay=false");
- server.setPort(port);
- server.setSilent(true);
- server.setNoSystemExit(true);
-
- server.start();
- LOG.info("Server started on port " + port + " with protocol "
- + server.getProtocol());
- }
-
- /**
- * Blocks the current thread until the server is shut down.
- */
- public void waitForServer() {
- while (true) {
- int curState = server.getState();
- if (curState == ServerConstants.SERVER_STATE_SHUTDOWN) {
- LOG.info("Got shutdown notification");
- break;
- }
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- LOG.info("Interrupted while blocking for server:"
- + StringUtils.stringifyException(ie));
- }
- }
- }
-
- /**
- * Connects to the server and instructs it to shutdown.
- */
- public void shutdown() {
- // Send the SHUTDOWN command to the server via SQL.
- SqoopOptions options = new SqoopOptions(conf);
- options.setConnectString("jdbc:hsqldb:hsql://localhost:"
- + port + "/sqoop");
- options.setUsername("SA");
- options.setPassword("");
- HsqldbManager manager = new HsqldbManager(options);
- Statement s = null;
- try {
- Connection c = manager.getConnection();
- s = c.createStatement();
- s.execute("SHUTDOWN");
- } catch (SQLException sqlE) {
- LOG.warn("Exception shutting down database: "
- + StringUtils.stringifyException(sqlE));
- } finally {
- if (null != s) {
- try {
- s.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing statement: " + sqlE);
- }
- }
-
- try {
- manager.close();
- } catch (SQLException sqlE) {
- LOG.warn("Error closing manager: " + sqlE);
- }
- }
- }
}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java?rev=1195857&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java Tue Nov 1 07:39:43 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.tool.SqoopTool;
+
+/**
+ * Container for all job data that should be stored to a
+ * permanent resource.
+ */
+public class JobData {
+ private SqoopOptions opts;
+ private SqoopTool tool;
+
+ public JobData() {
+ }
+
+ public JobData(SqoopOptions options, SqoopTool sqoopTool) {
+ this.opts = options;
+ this.tool = sqoopTool;
+ }
+
+ /**
+ * Gets the SqoopOptions.
+ */
+ public SqoopOptions getSqoopOptions() {
+ return this.opts;
+ }
+
+ /**
+ * Gets the SqoopTool.
+ */
+ public SqoopTool getSqoopTool() {
+ return this.tool;
+ }
+
+ /**
+ * Sets the SqoopOptions.
+ */
+ public void setSqoopOptions(SqoopOptions options) {
+ this.opts = options;
+ }
+
+ /**
+ * Sets the SqoopTool.
+ */
+ public void setSqoopTool(SqoopTool sqoopTool) {
+ this.tool = sqoopTool;
+ }
+
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobData.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java?rev=1195857&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java Tue Nov 1 07:39:43 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import com.cloudera.sqoop.metastore.JobData;
+
+/**
+ * API that defines how jobs are saved, restored, and manipulated.
+ *
+ * <p>
+ * JobStorage instances may be created and then not used; the
+ * JobStorage factory may create additional JobStorage instances
+ * that return false from canAccept() and then discard them. The close()
+ * method will only be triggered for a JobStorage if the open()
+ * method is called. Connection should not be triggered by a call to
+ * canAccept().</p>
+ */
+public abstract class JobStorage extends Configured implements Closeable {
+
+ /**
+ * Returns true if the JobStorage system can use the metadata in
+ * the descriptor to connect to an underlying storage resource.
+ */
+ public abstract boolean canAccept(Map<String, String> descriptor);
+
+
+ /**
+ * Opens / connects to the underlying storage resource specified by the
+ * descriptor.
+ */
+ public abstract void open(Map<String, String> descriptor)
+ throws IOException;
+
+ /**
+ * Given a job name, reconstitute a JobData that contains all
+ * configuration information required for the job. Returns null if the
+ * job name does not match an available job.
+ */
+ public abstract JobData read(String jobName)
+ throws IOException;
+
+ /**
+ * Forget about a saved job.
+ */
+ public abstract void delete(String jobName) throws IOException;
+
+ /**
+ * Given a job name and the data describing a configured job, record the job
+ * information to the storage medium.
+ */
+ public abstract void create(String jobName, JobData data)
+ throws IOException;
+
+ /**
+ * Given a job name and configured job data, update the underlying resource
+ * to match the current job configuration.
+ */
+ public abstract void update(String jobName, JobData data)
+ throws IOException;
+
+ /**
+ * Close any resources opened by the JobStorage system.
+ */
+ public void close() throws IOException {
+ }
+
+ /**
+ * Enumerate all jobs held in the connected resource.
+ */
+ public abstract List<String> list() throws IOException;
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorage.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java?rev=1195857&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java Tue Nov 1 07:39:43 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import com.cloudera.sqoop.metastore.JobStorage;
+
+/**
+ * Factory that produces the correct JobStorage system to work with
+ * a particular job descriptor.
+ */
+public class JobStorageFactory {
+
+ private Configuration conf;
+
+ /**
+ * Configuration key describing the list of JobStorage implementations
+ * to use to handle jobs.
+ */
+ public static final String AVAILABLE_STORAGES_KEY =
+ "sqoop.job.storage.implementations";
+
+ /** The default list of available JobStorage implementations. */
+ private static final String DEFAULT_AVAILABLE_STORAGES =
+ "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
+ + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
+
+ public JobStorageFactory(Configuration config) {
+ this.conf = config;
+
+ // Ensure that we always have an available storages list.
+ if (this.conf.get(AVAILABLE_STORAGES_KEY) == null) {
+ this.conf.set(AVAILABLE_STORAGES_KEY, DEFAULT_AVAILABLE_STORAGES);
+ }
+ }
+
+ /**
+ * Given a storage descriptor, determine the correct JobStorage
+ * implementation to use to connect to the storage resource and return an
+ * instance of it -- or null if no JobStorage instance is appropriate.
+ */
+ public JobStorage getJobStorage(Map<String, String> descriptor) {
+ List<JobStorage> storages = this.conf.getInstances(
+ AVAILABLE_STORAGES_KEY, JobStorage.class);
+ for (JobStorage stor : storages) {
+ if (stor.canAccept(descriptor)) {
+ return stor;
+ }
+ }
+
+ return null;
+ }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java?rev=1195857&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java Tue Nov 1 07:39:43 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore.hsqldb;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * JobStorage implementation that auto-configures an HSQLDB
+ * local-file-based instance to hold jobs.
+ */
+public class AutoHsqldbStorage
+ extends com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage {
+
+ public static final Log LOG = LogFactory.getLog(
+ AutoHsqldbStorage.class.getName());
+
+ /**
+ * Configuration key specifying whether this storage agent is active.
+ * Defaults to "on" to allow zero-conf local users.
+ */
+ public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
+ "sqoop.metastore.client.enable.autoconnect";
+
+ /**
+ * Configuration key specifying the connect string used by this
+ * storage agent.
+ */
+ public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
+ "sqoop.metastore.client.autoconnect.url";
+
+ /**
+ * Configuration key specifying the username to bind with.
+ */
+ public static final String AUTO_STORAGE_USER_KEY =
+ "sqoop.metastore.client.autoconnect.username";
+
+
+ /** HSQLDB default user is named 'SA'. */
+ private static final String DEFAULT_AUTO_USER = "SA";
+
+ /**
+ * Configuration key specifying the password to bind with.
+ */
+ public static final String AUTO_STORAGE_PASS_KEY =
+ "sqoop.metastore.client.autoconnect.password";
+
+ /** HSQLDB default user has an empty password. */
+ public static final String DEFAULT_AUTO_PASSWORD = "";
+
+ @Override
+ /** {@inheritDoc} */
+ public boolean canAccept(Map<String, String> descriptor) {
+ Configuration conf = this.getConf();
+ return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
+ }
+
+ /**
+ * Determine the user's home directory and return a connect
+ * string to HSQLDB that uses ~/.sqoop/ as the storage location
+ * for the metastore database.
+ */
+ private String getHomeDirFileConnectStr() {
+ String homeDir = System.getProperty("user.home");
+
+ File homeDirObj = new File(homeDir);
+ File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
+ File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
+
+ String dbFileStr = databaseFileObj.toString();
+ return "jdbc:hsqldb:file:" + dbFileStr
+ + ";hsqldb.write_delay=false;shutdown=true";
+ }
+
+ @Override
+ /**
+ * Set the connection information to use the auto-inferred connection
+ * string.
+ */
+ public void open(Map<String, String> descriptor) throws IOException {
+ Configuration conf = getConf();
+ setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
+ getHomeDirFileConnectStr()));
+ setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
+ setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
+ DEFAULT_AUTO_PASSWORD));
+ setConnectedDescriptor(descriptor);
+
+ init();
+ }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java?rev=1195857&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java Tue Nov 1 07:39:43 2011
@@ -0,0 +1,805 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.metastore.hsqldb;
+
+import java.io.IOException;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.metastore.JobData;
+import com.cloudera.sqoop.metastore.JobStorage;
+import com.cloudera.sqoop.tool.SqoopTool;
+
+/**
+ * JobStorage implementation that uses an HSQLDB-backed database to
+ * hold job information.
+ */
+public class HsqldbJobStorage extends JobStorage {
+
+ public static final Log LOG = LogFactory.getLog(
+ HsqldbJobStorage.class.getName());
+
+ /** descriptor key identifying the connect string for the metastore. */
+ public static final String META_CONNECT_KEY = "metastore.connect.string";
+
+ /** descriptor key identifying the username to use when connecting
+ * to the metastore.
+ */
+ public static final String META_USERNAME_KEY = "metastore.username";
+
+ /** descriptor key identifying the password to use when connecting
+ * to the metastore.
+ */
+ public static final String META_PASSWORD_KEY = "metastore.password";
+
+
+ /** Default name for the root metadata table in HSQLDB. */
+ private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
+
+ /** Configuration key used to override root table name. */
+ public static final String ROOT_TABLE_NAME_KEY =
+ "sqoop.hsqldb.root.table.name";
+
+ /** root metadata table key used to define the current schema version. */
+ private static final String STORAGE_VERSION_KEY =
+ "sqoop.hsqldb.job.storage.version";
+
+ /** The current version number for the schema edition. */
+ private static final int CUR_STORAGE_VERSION = 0;
+
+ /** root metadata table key used to define the job table name. */
+ private static final String SESSION_TABLE_KEY =
+ "sqoop.hsqldb.job.info.table";
+
+ /** Default value for SESSION_TABLE_KEY. */
+ private static final String DEFAULT_SESSION_TABLE_NAME =
+ "SQOOP_SESSIONS";
+
+ /** Per-job key with propClass 'schema' that defines the set of
+ * properties valid to be defined for propClass 'SqoopOptions'. */
+ private static final String PROPERTY_SET_KEY =
+ "sqoop.property.set.id";
+
+ /** Current value for PROPERTY_SET_KEY. */
+ private static final String CUR_PROPERTY_SET_ID = "0";
+
+ // The following are values for propClass in the v0 schema which
+ // describe different aspects of the stored metadata.
+
+ /** Property class for properties about the stored data itself. */
+ private static final String PROPERTY_CLASS_SCHEMA = "schema";
+
+ /** Property class for properties that are loaded into SqoopOptions. */
+ private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions";
+
+ /** Property class for properties that are loaded into a Configuration. */
+ private static final String PROPERTY_CLASS_CONFIG = "config";
+
+ /**
+ * Per-job key with propClass 'schema' that specifies the SqoopTool
+ * to load.
+ */
+ private static final String SQOOP_TOOL_KEY = "sqoop.tool";
+
+
+ private Map<String, String> connectedDescriptor;
+ private String metastoreConnectStr;
+ private String metastoreUser;
+ private String metastorePassword;
+ private Connection connection;
+
+ protected Connection getConnection() {
+ return this.connection;
+ }
+
+ // After connection to the database and initialization of the
+ // schema, this holds the name of the job table.
+ private String jobTableName;
+
+ protected void setMetastoreConnectStr(String connectStr) {
+ this.metastoreConnectStr = connectStr;
+ }
+
+ protected void setMetastoreUser(String user) {
+ this.metastoreUser = user;
+ }
+
+ protected void setMetastorePassword(String pass) {
+ this.metastorePassword = pass;
+ }
+
+ private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+ /**
+ * Set the descriptor used to open() this storage.
+ */
+ protected void setConnectedDescriptor(Map<String, String> descriptor) {
+ this.connectedDescriptor = descriptor;
+ }
+
+ @Override
+ /**
+ * Initialize the connection to the database.
+ */
+ public void open(Map<String, String> descriptor) throws IOException {
+ setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
+ setMetastoreUser(descriptor.get(META_USERNAME_KEY));
+ setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
+ setConnectedDescriptor(descriptor);
+
+ init();
+ }
+
+ protected void init() throws IOException {
+ try {
+ // Load/initialize the JDBC driver.
+ Class.forName(DB_DRIVER_CLASS);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
+ }
+
+ try {
+ if (null == metastoreUser) {
+ this.connection = DriverManager.getConnection(metastoreConnectStr);
+ } else {
+ this.connection = DriverManager.getConnection(metastoreConnectStr,
+ metastoreUser, metastorePassword);
+ }
+
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ connection.setAutoCommit(false);
+
+ // Initialize the root schema.
+ if (!rootTableExists()) {
+ createRootTable();
+ }
+
+ // Check the schema version.
+ String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
+ int actualStorageVer = -1;
+ try {
+ actualStorageVer = Integer.valueOf(curStorageVerStr);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Could not interpret as a number: " + curStorageVerStr);
+ }
+ if (actualStorageVer != CUR_STORAGE_VERSION) {
+ LOG.error("Can not interpret metadata schema");
+ LOG.error("The metadata schema version is " + curStorageVerStr);
+ LOG.error("The highest version supported is " + CUR_STORAGE_VERSION);
+ LOG.error("To use this version of Sqoop, "
+ + "you must downgrade your metadata schema.");
+ throw new IOException("Invalid metadata version.");
+ }
+
+ // Initialize the versioned schema.
+ initV0Schema();
+ } catch (SQLException sqle) {
+ if (null != connection) {
+ try {
+ connection.rollback();
+ } catch (SQLException e2) {
+ LOG.warn("Error rolling back transaction in error handler: " + e2);
+ }
+ }
+
+ throw new IOException("Exception creating SQL connection", sqle);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (null != this.connection) {
+ try {
+ LOG.debug("Flushing current transaction");
+ this.connection.commit();
+ } catch (SQLException sqlE) {
+ throw new IOException("Exception committing connection", sqlE);
+ }
+
+ try {
+ LOG.debug("Closing connection");
+ this.connection.close();
+ } catch (SQLException sqlE) {
+ throw new IOException("Exception closing connection", sqlE);
+ } finally {
+ this.connection = null;
+ }
+ }
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public boolean canAccept(Map<String, String> descriptor) {
+ // We return true if the desciptor contains a connect string to find
+ // the database.
+ return descriptor.get(META_CONNECT_KEY) != null;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public JobData read(String jobName) throws IOException {
+ try {
+ if (!jobExists(jobName)) {
+ LOG.error("Cannot restore job: " + jobName);
+ LOG.error("(No such job)");
+ throw new IOException("Cannot restore missing job " + jobName);
+ }
+
+ LOG.debug("Restoring job: " + jobName);
+ Properties schemaProps = getV0Properties(jobName,
+ PROPERTY_CLASS_SCHEMA);
+ Properties sqoopOptProps = getV0Properties(jobName,
+ PROPERTY_CLASS_SQOOP_OPTIONS);
+ Properties configProps = getV0Properties(jobName,
+ PROPERTY_CLASS_CONFIG);
+
+ // Check that we're not using a saved job from a previous
+ // version whose functionality has been deprecated.
+ String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY);
+ LOG.debug("System property set: " + CUR_PROPERTY_SET_ID);
+ LOG.debug("Stored property set: " + thisPropSetId);
+ if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) {
+ LOG.warn("The property set present in this database was written by");
+ LOG.warn("an incompatible version of Sqoop. This may result in an");
+ LOG.warn("incomplete operation.");
+ // TODO(aaron): Should this fail out-right?
+ }
+
+ String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY);
+ if (null == toolName) {
+ // Don't know what tool to create.
+ throw new IOException("Incomplete metadata; missing "
+ + SQOOP_TOOL_KEY);
+ }
+
+ SqoopTool tool = SqoopTool.getTool(toolName);
+ if (null == tool) {
+ throw new IOException("Error in job metadata: invalid tool "
+ + toolName);
+ }
+
+ Configuration conf = new Configuration();
+ for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
+ conf.set(entry.getKey().toString(), entry.getValue().toString());
+ }
+
+ SqoopOptions opts = new SqoopOptions();
+ opts.setConf(conf);
+ opts.loadProperties(sqoopOptProps);
+
+ // Set the job connection information for this job.
+ opts.setJobName(jobName);
+ opts.setStorageDescriptor(connectedDescriptor);
+
+ return new JobData(opts, tool);
+ } catch (SQLException sqlE) {
+ throw new IOException("Error communicating with database", sqlE);
+ }
+ }
+
+ /**
+  * Reports whether the job table holds any row for the given job name.
+  *
+  * @param jobName the job to look for.
+  * @return true if the grouped count query returns a row, false otherwise.
+  * @throws SQLException if the query cannot be prepared, run or closed.
+  */
+ private boolean jobExists(String jobName) throws SQLException {
+   String query = "SELECT COUNT(job_name) FROM " + this.jobTableName
+       + " WHERE job_name = ? GROUP BY job_name";
+   PreparedStatement stmt = connection.prepareStatement(query);
+   ResultSet results = null;
+   try {
+     stmt.setString(1, jobName);
+     results = stmt.executeQuery();
+     // Any row at all means the job exists; no row means it does not.
+     return results.next();
+   } finally {
+     if (results != null) {
+       try {
+         results.close();
+       } catch (SQLException sqlE) {
+         LOG.warn("Error closing result set: " + sqlE);
+       }
+     }
+
+     stmt.close();
+   }
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void delete(String jobName) throws IOException {
+ try {
+ if (!jobExists(jobName)) {
+ LOG.error("No such job: " + jobName);
+ } else {
+ LOG.debug("Deleting job: " + jobName);
+ PreparedStatement s = connection.prepareStatement("DELETE FROM "
+ + this.jobTableName + " WHERE job_name = ?");
+ try {
+ s.setString(1, jobName);
+ s.executeUpdate();
+ } finally {
+ s.close();
+ }
+ connection.commit();
+ }
+ } catch (SQLException sqlEx) {
+ try {
+ connection.rollback();
+ } catch (SQLException e2) {
+ LOG.warn("Error rolling back transaction in error handler: " + e2);
+ }
+ throw new IOException("Error communicating with database", sqlEx);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Refuses to overwrite: if a job with this name is already stored an
+  * {@code IOException} is thrown; otherwise the job is written via
+  * {@code createInternal}.
+  */
+ @Override
+ public void create(String jobName, JobData data)
+     throws IOException {
+   boolean alreadyStored;
+   try {
+     alreadyStored = jobExists(jobName);
+   } catch (SQLException sqlE) {
+     throw new IOException("Error communicating with database", sqlE);
+   }
+
+   if (alreadyStored) {
+     LOG.error("Cannot create job " + jobName
+         + ": it already exists");
+     throw new IOException("Job " + jobName + " already exists");
+   }
+
+   createInternal(jobName, data);
+ }
+
+ /**
+  * Writes (inserting or updating) every stored property of a job and
+  * commits the transaction.
+  *
+  * <p>Stored, in order: the tool name, the current property-set id, all
+  * properties produced by the job's SqoopOptions, and each configuration
+  * entry whose raw value differs from that of a freshly constructed
+  * {@code Configuration}. On a SQL failure the transaction is rolled back
+  * and the error is rethrown as an {@code IOException}.
+  */
+ private void createInternal(String jobName, JobData data)
+     throws IOException {
+   try {
+     LOG.debug("Creating job: " + jobName);
+
+     // Schema-level bookkeeping: which tool, and which property set.
+     String toolName = data.getSqoopTool().getToolName();
+     setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY, toolName);
+     setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY,
+         CUR_PROPERTY_SET_ID);
+
+     // Everything the SqoopOptions object serializes about itself.
+     Properties optionProps = data.getSqoopOptions().writeProperties();
+     setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, optionProps);
+
+     // Configuration entries, minus those a default Configuration
+     // already carries with the same raw value.
+     Configuration jobConf = data.getSqoopOptions().getConf();
+     Configuration defaults = new Configuration();
+
+     for (Map.Entry<String, String> confEntry : jobConf) {
+       String key = confEntry.getKey();
+       String rawVal = jobConf.getRaw(key);
+       String baseVal = defaults.getRaw(key);
+       boolean inherited = baseVal != null && rawVal.equals(baseVal);
+       if (!inherited) {
+         LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal);
+         setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal);
+       }
+     }
+
+     connection.commit();
+   } catch (SQLException sqlE) {
+     try {
+       connection.rollback();
+     } catch (SQLException rollbackEx) {
+       LOG.warn("Exception rolling back transaction during error handling: "
+           + rollbackEx);
+     }
+     throw new IOException("Error communicating with database", sqlE);
+   }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Requires the job to exist already; otherwise an
+  * {@code IOException} is thrown. The write itself is delegated to
+  * {@code createInternal}, because properties are stored with
+  * update-or-insert semantics.
+  */
+ @Override
+ public void update(String jobName, JobData data)
+     throws IOException {
+   boolean stored;
+   try {
+     stored = jobExists(jobName);
+   } catch (SQLException sqlE) {
+     throw new IOException("Error communicating with database", sqlE);
+   }
+
+   if (!stored) {
+     LOG.error("Cannot update job " + jobName + ": not found");
+     throw new IOException("Job " + jobName + " does not exist");
+   }
+
+   createInternal(jobName, data);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Returns the distinct job names found in the job table.
+  */
+ @Override
+ public List<String> list() throws IOException {
+   try {
+     String query = "SELECT DISTINCT job_name FROM " + this.jobTableName;
+     PreparedStatement stmt = connection.prepareStatement(query);
+     ResultSet rows = null;
+     try {
+       rows = stmt.executeQuery();
+       List<String> names = new ArrayList<String>();
+       while (rows.next()) {
+         names.add(rows.getString(1));
+       }
+
+       return names;
+     } finally {
+       if (rows != null) {
+         try {
+           rows.close();
+         } catch (SQLException sqlE) {
+           LOG.warn("Error closing resultset: " + sqlE);
+         }
+       }
+
+       stmt.close();
+     }
+   } catch (SQLException sqlE) {
+     throw new IOException("Error communicating with database", sqlE);
+   }
+ }
+
+ /**
+  * @return the root metadata table name: the configured value of
+  *     {@code ROOT_TABLE_NAME_KEY}, or {@code DEFAULT_ROOT_TABLE_NAME}
+  *     when that key is not set.
+  */
+ private String getRootTableName() {
+   return getConf().get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
+ }
+
+ /**
+  * Checks the database metadata for a table with the given name,
+  * comparing names case-insensitively.
+  *
+  * @param table the table name to look for.
+  * @return true if a matching entry of type "TABLE" is reported.
+  * @throws SQLException if the metadata cannot be read.
+  */
+ private boolean tableExists(String table) throws SQLException {
+   LOG.debug("Checking for table: " + table);
+   String [] wantedTypes = { "TABLE" };
+   DatabaseMetaData metadata = connection.getMetaData();
+   ResultSet tables = metadata.getTables(null, null, null, wantedTypes);
+   if (tables != null) {
+     try {
+       while (tables.next()) {
+         String candidate = tables.getString("TABLE_NAME");
+         if (!table.equalsIgnoreCase(candidate)) {
+           continue;
+         }
+         LOG.debug("Found table: " + table);
+         return true;
+       }
+     } finally {
+       tables.close();
+     }
+   }
+
+   LOG.debug("Could not find table.");
+   return false;
+ }
+
+ /**
+  * @return true if the root metadata table is present in the database.
+  */
+ private boolean rootTableExists() throws SQLException {
+   return tableExists(getRootTableName());
+ }
+
+ /**
+  * Creates the root metadata table, records the current storage version
+  * in it (under a null version), and commits.
+  */
+ private void createRootTable() throws SQLException {
+   String rootTable = getRootTableName();
+   LOG.debug("Creating root table: " + rootTable);
+
+   // TODO: Sanity-check the value of rootTable to ensure it is
+   // not a SQL-injection attack vector.
+   String ddl = "CREATE TABLE " + rootTable + " ("
+       + "version INT, "
+       + "propname VARCHAR(128) NOT NULL, "
+       + "propval VARCHAR(256), "
+       + "CONSTRAINT " + rootTable + "_unq UNIQUE (version, propname))";
+
+   Statement stmt = connection.createStatement();
+   try {
+     stmt.executeUpdate(ddl);
+   } finally {
+     stmt.close();
+   }
+
+   String versionStr = Integer.toString(CUR_STORAGE_VERSION);
+   setRootProperty(STORAGE_VERSION_KEY, null, versionStr);
+
+   LOG.debug("Saving root table.");
+   connection.commit();
+ }
+
+ /**
+  * Reads one property from the root metadata table.
+  *
+  * @param propertyName the property to look up.
+  * @param version the schema version the property belongs to; null
+  *     selects rows whose version column is NULL.
+  * @return the stored value, or null if no matching row exists.
+  */
+ private String getRootProperty(String propertyName, Integer version)
+     throws SQLException {
+   LOG.debug("Looking up property " + propertyName + " for version "
+       + version);
+   PreparedStatement stmt = null;
+   ResultSet rows = null;
+
+   try {
+     if (version != null) {
+       stmt = connection.prepareStatement(
+           "SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
+           + " AND propname = ?");
+       stmt.setInt(1, version);
+       stmt.setString(2, propertyName);
+     } else {
+       // NULL cannot be matched with '=', so this case has its own query.
+       stmt = connection.prepareStatement(
+           "SELECT propval FROM " + getRootTableName()
+           + " WHERE version IS NULL AND propname = ?");
+       stmt.setString(1, propertyName);
+     }
+
+     rows = stmt.executeQuery();
+     if (rows.next()) {
+       String value = rows.getString(1); // The only selected column.
+       LOG.debug(" => " + value);
+       return value;
+     }
+
+     LOG.debug(" => (no result)");
+     return null;
+   } finally {
+     if (rows != null) {
+       try {
+         rows.close();
+       } catch (SQLException sqlE) {
+         LOG.warn("Error closing resultset: " + sqlE);
+       }
+     }
+
+     if (stmt != null) {
+       stmt.close();
+     }
+   }
+ }
+
+ /**
+  * Sets a value in the root metadata table, inserting a new row when no
+  * current value is found and updating the existing row otherwise.
+  *
+  * @param propertyName the property to set.
+  * @param version the schema version the property belongs to; null
+  *     addresses the row whose version column is NULL.
+  * @param val the value to store.
+  * @throws SQLException if the lookup or the write fails.
+  */
+ private void setRootProperty(String propertyName, Integer version,
+     String val) throws SQLException {
+   LOG.debug("Setting property " + propertyName + " for version "
+       + version + " => " + val);
+
+   PreparedStatement s;
+   String curVal = getRootProperty(propertyName, version);
+   boolean inserting = (null == curVal);
+   if (inserting) {
+     // INSERT the row.
+     s = connection.prepareStatement("INSERT INTO " + getRootTableName()
+         + " (propval, propname, version) VALUES ( ? , ? , ? )");
+   } else if (version == null) {
+     // UPDATE an existing row with a null version
+     s = connection.prepareStatement("UPDATE " + getRootTableName()
+         + " SET propval = ? WHERE propname = ? AND version IS NULL");
+   } else {
+     // UPDATE an existing row with non-null version.
+     s = connection.prepareStatement("UPDATE " + getRootTableName()
+         + " SET propval = ? WHERE propname = ? AND version = ?");
+   }
+
+   try {
+     s.setString(1, val);
+     s.setString(2, propertyName);
+     if (null != version) {
+       s.setInt(3, version);
+     } else if (inserting) {
+       // The INSERT always has a third placeholder. Bind SQL NULL
+       // explicitly rather than leaving the parameter unset, which not
+       // every JDBC driver accepts. The null-version UPDATE has only two
+       // placeholders, so nothing is bound for it.
+       s.setNull(3, java.sql.Types.INTEGER);
+     }
+     s.executeUpdate();
+   } finally {
+     s.close();
+   }
+ }
+
+ /**
+  * Creates the V0 job table under a name that is not yet taken, records
+  * that name in the root table for version 0, commits, and remembers it
+  * in {@code jobTableName}.
+  *
+  * <p>The first candidate is {@code DEFAULT_SESSION_TABLE_NAME}; if that
+  * exists, numbered suffixes "_0", "_1", ... are tried in turn.
+  */
+ private void createJobTable() throws SQLException {
+   String tableName = DEFAULT_SESSION_TABLE_NAME;
+   for (int suffix = 0; tableExists(tableName); suffix++) {
+     tableName = DEFAULT_SESSION_TABLE_NAME + "_" + suffix;
+   }
+
+   // tableName is now unused, so it is safe to create.
+   LOG.debug("Creating job storage table: " + tableName);
+   String ddl = "CREATE TABLE " + tableName + " ("
+       + "job_name VARCHAR(64) NOT NULL, "
+       + "propname VARCHAR(128) NOT NULL, "
+       + "propval VARCHAR(1024), "
+       + "propclass VARCHAR(32) NOT NULL, "
+       + "CONSTRAINT " + tableName + "_unq UNIQUE "
+       + "(job_name, propname, propclass))";
+
+   Statement stmt = connection.createStatement();
+   try {
+     stmt.executeUpdate(ddl);
+
+     // Point the root table's version-0 entry at the new job table.
+     setRootProperty(SESSION_TABLE_KEY, 0, tableName);
+     connection.commit();
+   } finally {
+     stmt.close();
+   }
+
+   this.jobTableName = tableName;
+ }
+
+ /**
+  * Makes sure a version-0 job table exists on top of an existing root
+  * schema.
+  *
+  * <p>The table name is read from the root table; a new job table is
+  * created when no name is recorded, and again when the recorded table
+  * cannot be found in the database.
+  */
+ private void initV0Schema() throws SQLException {
+   this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
+   if (this.jobTableName == null) {
+     createJobTable();
+   }
+
+   boolean present = tableExists(this.jobTableName);
+   if (!present) {
+     LOG.debug("Could not find job table: " + jobTableName);
+     createJobTable();
+   }
+ }
+
+ /**
+  * Stores one property value for a (job, class, name) triple, using an
+  * UPDATE when a value is currently stored and an INSERT otherwise.
+  */
+ private void setV0Property(String jobName, String propClass,
+     String propName, String propVal) throws SQLException {
+   LOG.debug("Job: " + jobName + "; Setting property "
+       + propName + " with class " + propClass + " => " + propVal);
+
+   String existing = getV0Property(jobName, propClass, propName);
+   String sql;
+   if (existing != null) {
+     // Overwrite the stored value.
+     sql = "UPDATE " + this.jobTableName
+         + " SET propval = ? WHERE job_name = ? AND propclass = ? "
+         + "AND propname = ?";
+   } else {
+     // Nothing stored yet for this triple.
+     sql = "INSERT INTO " + this.jobTableName
+         + " (propval, job_name, propclass, propname) "
+         + "VALUES (?, ?, ?, ?)";
+   }
+
+   // Both statements take their parameters in the same order.
+   PreparedStatement stmt = connection.prepareStatement(sql);
+   try {
+     stmt.setString(1, propVal);
+     stmt.setString(2, jobName);
+     stmt.setString(3, propClass);
+     stmt.setString(4, propName);
+
+     stmt.executeUpdate();
+   } finally {
+     stmt.close();
+   }
+ }
+
+ /**
+  * Looks up the value stored for a (job, class, name) triple.
+  *
+  * @return the stored value, or null if no matching row exists.
+  */
+ private String getV0Property(String jobName, String propClass,
+     String propertyName) throws SQLException {
+   LOG.debug("Job: " + jobName + "; Getting property "
+       + propertyName + " with class " + propClass);
+
+   String query = "SELECT propval FROM " + this.jobTableName
+       + " WHERE job_name = ? AND propclass = ? AND propname = ?";
+   PreparedStatement stmt = connection.prepareStatement(query);
+   ResultSet rows = null;
+
+   try {
+     stmt.setString(1, jobName);
+     stmt.setString(2, propClass);
+     stmt.setString(3, propertyName);
+     rows = stmt.executeQuery();
+
+     if (rows.next()) {
+       String value = rows.getString(1);
+       LOG.debug(" => " + value);
+       return value;
+     }
+
+     LOG.debug(" => (no result)");
+     return null;
+   } finally {
+     if (rows != null) {
+       try {
+         rows.close();
+       } catch (SQLException sqlE) {
+         LOG.warn("Error closing resultset: " + sqlE);
+       }
+     }
+
+     stmt.close();
+   }
+ }
+
+ /**
+  * Get a java.util.Properties containing all propName -> propVal
+  * bindings for a given (jobName, propClass).
+  *
+  * <p>Rows whose value column is SQL NULL are left out of the result:
+  * {@code Properties} cannot hold null values, and an absent key already
+  * reads back as null through {@code getProperty}.
+  *
+  * @param jobName the job whose properties are wanted.
+  * @param propClass the property class to select.
+  * @return the matching bindings; empty if there are none.
+  * @throws SQLException if the query cannot be prepared, run or closed.
+  */
+ private Properties getV0Properties(String jobName, String propClass)
+     throws SQLException {
+   LOG.debug("Job: " + jobName
+       + "; Getting properties with class " + propClass);
+
+   ResultSet rs = null;
+   PreparedStatement s = connection.prepareStatement(
+       "SELECT propname, propval FROM " + this.jobTableName
+       + " WHERE job_name = ? AND propclass = ?");
+   try {
+     s.setString(1, jobName);
+     s.setString(2, propClass);
+     rs = s.executeQuery();
+
+     Properties p = new Properties();
+     while (rs.next()) {
+       String name = rs.getString(1);
+       String value = rs.getString(2);
+       if (null == value) {
+         // setProperty(name, null) would throw NullPointerException.
+         LOG.debug("Skipping property with null value: " + name);
+         continue;
+       }
+       p.setProperty(name, value);
+     }
+
+     return p;
+   } finally {
+     if (null != rs) {
+       try {
+         rs.close();
+       } catch (SQLException sqlE) {
+         LOG.warn("Error closing result set: " + sqlE);
+       }
+     }
+
+     s.close();
+   }
+ }
+
+ /**
+  * Stores every entry of {@code properties} for the given job and
+  * property class, one {@code setV0Property} call per entry.
+  */
+ private void setV0Properties(String jobName, String propClass,
+     Properties properties) throws SQLException {
+   LOG.debug("Job: " + jobName
+       + "; Setting bulk properties for class " + propClass);
+
+   for (Map.Entry<Object, Object> binding : properties.entrySet()) {
+     Object name = binding.getKey();
+     Object value = binding.getValue();
+     setV0Property(jobName, propClass, name.toString(), value.toString());
+   }
+ }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
|