SQOOP-3216: Expanded Metastore support for MySql, Oracle, Postgresql, MSSql, and DB2

(Zach Berkowitz via Boglarka Egyed)
diff --git a/src/docs/user/metastore-purpose.txt b/src/docs/user/metastore-purpose.txt
index e7eb23d..95c2d77 100644
--- a/src/docs/user/metastore-purpose.txt
+++ b/src/docs/user/metastore-purpose.txt
@@ -17,11 +17,28 @@
   limitations under the License.
 ////
 
-The +metastore+ tool configures Sqoop to host a shared metadata repository.
+The +metastore+ tool configures Sqoop to host a shared Hsqldb metadata repository.
 Multiple users and/or remote users can define and execute saved jobs (created
 with +sqoop job+) defined in this metastore.
 
 Clients must be configured to connect to the metastore in +sqoop-site.xml+ or
-with the +--meta-connect+ argument.
+with the +--meta-connect+ argument. These commands MySql, Hsqldb, PostgreSql, Oracle, DB2,
+and SqlServer databases as well. All services other than Hsqldb and Postgres require the
+download of the corresponding JDBC driver and connect string structured in the correct format.
+
+Migration of metastore data from one database service to another is not directly supported, but is possible.
+
+.JDBC Connect String Formats:
+[grid="all"]
+`---------------------------`------------------------------------------
+Service                       Connect String Format
+-----------------------------------------------------------------------
+  +MySQL+                      jdbc:mysql://<server>:<port>/<dbname>
+  +HSQLDB+                     jdbc:hsqldb:hsql://<server>:<port>/<dbname>
+  +PostgreSQL+                 jdbc:postgresql://<server>:<port>/<dbname>
+  +Oracle+                     jdbc:oracle:thin:@//<server>:<port>/<SID>
+  +DB2+                        jdbc:db2://<server>:<port>/<dbname>
+  +MSSQL+                      jdbc:sqlserver://<server>:<port>;database=<dbname>
+-----------------------------------------------------------------------
 
 
diff --git a/src/docs/user/saved-jobs.txt b/src/docs/user/saved-jobs.txt
index e875780..d21df02 100644
--- a/src/docs/user/saved-jobs.txt
+++ b/src/docs/user/saved-jobs.txt
@@ -131,6 +131,8 @@
 -----------------------------------------------------------------------
 +\--meta-connect <jdbc-uri>+ Specifies the JDBC connect string used \
                              to connect to the metastore
++\--meta-username <username>+    Specifies the username for the metastore database
++\--meta-password <password>+    Specifies the password for the metastore database
 -----------------------------------------------------------------------
 
 By default, a private metastore is instantiated in +$HOME/.sqoop+. If
@@ -148,6 +150,15 @@
 If you configure +sqoop.metastore.client.enable.autoconnect+ with the
 value +false+, then you must explicitly supply +\--meta-connect+.
 
+Job data can be stored in MySql, PostgreSql, DB2, SqlServer, and Oracle with
+the +\--meta-connect+ argument. The +\--meta-username+ and +\--meta-password+ arguments are necessary
+if the database containing the saved jobs requires a username and password.
+
+----
+$ sqoop job --exec myjob --meta-connect jdbc:hsqldb:hsql://localhost:3000/ --meta-username *username* --meta-password *password*
+
+----
+
 .Common options:
 [grid="all"]
 `---------------------------`------------------------------------------
@@ -229,7 +240,7 @@
 
 Clients should connect to the metastore by specifying
 +sqoop.metastore.client.autoconnect.url+ or +\--meta-connect+ with the
-value +jdbc:hsqldb:hsql://<server-name>:<port>/sqoop+. For example,
+JDBC-URI string. For example,
 +jdbc:hsqldb:hsql://metaserver.example.com:16000/sqoop+.
 
 This metastore may be hosted on a machine within the Hadoop cluster, or
diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java b/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java
similarity index 70%
rename from src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java
rename to src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java
index 083e2a3..d42e5a3 100644
--- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java
+++ b/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java
@@ -15,22 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.cloudera.sqoop.metastore.hsqldb;
+package com.cloudera.sqoop.metastore;
 
 /**
  * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class HsqldbJobStorage
-    extends org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage {
+public class GenericJobStorage
+    extends org.apache.sqoop.metastore.GenericJobStorage {
 
   public static final String META_CONNECT_KEY =
-      org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_CONNECT_KEY;
+      org.apache.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY;
   public static final String META_USERNAME_KEY =
-      org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_USERNAME_KEY;
+      org.apache.sqoop.metastore.GenericJobStorage.META_USERNAME_KEY;
   public static final String META_PASSWORD_KEY =
-      org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_PASSWORD_KEY;
+      org.apache.sqoop.metastore.GenericJobStorage.META_PASSWORD_KEY;
   public static final String ROOT_TABLE_NAME_KEY =
-      org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.ROOT_TABLE_NAME_KEY;
+      org.apache.sqoop.metastore.GenericJobStorage.ROOT_TABLE_NAME_KEY;
 
 }
 
diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
deleted file mode 100644
index 259d9f6..0000000
--- a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop.metastore.hsqldb;
-
-/**
- * @deprecated Moving to use org.apache.sqoop namespace.
- */
-public class AutoHsqldbStorage
-    extends org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage {
-
-  public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
-    org.apache.sqoop.metastore.hsqldb.
-        AutoHsqldbStorage.AUTO_STORAGE_IS_ACTIVE_KEY;
-  public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
-    org.apache.sqoop.metastore.hsqldb.
-        AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY;
-  public static final String AUTO_STORAGE_USER_KEY =
-    org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_USER_KEY;
-  public static final String AUTO_STORAGE_PASS_KEY =
-    org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY;
-  public static final String DEFAULT_AUTO_PASSWORD =
-    org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.DEFAULT_AUTO_PASSWORD;
-
-}
-
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 2eb3d8a..587d4e1 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.accumulo.AccumuloConstants;
 import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
+import org.apache.sqoop.metastore.GenericJobStorage;
 import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
@@ -391,6 +392,10 @@
   @StoredAsProperty(ORACLE_ESCAPING_DISABLED)
   private boolean oracleEscapingDisabled;
 
+  private String metaConnectStr;
+  private String metaUsername;
+  private String metaPassword;
+
   public SqoopOptions() {
     initDefaults(null);
   }
@@ -1076,6 +1081,25 @@
 
     // set escape column mapping to true
     this.escapeColumnMappingEnabled = true;
+
+    this.metaConnectStr =
+            System.getProperty(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY, getLocalAutoConnectString());
+    this.metaUsername =
+            System.getProperty(GenericJobStorage.AUTO_STORAGE_USER_KEY, GenericJobStorage.DEFAULT_AUTO_USER);
+    this.metaPassword =
+            System.getProperty(GenericJobStorage.AUTO_STORAGE_PASS_KEY, GenericJobStorage.DEFAULT_AUTO_PASSWORD);
+  }
+
+  private String getLocalAutoConnectString() {
+    String homeDir = System.getProperty("user.home");
+
+    File homeDirObj = new File(homeDir);
+    File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
+    File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
+
+    String dbFileStr = databaseFileObj.toString();
+    return "jdbc:hsqldb:file:" + dbFileStr
+            + ";hsqldb.write_delay=false;shutdown=true";
   }
 
   /**
@@ -2787,5 +2811,15 @@
     }
 
 
+  public String getMetaConnectStr() {
+    return metaConnectStr;
+  }
+  public String getMetaUsername() {
+    return metaUsername;
+  }
+
+  public String getMetaPassword() {
+    return metaPassword;
+  }
 }
 
diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java
index 5a1a0e8..73b91d0 100644
--- a/src/java/org/apache/sqoop/manager/CubridManager.java
+++ b/src/java/org/apache/sqoop/manager/CubridManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.CUBRID;
+
 import java.io.IOException;
 import java.sql.Types;
 import java.util.Map;
@@ -42,12 +44,8 @@
   public static final Log LOG = LogFactory
       .getLog(CubridManager.class.getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS =
-      "cubrid.jdbc.driver.CUBRIDDriver";
-
   public CubridManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(CUBRID.getDriverClass(), opts);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java
index 61b6868..7525521 100644
--- a/src/java/org/apache/sqoop/manager/Db2Manager.java
+++ b/src/java/org/apache/sqoop/manager/Db2Manager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.DB2;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -51,10 +53,6 @@
   public static final Log LOG = LogFactory.getLog(
       Db2Manager.class.getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS =
-      "com.ibm.db2.jcc.DB2Driver";
-
   private static final String XML_TO_JAVA_DATA_TYPE = "String";
 
   private Map<String, String> columnTypeNames;
@@ -82,7 +80,7 @@
   private String schema = null;
 
   public Db2Manager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(DB2.getDriverClass(), opts);
 
     // Try to parse extra arguments
     try {
diff --git a/src/java/org/apache/sqoop/manager/HsqldbManager.java b/src/java/org/apache/sqoop/manager/HsqldbManager.java
index 9b9c582..92b7d53 100644
--- a/src/java/org/apache/sqoop/manager/HsqldbManager.java
+++ b/src/java/org/apache/sqoop/manager/HsqldbManager.java
@@ -18,16 +18,15 @@
 
 package org.apache.sqoop.manager;
 
-import java.io.IOException;
+import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
-
 import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
-
 import com.cloudera.sqoop.util.ExportException;
+import java.io.IOException;
 
 /**
  * Manages connections to hsqldb databases.
@@ -39,15 +38,12 @@
   public static final Log LOG = LogFactory.getLog(
       HsqldbManager.class.getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
-
   // HsqlDb doesn't have a notion of multiple "databases"; the user's database
   // is always called "PUBLIC".
   private static final String HSQL_SCHEMA_NAME = "PUBLIC";
 
   public HsqldbManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(HSQLDB.getDriverClass(), opts);
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/manager/JdbcDrivers.java b/src/java/org/apache/sqoop/manager/JdbcDrivers.java
new file mode 100644
index 0000000..20bdc98
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/JdbcDrivers.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+public enum JdbcDrivers {
+    MYSQL("com.mysql.jdbc.Driver", "jdbc:mysql:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"),
+    HSQLDB("org.hsqldb.jdbcDriver","jdbc:hsqldb:"), ORACLE("oracle.jdbc.OracleDriver","jdbc:oracle:"),
+    SQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver:"),
+    JTDS_SQLSERVER("net.sourceforge.jtds.jdbc.Driver", "jdbc:jtds:sqlserver:"),
+    DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2:"), NETEZZA("org.netezza.Driver", "jdbc:netezza:"),
+    CUBRID("cubrid.jdbc.driver.CUBRIDDriver", "jdbc:cubrid:");
+
+    private final String driverClass;
+    private final String schemePrefix;
+
+    JdbcDrivers(String driverClass, String schemePrefix) {
+        this.driverClass = driverClass;
+        this.schemePrefix = schemePrefix;
+    }
+
+    public String getDriverClass() {
+        return driverClass;
+    }
+
+    public String getSchemePrefix() {
+        return schemePrefix;
+    }
+}
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index 3c2276f..ba612e2 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.MYSQL;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
@@ -27,8 +29,6 @@
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -53,16 +53,13 @@
 
   public static final Log LOG = LogFactory.getLog(MySQLManager.class.getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
-
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
   private static final String EXPORT_OPERATION = "export";
 
   public MySQLManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(MYSQL.getDriverClass(), opts);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/manager/NetezzaManager.java b/src/java/org/apache/sqoop/manager/NetezzaManager.java
index 0ac7717..8c21073 100644
--- a/src/java/org/apache/sqoop/manager/NetezzaManager.java
+++ b/src/java/org/apache/sqoop/manager/NetezzaManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.NETEZZA;
+
 import java.io.IOException;
 import java.sql.SQLException;
 
@@ -45,9 +47,6 @@
   public static final Log LOG = LogFactory.getLog(NetezzaManager.class
       .getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS = "org.netezza.Driver";
-
   // set to true after we warn the user that we can use direct fastpath.
   protected static boolean directModeWarningPrinted = false;
 
@@ -62,7 +61,7 @@
       "partitioned-access";
 
   public NetezzaManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(NETEZZA.getDriverClass(), opts);
   }
 
 
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index 2f4585c..c0f5114 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.ORACLE;
+
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.sql.Connection;
@@ -118,9 +120,6 @@
   public static final String QUERY_GET_SESSIONUSER =
      "SELECT USER FROM DUAL";
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
-
   // Configuration key to use to set the session timezone.
   public static final String ORACLE_TIMEZONE_KEY = "oracle.sessionTimeZone";
 
@@ -247,7 +246,7 @@
   }
 
   public OracleManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(ORACLE.getDriverClass(), opts);
   }
 
   public void close() throws SQLException {
diff --git a/src/java/org/apache/sqoop/manager/PostgresqlManager.java b/src/java/org/apache/sqoop/manager/PostgresqlManager.java
index 44e041a..29f7c7c 100644
--- a/src/java/org/apache/sqoop/manager/PostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/PostgresqlManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES;
+
 import java.io.IOException;
 import java.sql.SQLException;
 
@@ -44,9 +46,6 @@
   public static final Log LOG = LogFactory.getLog(
       PostgresqlManager.class.getName());
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS = "org.postgresql.Driver";
-
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
@@ -56,7 +55,7 @@
   private String schema;
 
   public PostgresqlManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(POSTGRES.getDriverClass(), opts);
 
     // Try to parse extra arguments
     try {
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index 9a3d918..cc5a1b4 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.manager;
 
+import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;
+
 import java.io.IOException;
 
 import org.apache.commons.cli.CommandLine;
@@ -68,10 +70,6 @@
   public static final String IDENTITY_INSERT_PROP =
       "org.apache.sqoop.manager.sqlserver.table.identity";
 
-  // driver class to ensure is loaded when making db connection.
-  private static final String DRIVER_CLASS =
-      "com.microsoft.sqlserver.jdbc.SQLServerDriver";
-
   // Define SQL Server specific types that are not covered by parent classes
   private static final int DATETIMEOFFSET = -155;
 
@@ -91,7 +89,7 @@
   private boolean identityInserts;
 
   public SQLServerManager(final SqoopOptions opts) {
-    this(DRIVER_CLASS, opts);
+    this(SQLSERVER.getDriverClass(), opts);
   }
 
   public SQLServerManager(final String driver, final SqoopOptions opts) {
diff --git a/src/java/org/apache/sqoop/manager/SupportedManagers.java b/src/java/org/apache/sqoop/manager/SupportedManagers.java
index 8a6037a..1b65a9a 100644
--- a/src/java/org/apache/sqoop/manager/SupportedManagers.java
+++ b/src/java/org/apache/sqoop/manager/SupportedManagers.java
@@ -24,8 +24,11 @@
 
 
 public enum SupportedManagers {
-    MYSQL("jdbc:mysql:", true), POSTGRES("jdbc:postgresql:", true), HSQLDB("jdbc:hsqldb:", false), ORACLE("jdbc:oracle:", true), SQLSERVER("jdbc:sqlserver:", false),
-    JTDS_SQLSERVER("jdbc:jtds:sqlserver:", false), DB2("jdbc:db2:", false), NETEZZA("jdbc:netezza:", true), CUBRID("jdbc:cubrid:", false);
+    MYSQL(JdbcDrivers.MYSQL.getSchemePrefix(), true), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix(), true),
+    HSQLDB(JdbcDrivers.HSQLDB.getSchemePrefix(), false), ORACLE(JdbcDrivers.ORACLE.getSchemePrefix(), true),
+    SQLSERVER(JdbcDrivers.SQLSERVER.getSchemePrefix(), false),  CUBRID(JdbcDrivers.CUBRID.getSchemePrefix(), false),
+    JTDS_SQLSERVER(JdbcDrivers.JTDS_SQLSERVER.getSchemePrefix(), false), DB2(JdbcDrivers.DB2.getSchemePrefix(), false),
+    NETEZZA(JdbcDrivers.NETEZZA.getSchemePrefix(), true);
 
     private final String schemePrefix;
 
diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java
similarity index 78%
rename from src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
rename to src/java/org/apache/sqoop/metastore/GenericJobStorage.java
index a0f29fd..9e1b18b 100644
--- a/src/java/org/apache/sqoop/metastore/hsqldb/HsqldbJobStorage.java
+++ b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java
@@ -15,13 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.sqoop.metastore.hsqldb;
+package org.apache.sqoop.metastore;
 
 import java.io.IOException;
 
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -31,6 +29,7 @@
 import java.util.Map;
 import java.util.Properties;
 
+import com.cloudera.sqoop.manager.ConnManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -40,15 +39,16 @@
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.sqoop.manager.DefaultManagerFactory;
 
 /**
- * JobStorage implementation that uses an HSQLDB-backed database to
+ * JobStorage implementation that uses a database to
  * hold job information.
  */
-public class HsqldbJobStorage extends JobStorage {
+public class GenericJobStorage extends JobStorage {
 
   public static final Log LOG = LogFactory.getLog(
-      HsqldbJobStorage.class.getName());
+      GenericJobStorage.class.getName());
 
   /** descriptor key identifying the connect string for the metastore. */
   public static final String META_CONNECT_KEY = "metastore.connect.string";
@@ -63,24 +63,36 @@
    */
   public static final String META_PASSWORD_KEY = "metastore.password";
 
+  /** descriptor key identifying the class name of the jdbc driver */
+  public static final String META_DRIVER_KEY = "metastore.driver.class";
 
-  /** Default name for the root metadata table in HSQLDB. */
+  /** Default name for the root metadata table. */
   private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
 
   /** Configuration key used to override root table name. */
   public static final String ROOT_TABLE_NAME_KEY =
-       "sqoop.hsqldb.root.table.name";
+       "sqoop.root.table.name";
 
   /** root metadata table key used to define the current schema version. */
   private static final String STORAGE_VERSION_KEY =
-      "sqoop.hsqldb.job.storage.version";
+      "sqoop.job.storage.version";
 
   /** The current version number for the schema edition. */
   private static final int CUR_STORAGE_VERSION = 0;
 
+  /** This value represents an invalid version */
+  private static final int NO_VERSION = -1;
+
   /** root metadata table key used to define the job table name. */
   private static final String SESSION_TABLE_KEY =
-      "sqoop.hsqldb.job.info.table";
+      "sqoop.job.info.table";
+
+  /** Outdated key for job table data, kept for backward compatibility */
+  public static final String HSQLDB_TABLE_KEY = "sqoop.hsqldb.job.info.table";
+
+  /** Outdated key for schema version, kept for backward compatibility */
+  private static final String HSQLDB_VERSION_KEY =
+          "sqoop.hsqldb.job.storage.version";
 
   /** Default value for SESSION_TABLE_KEY. */
   private static final String DEFAULT_SESSION_TABLE_NAME =
@@ -107,17 +119,50 @@
   private static final String PROPERTY_CLASS_CONFIG = "config";
 
   /**
+   * Configuration key specifying whether this storage agent is active.
+   * Defaults to "on" to allow zero-conf local users.
+   */
+  public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
+          "sqoop.metastore.client.enable.autoconnect";
+
+  /**
+   * Configuration key specifying the connect string used by this
+   * storage agent.
+   */
+  public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
+          "sqoop.metastore.client.autoconnect.url";
+
+  /**
+   * Configuration key specifying the username to bind with.
+   */
+  public static final String AUTO_STORAGE_USER_KEY =
+          "sqoop.metastore.client.autoconnect.username";
+
+
+  /** HSQLDB default user is named 'SA'. */
+  public static final String DEFAULT_AUTO_USER = "SA";
+
+  /**
+   * Configuration key specifying the password to bind with.
+   */
+  public static final String AUTO_STORAGE_PASS_KEY =
+          "sqoop.metastore.client.autoconnect.password";
+
+  /** HSQLDB default user has an empty password. */
+  public static final String DEFAULT_AUTO_PASSWORD = "";
+
+  /**
    * Per-job key with propClass 'schema' that specifies the SqoopTool
    * to load.
    */
   private static final String SQOOP_TOOL_KEY = "sqoop.tool";
-
-
   private Map<String, String> connectedDescriptor;
   private String metastoreConnectStr;
   private String metastoreUser;
   private String metastorePassword;
   private Connection connection;
+  private String driverClass;
+  private ConnManager connManager;
 
   protected Connection getConnection() {
     return this.connection;
@@ -139,8 +184,9 @@
     this.metastorePassword = pass;
   }
 
-  private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
-
+  protected void setDriverClass(String driverClass) {
+    this.driverClass = driverClass;
+  }
   /**
    * Set the descriptor used to open() this storage.
    */
@@ -156,6 +202,7 @@
     setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
     setMetastoreUser(descriptor.get(META_USERNAME_KEY));
     setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
+    setDriverClass(descriptor.get(META_DRIVER_KEY));
     setConnectedDescriptor(descriptor);
 
     init();
@@ -163,21 +210,10 @@
 
   protected void init() throws IOException {
     try {
-      // Load/initialize the JDBC driver.
-      Class.forName(DB_DRIVER_CLASS);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException("Could not load HSQLDB JDBC driver", cnfe);
-    }
+      connManager = createConnManager();
+      connection = connManager.getConnection();
 
-    try {
-      if (null == metastoreUser) {
-        this.connection = DriverManager.getConnection(metastoreConnectStr);
-      } else {
-        this.connection = DriverManager.getConnection(metastoreConnectStr,
-            metastoreUser, metastorePassword);
-      }
-
-      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+      connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
       connection.setAutoCommit(false);
 
       // Initialize the root schema.
@@ -186,8 +222,15 @@
       }
 
       // Check the schema version.
-      String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, null);
-      int actualStorageVer = -1;
+      String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, NO_VERSION);
+
+      // If schema version is not present under the current key,
+      // sets it correctly. Present for backward compatibility
+      if (curStorageVerStr == null) {
+        setRootProperty(STORAGE_VERSION_KEY, NO_VERSION, Integer.toString(CUR_STORAGE_VERSION));
+        curStorageVerStr = Integer.toString(CUR_STORAGE_VERSION);
+      }
+      int actualStorageVer = NO_VERSION;
       try {
         actualStorageVer = Integer.valueOf(curStorageVerStr);
       } catch (NumberFormatException nfe) {
@@ -219,31 +262,25 @@
 
   @Override
   public void close() throws IOException {
-    if (null != this.connection) {
       try {
-        LOG.debug("Flushing current transaction");
-        this.connection.commit();
+          LOG.debug("Closing connection manager");
+          connManager.close();
       } catch (SQLException sqlE) {
-        throw new IOException("Exception committing connection", sqlE);
-      }
-
-      try {
-        LOG.debug("Closing connection");
-        this.connection.close();
-      } catch (SQLException sqlE) {
-        throw new IOException("Exception closing connection", sqlE);
+          throw new IOException("Exception closing connection manager", sqlE);
       } finally {
-        this.connection = null;
+          this.connection = null;
       }
-    }
   }
 
   @Override
   /** {@inheritDoc} */
   public boolean canAccept(Map<String, String> descriptor) {
     // We return true if the desciptor contains a connect string to find
-    // the database.
-    return descriptor.get(META_CONNECT_KEY) != null;
+    // the database or auto-connect is enabled
+    Configuration conf = this.getConf();
+    boolean metaConnectTrue = descriptor.get(META_CONNECT_KEY) != null;
+    boolean autoConnectEnabled = conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
+    return metaConnectTrue || autoConnectEnabled;
   }
 
   @Override
@@ -310,7 +347,7 @@
 
   private boolean jobExists(String jobName) throws SQLException {
     PreparedStatement s = connection.prepareStatement(
-        "SELECT COUNT(job_name) FROM " + this.jobTableName
+        "SELECT COUNT(job_name) FROM " + connManager.escapeTableName(this.jobTableName)
         + " WHERE job_name = ? GROUP BY job_name");
     ResultSet rs = null;
     try {
@@ -343,7 +380,7 @@
       } else {
         LOG.debug("Deleting job: " + jobName);
         PreparedStatement s = connection.prepareStatement("DELETE FROM "
-            + this.jobTableName + " WHERE job_name = ?");
+            + connManager.escapeTableName(this.jobTableName) + " WHERE job_name = ?");
         try {
           s.setString(1, jobName);
           s.executeUpdate();
@@ -451,7 +488,7 @@
     ResultSet rs = null;
     try {
       PreparedStatement s = connection.prepareStatement(
-          "SELECT DISTINCT job_name FROM " + this.jobTableName);
+          "SELECT DISTINCT job_name FROM " + connManager.escapeTableName(this.jobTableName));
       try {
         rs = s.executeQuery();
         ArrayList<String> jobs = new ArrayList<String>();
@@ -481,28 +518,20 @@
   // Determine the name to use for the root metadata table.
   private String getRootTableName() {
     Configuration conf = getConf();
-    return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME);
+    return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME).toUpperCase();
   }
 
-  private boolean tableExists(String table) throws SQLException {
-    LOG.debug("Checking for table: " + table);
-    DatabaseMetaData dbmd = connection.getMetaData();
-    String [] tableTypes = { "TABLE" };
-    ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
-    if (null != rs) {
-      try {
-        while (rs.next()) {
-          if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
-            LOG.debug("Found table: " + table);
-            return true;
-          }
-        }
-      } finally {
-        rs.close();
+  private String getEscapedRootTableName() {
+    return connManager.escapeTableName(getRootTableName());
+  }
+
+  private boolean tableExists(String tableToCheck) throws SQLException {
+    String[] tables = connManager.listTables();
+    for (String table : tables) {
+      if (table.equals(tableToCheck)) {
+        return true;
       }
     }
-
-    LOG.debug("Could not find table.");
     return false;
   }
 
@@ -519,8 +548,8 @@
     // not a SQL-injection attack vector.
     Statement s = connection.createStatement();
     try {
-      s.executeUpdate("CREATE TABLE " + rootTableName + " ("
-          + "version INT, "
+      s.executeUpdate("CREATE TABLE " + getEscapedRootTableName() + " ("
+          + "version INT NOT NULL, "
           + "propname VARCHAR(128) NOT NULL, "
           + "propval VARCHAR(256), "
           + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))");
@@ -528,7 +557,7 @@
       s.close();
     }
 
-    setRootProperty(STORAGE_VERSION_KEY, null,
+    setRootProperty(STORAGE_VERSION_KEY, NO_VERSION,
         Integer.toString(CUR_STORAGE_VERSION));
 
     LOG.debug("Saving root table.");
@@ -549,12 +578,12 @@
     try {
       if (null == version) {
         s = connection.prepareStatement(
-          "SELECT propval FROM " + getRootTableName()
+          "SELECT propval FROM " + getEscapedRootTableName()
           + " WHERE version IS NULL AND propname = ?");
         s.setString(1, propertyName);
       } else {
         s = connection.prepareStatement(
-          "SELECT propval FROM " + getRootTableName() + " WHERE version = ? "
+          "SELECT propval FROM " + getEscapedRootTableName() + " WHERE version = ? "
           + " AND propname = ?");
         s.setInt(1, version);
         s.setString(2, propertyName);
@@ -597,22 +626,21 @@
     String curVal = getRootProperty(propertyName, version);
     if (null == curVal) {
       // INSERT the row.
-      s = connection.prepareStatement("INSERT INTO " + getRootTableName()
+      s = connection.prepareStatement("INSERT INTO " + getEscapedRootTableName()
           + " (propval, propname, version) VALUES ( ? , ? , ? )");
-    } else if (version == null) {
-      // UPDATE an existing row with a null version
-      s = connection.prepareStatement("UPDATE " + getRootTableName()
-          + " SET propval = ? WHERE  propname = ? AND version IS NULL");
     } else {
       // UPDATE an existing row with non-null version.
-      s = connection.prepareStatement("UPDATE " + getRootTableName()
+      s = connection.prepareStatement("UPDATE " + getEscapedRootTableName()
           + " SET propval = ? WHERE  propname = ? AND version = ?");
     }
 
     try {
       s.setString(1, val);
       s.setString(2, propertyName);
-      if (null != version) {
+      //Replaces null value with -1 constant, for backward compatibility
+      if (null == version) {
+       s.setInt(3, NO_VERSION);
+      } else {
         s.setInt(3, version);
       }
       s.executeUpdate();
@@ -641,7 +669,7 @@
     LOG.debug("Creating job storage table: " + curTableName);
     Statement s = connection.createStatement();
     try {
-      s.executeUpdate("CREATE TABLE " + curTableName + " ("
+      s.executeUpdate("CREATE TABLE " + connManager.escapeTableName(curTableName) + " ("
           + "job_name VARCHAR(64) NOT NULL, "
           + "propname VARCHAR(128) NOT NULL, "
           + "propval VARCHAR(1024), "
@@ -666,6 +694,9 @@
    */
   private void initV0Schema() throws SQLException {
     this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0);
+
+    checkForOldRootProperties();
+
     if (null == this.jobTableName) {
       createJobTable();
     }
@@ -675,6 +706,16 @@
     }
   }
 
+  /** Checks to see if there is an existing job table under the old root table schema
+   *  and reconfigures under the present schema, present for backward compatibility. **/
+  private void checkForOldRootProperties() throws SQLException {
+    String hsqldbStorageJobTableName = getRootProperty(HSQLDB_TABLE_KEY, 0);
+    if(hsqldbStorageJobTableName != null && this.jobTableName == null) {
+      this.jobTableName = hsqldbStorageJobTableName;
+      setRootProperty(SESSION_TABLE_KEY, 0, jobTableName);
+    }
+  }
+
   /**
    * INSERT or UPDATE a single (job, propname, class) to point
    * to the specified property value.
@@ -689,12 +730,12 @@
       String curValue = getV0Property(jobName, propClass, propName);
       if (null == curValue) {
         // Property is not yet set.
-        s = connection.prepareStatement("INSERT INTO " + this.jobTableName
+        s = connection.prepareStatement("INSERT INTO " + connManager.escapeTableName(this.jobTableName)
             + " (propval, job_name, propclass, propname) "
             + "VALUES (?, ?, ?, ?)");
       } else {
         // Overwrite existing property.
-        s = connection.prepareStatement("UPDATE " + this.jobTableName
+        s = connection.prepareStatement("UPDATE " + connManager.escapeTableName(this.jobTableName)
             + " SET propval = ? WHERE job_name = ? AND propclass = ? "
             + "AND propname = ?");
       }
@@ -723,7 +764,7 @@
 
     ResultSet rs = null;
     PreparedStatement s = connection.prepareStatement(
-        "SELECT propval FROM " + this.jobTableName
+        "SELECT propval FROM " + connManager.escapeTableName(this.jobTableName)
         + " WHERE job_name = ? AND propclass = ? AND propname = ?");
 
     try {
@@ -764,7 +805,7 @@
 
     ResultSet rs = null;
     PreparedStatement s = connection.prepareStatement(
-        "SELECT propname, propval FROM " + this.jobTableName
+        "SELECT propname, propval FROM " + connManager.escapeTableName(this.jobTableName)
         + " WHERE job_name = ? AND propclass = ?");
     try {
       s.setString(1, jobName);
@@ -801,5 +842,17 @@
       setV0Property(jobName, propClass, key, val);
     }
   }
+
+  private ConnManager createConnManager() {
+    SqoopOptions sqoopOptions = new SqoopOptions();
+    sqoopOptions.setConnectString(metastoreConnectStr);
+    sqoopOptions.setUsername(metastoreUser);
+    sqoopOptions.setPassword(metastorePassword);
+    JobData jd = new JobData();
+    jd.setSqoopOptions(sqoopOptions);
+    DefaultManagerFactory dmf = new DefaultManagerFactory();
+    return dmf.accept(jd);
+  }
+
 }
 
diff --git a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
index 2edc33b..9a348d5 100644
--- a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
+++ b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
@@ -42,8 +42,7 @@
 
   /** The default list of available JobStorage implementations. */
   private static final String DEFAULT_AVAILABLE_STORAGES =
-      "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,"
-      + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage";
+      "com.cloudera.sqoop.metastore.GenericJobStorage";
 
   public JobStorageFactory(Configuration config) {
     this.conf = config;
diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
deleted file mode 100644
index 49e3031..0000000
--- a/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.metastore.hsqldb;
-
-import java.io.File;
-import java.io.IOException;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * JobStorage implementation that auto-configures an HSQLDB
- * local-file-based instance to hold jobs.
- */
-public class AutoHsqldbStorage
-    extends com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage {
-
-  public static final Log LOG = LogFactory.getLog(
-      AutoHsqldbStorage.class.getName());
-
-  /**
-   * Configuration key specifying whether this storage agent is active.
-   * Defaults to "on" to allow zero-conf local users.
-   */
-  public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
-      "sqoop.metastore.client.enable.autoconnect";
-
-  /**
-   * Configuration key specifying the connect string used by this
-   * storage agent.
-   */
-  public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
-      "sqoop.metastore.client.autoconnect.url";
-
-  /**
-   * Configuration key specifying the username to bind with.
-   */
-  public static final String AUTO_STORAGE_USER_KEY =
-      "sqoop.metastore.client.autoconnect.username";
-
-
-  /** HSQLDB default user is named 'SA'. */
-  private static final String DEFAULT_AUTO_USER = "SA";
-
-  /**
-   * Configuration key specifying the password to bind with.
-   */
-  public static final String AUTO_STORAGE_PASS_KEY =
-      "sqoop.metastore.client.autoconnect.password";
-
-  /** HSQLDB default user has an empty password. */
-  public static final String DEFAULT_AUTO_PASSWORD = "";
-
-  @Override
-  /** {@inheritDoc} */
-  public boolean canAccept(Map<String, String> descriptor) {
-    Configuration conf = this.getConf();
-    return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
-  }
-
-  /**
-   * Determine the user's home directory and return a connect
-   * string to HSQLDB that uses ~/.sqoop/ as the storage location
-   * for the metastore database.
-   */
-  private String getHomeDirFileConnectStr() {
-    String homeDir = System.getProperty("user.home");
-
-    File homeDirObj = new File(homeDir);
-    File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
-    File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
-
-    String dbFileStr = databaseFileObj.toString();
-    return "jdbc:hsqldb:file:" + dbFileStr
-        + ";hsqldb.write_delay=false;shutdown=true";
-  }
-
-  @Override
-  /**
-   * Set the connection information to use the auto-inferred connection
-   * string.
-   */
-  public void open(Map<String, String> descriptor) throws IOException {
-    Configuration conf = getConf();
-    setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY,
-        getHomeDirFileConnectStr()));
-    setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
-    setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
-        DEFAULT_AUTO_PASSWORD));
-    setConnectedDescriptor(descriptor);
-
-    init();
-  }
-}
-
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 1564bdc..6a4dcb0 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -216,6 +216,8 @@
 
   // Arguments for the saved job management system.
   public static final String STORAGE_METASTORE_ARG = "meta-connect";
+  public static final String METASTORE_USER_ARG = "meta-username";
+  public static final String METASTORE_PASS_ARG = "meta-password";
   public static final String JOB_CMD_CREATE_ARG = "create";
   public static final String JOB_CMD_DELETE_ARG = "delete";
   public static final String JOB_CMD_EXEC_ARG = "exec";
@@ -379,6 +381,16 @@
         .withDescription("Specify JDBC connect string for the metastore")
         .withLongOpt(STORAGE_METASTORE_ARG)
         .create());
+    relatedOpts.addOption(OptionBuilder.withArgName("metastore-db-username")
+        .hasArg()
+        .withDescription("Specify the username string for the metastore")
+        .withLongOpt(METASTORE_USER_ARG)
+        .create());
+    relatedOpts.addOption(OptionBuilder.withArgName("metastore-db-password")
+        .hasArg()
+        .withDescription("Specify the password string for the metastore")
+        .withLongOpt(METASTORE_PASS_ARG)
+        .create());
 
     // Create an option-group surrounding the operations a user
     // can perform on jobs.
diff --git a/src/java/org/apache/sqoop/tool/JobTool.java b/src/java/org/apache/sqoop/tool/JobTool.java
index 054e274..dbe8934 100644
--- a/src/java/org/apache/sqoop/tool/JobTool.java
+++ b/src/java/org/apache/sqoop/tool/JobTool.java
@@ -18,12 +18,21 @@
 
 package org.apache.sqoop.tool;
 
+import static org.apache.sqoop.manager.JdbcDrivers.DB2;
+import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB;
+import static org.apache.sqoop.manager.JdbcDrivers.MYSQL;
+import static org.apache.sqoop.manager.JdbcDrivers.ORACLE;
+import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES;
+import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;
+
 import java.io.IOException;
 
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.cli.CommandLine;
@@ -38,10 +47,11 @@
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
 import com.cloudera.sqoop.cli.ToolOptions;
-import com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage;
+import com.cloudera.sqoop.metastore.GenericJobStorage;
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
+import org.apache.sqoop.manager.JdbcDrivers;
 import org.apache.sqoop.util.LoggingUtils;
 
 /**
@@ -53,6 +63,8 @@
       JobTool.class.getName());
   private static final String DASH_STR  = "--";
 
+  private static Set<JdbcDrivers> SUPPORTED_DRIVERS = EnumSet.of(HSQLDB, MYSQL, ORACLE, POSTGRES, DB2, SQLSERVER);
+
   private enum JobOp {
     JobCreate,
     JobDelete,
@@ -345,11 +357,7 @@
 
     this.storageDescriptor = new TreeMap<String, String>();
 
-    if (in.hasOption(STORAGE_METASTORE_ARG)) {
-      this.storageDescriptor.put(HsqldbJobStorage.META_CONNECT_KEY,
-          in.getOptionValue(STORAGE_METASTORE_ARG));
-    }
-
+    applyMetastoreOptions(in, out);
     // These are generated via an option group; exactly one
     // of this exhaustive list will always be selected.
     if (in.hasOption(JOB_CMD_CREATE_ARG)) {
@@ -369,6 +377,44 @@
     }
   }
 
+  private void applyMetastoreOptions(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
+    String metaConnectString;
+    String metaUsernameString;
+    String metaPasswordString;
+    if (in.hasOption(STORAGE_METASTORE_ARG)) {
+      metaConnectString = in.getOptionValue(STORAGE_METASTORE_ARG);
+      this.storageDescriptor.put(GenericJobStorage.META_DRIVER_KEY, chooseDriverType(metaConnectString));
+      this.storageDescriptor.put(GenericJobStorage.META_CONNECT_KEY, metaConnectString);
+    } else {
+      metaConnectString = out.getMetaConnectStr();
+      this.storageDescriptor.put(GenericJobStorage.META_DRIVER_KEY, chooseDriverType(metaConnectString));
+      this.storageDescriptor.put(GenericJobStorage.META_CONNECT_KEY, metaConnectString);
+    }
+    if (in.hasOption(METASTORE_USER_ARG)) {
+      metaUsernameString = in.getOptionValue(METASTORE_USER_ARG);
+      this.storageDescriptor.put(GenericJobStorage.META_USERNAME_KEY, metaUsernameString);
+    } else {
+      metaUsernameString = out.getMetaUsername();
+      this.storageDescriptor.put(GenericJobStorage.META_USERNAME_KEY, metaUsernameString);
+    }
+    if (in.hasOption(METASTORE_PASS_ARG)) {
+      metaPasswordString = in.getOptionValue(METASTORE_PASS_ARG);
+      this.storageDescriptor.put(GenericJobStorage.META_PASSWORD_KEY, metaPasswordString);
+    } else {
+      metaPasswordString = out.getMetaPassword();
+      this.storageDescriptor.put(GenericJobStorage.META_PASSWORD_KEY, metaPasswordString);
+    }
+  }
+
+  private String chooseDriverType(String metaConnectString) throws InvalidOptionsException {
+    for (JdbcDrivers driver : SUPPORTED_DRIVERS) {
+      if (metaConnectString.startsWith(driver.getSchemePrefix())) {
+        return driver.getDriverClass();
+      }
+    }
+    throw new InvalidOptionsException("current meta-connect scheme not compatible with metastore");
+  }
+
   @Override
   /** {@inheritDoc} */
   public void validateOptions(SqoopOptions options)
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 52a55b7..166792b 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -31,6 +31,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,11 +44,11 @@
 import com.cloudera.sqoop.manager.HsqldbManager;
 import com.cloudera.sqoop.manager.ManagerFactory;
 import com.cloudera.sqoop.metastore.JobData;
-import com.cloudera.sqoop.metastore.TestSavedJobs;
 import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.tool.JobTool;
+import org.apache.sqoop.metastore.GenericJobStorage;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,25 +72,37 @@
 
   // What database do we read from.
   public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
+  public static final String AUTO_STORAGE_PASSWORD = "";
+  public static final String AUTO_STORAGE_USERNAME = "SA";
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Before
   public void setUp() throws Exception {
-    // Delete db state between tests.
-    TestSavedJobs.resetJobSchema();
+    // Delete db state between tests
+    System.setProperty(GenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
+    System.setProperty(GenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
+    System.setProperty(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
+            SOURCE_DB_URL);
     resetSourceDataSchema();
   }
 
   public static void resetSourceDataSchema() throws SQLException {
     SqoopOptions options = new SqoopOptions();
     options.setConnectString(SOURCE_DB_URL);
-    TestSavedJobs.resetSchema(options);
+    options.setUsername(AUTO_STORAGE_USERNAME);
+    options.setPassword(AUTO_STORAGE_PASSWORD);
+    SavedJobsTestBase.resetSchema(options);
   }
 
   public static Configuration newConf() {
-    return TestSavedJobs.newConf();
+    Configuration conf = new Configuration();
+    conf.set(GenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
+    conf.set(GenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
+    conf.set(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
+            SOURCE_DB_URL);
+    return conf;
   }
 
   /**
diff --git a/src/test/com/cloudera/sqoop/metastore/JobToolTestBase.java b/src/test/com/cloudera/sqoop/metastore/JobToolTestBase.java
new file mode 100644
index 0000000..2f46ec9
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/JobToolTestBase.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore;
+
+import static org.junit.Assert.assertEquals;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.DefaultManagerFactory;
+import org.apache.sqoop.Sqoop;
+import org.apache.sqoop.tool.JobTool;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base test class for JobTool, implemented for specific database services in sub-classes
+ */
+
+public abstract class JobToolTestBase extends BaseSqoopTestCase {
+
+    public static final Log LOG = LogFactory
+            .getLog(MetaConnectIncrementalImportTestBase.class.getName());
+
+    private String metaConnectString;
+    private String metaUser;
+    private String metaPass;
+    private ConnManager cm;
+
+
+    public JobToolTestBase(String metaConnectString, String metaUser, String metaPass) {
+        this.metaConnectString = metaConnectString;
+        this.metaUser = metaUser;
+        this.metaPass = metaPass;
+    }
+
+    @Before
+    public void setUp() {
+        super.setUp();
+
+        SqoopOptions options = getSqoopOptions();
+
+        Connection conn = getConnection(options);
+
+        try {
+            Statement statement = conn.createStatement();
+            statement.execute("DROP TABLE " + cm.escapeTableName("SQOOP_ROOT"));
+            statement.execute("DROP TABLE " + cm.escapeTableName("SQOOP_SESSIONS"));
+            conn.commit();
+        } catch (Exception e) {
+            LOG.error("Failed to clear metastore database");
+        }
+        //Methods from BaseSqoopTestClass reference the test Hsqldb database, not the metastore
+        try{
+            dropTableIfExists("CarLocations");
+        } catch (SQLException e) {
+            LOG.error("Failed to drop table CarLocations");
+        }
+        setCurTableName("CarLocations");
+        createTableWithColTypesAndNames(
+                new String [] {"carId", "Locations"},
+                new String [] {"INTEGER", "VARCHAR"},
+                new String [] {"1", "'Lexus'"});
+    }
+
+    private Connection getConnection(SqoopOptions options) {
+        try {
+            com.cloudera.sqoop.metastore.JobData jd = new com.cloudera.sqoop.metastore.JobData(options, null);
+            DefaultManagerFactory dmf = new DefaultManagerFactory();
+            cm = dmf.accept(jd);
+            return cm.getConnection();
+        } catch (SQLException e) {
+            LOG.error("Failed to create a connection to the Metastore");
+            return  null;
+        }
+    }
+
+    private SqoopOptions getSqoopOptions() {
+        SqoopOptions options = new SqoopOptions();
+        options.setConnectString(metaConnectString);
+        options.setUsername(metaUser);
+        options.setPassword(metaPass);
+        return options;
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+
+        try {
+            cm.close();
+        } catch (SQLException e) {
+            LOG.error("Failed to close ConnManager");
+        }
+
+    }
+
+    protected String[] getCreateJob(String metaConnectString, String metaUser, String metaPass) {
+        List<String> args = new ArrayList<>();
+        CommonArgs.addHadoopFlags(args);
+        args.add("--create");
+        args.add("testJob");
+        args.add("--meta-connect");
+        args.add(metaConnectString);
+        args.add("--meta-username");
+        args.add(metaUser);
+        args.add("--meta-password");
+        args.add(metaPass);
+        args.add("--");
+        args.add("list-tables");
+        args.add("--connect");
+        args.add(getConnectString());
+
+        return args.toArray(new String[0]);
+    }
+
+    protected String[] getExecJob(String metaConnectString, String metaUser, String metaPass) {
+        List<String> args = new ArrayList<>();
+        CommonArgs.addHadoopFlags(args);
+        args.add("--exec");
+        args.add("testJob");
+        args.add("--meta-connect");
+        args.add(metaConnectString);
+        args.add("--meta-username");
+        args.add(metaUser);
+        args.add("--meta-password");
+        args.add(metaPass);
+
+        return args.toArray(new String[0]);
+    }
+
+
+    protected String[] getDeleteJob(String metaConnectString, String metaUser, String metaPass) {
+        List<String> args = new ArrayList<>();
+        CommonArgs.addHadoopFlags(args);
+        args.add("--delete");
+        args.add("testJob");
+        args.add("--meta-connect");
+        args.add(metaConnectString);
+        args.add("--meta-username");
+        args.add(metaUser);
+        args.add("--meta-password");
+        args.add(metaPass);
+
+        return args.toArray(new String[0]);
+    }
+
+    @Test
+    public void testCreateJob() throws IOException {
+        org.apache.sqoop.tool.JobTool jobTool = new org.apache.sqoop.tool.JobTool();
+        org.apache.sqoop.Sqoop sqoop = new Sqoop(jobTool);
+        String[] args = getCreateJob(metaConnectString, metaUser, metaPass);
+        assertEquals("Error creating Sqoop Job", 0, Sqoop.runSqoop(sqoop, args));
+    }
+
+    @Test
+    public void testExecJob() throws IOException {
+        Configuration conf = new Configuration();
+        //creates the job
+        JobTool jobToolCreate = new JobTool();
+        Sqoop sqoopCreate = new Sqoop(jobToolCreate, conf);
+        String[] argsCreate = getCreateJob(metaConnectString, metaUser, metaPass);
+        Sqoop.runSqoop(sqoopCreate, argsCreate);
+        //executes the job
+        JobTool jobToolExec = new JobTool();
+        Sqoop sqoopExec = new Sqoop(jobToolExec);
+        String[] argsExec = getExecJob(metaConnectString, metaUser, metaPass);
+        assertEquals("Error executing Sqoop Job", 0, Sqoop.runSqoop(sqoopExec, argsExec));
+    }
+
+    @Test
+    public void testDeleteJob() throws IOException {
+        Configuration conf = new Configuration();
+        //Creates the job
+        JobTool jobToolCreate = new JobTool();
+        Sqoop sqoopCreate = new Sqoop(jobToolCreate, conf);
+        String[] argsCreate = getCreateJob(metaConnectString, metaUser, metaPass);
+        Sqoop.runSqoop(sqoopCreate, argsCreate);
+        //Deletes the job
+        JobTool jobToolDelete = new JobTool();
+        Sqoop sqoopExec = new Sqoop(jobToolDelete);
+        String[] argsDelete = getDeleteJob(metaConnectString, metaUser, metaPass);
+        assertEquals("Error deleting Sqoop Job", 0, Sqoop.runSqoop(sqoopExec, argsDelete));
+    }
+}
\ No newline at end of file
diff --git a/src/test/com/cloudera/sqoop/metastore/MetaConnectIncrementalImportTestBase.java b/src/test/com/cloudera/sqoop/metastore/MetaConnectIncrementalImportTestBase.java
new file mode 100644
index 0000000..587aaff
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/MetaConnectIncrementalImportTestBase.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore;
+
+import static org.junit.Assert.assertEquals;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.manager.DefaultManagerFactory;
+import org.apache.sqoop.tool.JobTool;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Base test class for Incremental Import Metastore data, implemented for specific database services in sub-classes
+ */
+
+public abstract class MetaConnectIncrementalImportTestBase extends BaseSqoopTestCase {
+
+    public static final Log LOG = LogFactory
+            .getLog(MetaConnectIncrementalImportTestBase.class.getName());
+
+    private String metaConnectString;
+    private String metaUser;
+    private String metaPass;
+
+    private Connection connMeta;
+    private ConnManager cm;
+
+    public MetaConnectIncrementalImportTestBase(String metaConnectString, String metaUser, String metaPass) {
+        this.metaConnectString = metaConnectString;
+        this.metaUser = metaUser;
+        this.metaPass = metaPass;
+    }
+
+    @Before
+    public void setUp() {
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    protected String[] getIncrementalJob(String metaConnectString, String metaUser, String metaPass) {
+        List<String> args = new ArrayList<>();
+        CommonArgs.addHadoopFlags(args);
+        args.add("--create");
+        args.add("testJob");
+        args.add("--meta-connect");
+        args.add(metaConnectString);
+        args.add("--meta-username");
+        args.add(metaUser);
+        args.add("--meta-password");
+        args.add(metaPass);
+        args.add("--");
+        args.add("import");
+        args.add("-m");
+        args.add("1");
+        args.add("--connect");
+        args.add(getConnectString());
+        args.add("--table");
+        args.add("CARLOCATIONS");
+        args.add("--incremental");
+        args.add("append");
+        args.add("--check-column");
+        args.add("CARID");
+        args.add("--last-value");
+        args.add("0");
+        args.add("--as-textfile");
+
+        return args.toArray(new String[0]);
+    }
+
+
+    protected String[] getExecJob(String metaConnectString, String metaUser, String metaPass) {
+        List<String> args = new ArrayList<>();
+        CommonArgs.addHadoopFlags(args);
+        args.add("--exec");
+        args.add("testJob");
+        args.add("--meta-connect");
+        args.add(metaConnectString);
+        args.add("--meta-username");
+        args.add(metaUser);
+        args.add("--meta-password");
+        args.add(metaPass);
+
+        return args.toArray(new String[0]);
+    }
+
+    @Test
+    public void testIncrementalJob() throws SQLException {
+        resetTable();
+
+        initMetastoreConnection();
+
+        resetMetastoreSchema();
+
+        //creates Job
+        createJob();
+
+        //Executes the import
+        execJob();
+
+        //Ensures the saveIncrementalState saved the right row
+        checkIncrementalState(1);
+
+        //Adds rows to the import table
+        Statement insertStmt = getConnection().createStatement();
+        insertStmt.executeUpdate("INSERT INTO CARLOCATIONS VALUES (2, 'lexus')");
+        getConnection().commit();
+
+        //Execute the import again
+        execJob();
+
+        //Ensures the last incremental value is updated correctly.
+        checkIncrementalState(2);
+
+        cm.close();
+    }
+
+    private void checkIncrementalState(int expected) throws SQLException {
+        Statement getSaveIncrementalState = connMeta.createStatement();
+        ResultSet lastCol = getSaveIncrementalState.executeQuery(
+                "SELECT propVal FROM " + cm.escapeTableName("SQOOP_SESSIONS") + " WHERE propname = 'incremental.last.value'");
+        lastCol.next();
+        assertEquals("Last row value differs from expected",
+                expected, lastCol.getInt("propVal"));
+    }
+
+    private void execJob() {
+        JobTool jobToolExec = new JobTool();
+        org.apache.sqoop.Sqoop sqoopExec = new org.apache.sqoop.Sqoop(jobToolExec);
+        String[] argsExec = getExecJob(metaConnectString, metaUser, metaPass);
+        assertEquals("Sqoop Job did not execute properly",
+                0, org.apache.sqoop.Sqoop.runSqoop(sqoopExec, argsExec));
+    }
+
+    private void createJob() {
+        Configuration conf = new Configuration();
+        conf.set(org.apache.sqoop.SqoopOptions.METASTORE_PASSWORD_KEY, "true");
+        JobTool jobToolCreate = new JobTool();
+        org.apache.sqoop.Sqoop sqoopCreate = new org.apache.sqoop.Sqoop(jobToolCreate, conf);
+        String[] argsCreate = getIncrementalJob(metaConnectString, metaUser, metaPass);
+        org.apache.sqoop.Sqoop.runSqoop(sqoopCreate, argsCreate);
+    }
+
+    private void resetTable() throws SQLException {
+        //Resets the target table
+        dropTableIfExists("CARLOCATIONS");
+        setCurTableName("CARLOCATIONS");
+        createTableWithColTypesAndNames(
+                new String [] {"CARID", "LOCATIONS"},
+                new String [] {"INTEGER", "VARCHAR"},
+                new String [] {"1", "'Lexus'"});
+    }
+
+    private void resetMetastoreSchema() {
+        try {
+            //Resets the metastore schema
+            Statement metastoreStatement = connMeta.createStatement();
+            metastoreStatement.execute("DROP TABLE " + cm.escapeTableName("SQOOP_ROOT"));
+            metastoreStatement.execute("DROP TABLE " + cm.escapeTableName("SQOOP_SESSIONS"));
+            connMeta.commit();
+        }
+        catch (Exception e) {
+            LOG.error( e.getLocalizedMessage() );
+        }
+    }
+
+    private void initMetastoreConnection() throws SQLException{
+        SqoopOptions options = new SqoopOptions();
+        options.setConnectString(metaConnectString);
+        options.setUsername(metaUser);
+        options.setPassword(metaPass);
+        com.cloudera.sqoop.metastore.JobData jd =
+                new com.cloudera.sqoop.metastore.JobData(options, new JobTool());
+        DefaultManagerFactory dmf = new DefaultManagerFactory();
+        cm = dmf.accept(jd);
+        connMeta= cm.getConnection();
+    }
+}
\ No newline at end of file
diff --git a/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java b/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java
new file mode 100644
index 0000000..81789e7
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore;
+
+import static org.apache.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY;
+import static org.apache.sqoop.metastore.GenericJobStorage.META_DRIVER_KEY;
+import static org.apache.sqoop.metastore.GenericJobStorage.META_PASSWORD_KEY;
+import static org.apache.sqoop.metastore.GenericJobStorage.META_USERNAME_KEY;
+
+import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.tool.VersionTool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.manager.DefaultManagerFactory;
+import org.apache.sqoop.tool.ImportTool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Test the metastore and job-handling features,
+ * implemented for specific database services in sub-classes.
+ */
+public abstract class SavedJobsTestBase {
+
+  public static final String TEST_JOB = "testJob";
+  public static final String TEST_TABLE_NAME = "abcd";
+  public static final String TEST_TABLE_NAME_2 = "efgh";
+  public static final String TEST_JOB_2 = "testJob2";
+  public static final String TEST_JOB_3 = "testJob3";
+  public static final String TEST_TABLE_NAME_3 = "ijkl";
+  private String metaConnect;
+  private String metaUser;
+  private String metaPassword;
+  private String driverClass;
+  private JobStorage storage;
+
+  private Configuration conf;
+  private Map<String, String> descriptor;
+
+  public SavedJobsTestBase(String metaConnect, String metaUser, String metaPassword, String driverClass){
+    this.metaConnect = metaConnect;
+    this.metaUser = metaUser;
+    this.metaPassword = metaPassword;
+    this.driverClass = driverClass;
+    this.descriptor = new TreeMap<>();
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setUp() throws Exception {
+    // Delete db state between tests.
+    resetJobSchema();
+    conf = newConf();
+
+    descriptor.put(META_CONNECT_KEY, metaConnect);
+    descriptor.put(META_USERNAME_KEY, metaUser);
+    descriptor.put(META_PASSWORD_KEY, metaPassword);
+    descriptor.put(META_DRIVER_KEY, driverClass);
+
+    JobStorageFactory ssf = new JobStorageFactory(conf);
+    storage = ssf.getJobStorage(descriptor);
+    storage.open(descriptor);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    descriptor.clear();
+    storage.close();
+  }
+
+  public void resetJobSchema()
+          throws SQLException {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(metaConnect);
+    options.setUsername(metaUser);
+    options.setPassword(metaPassword);
+    options.setDriverClassName(driverClass);
+
+    resetSchema(options);
+  }
+
+  /**
+   * Drop all tables in the configured HSQLDB-based schema/user/pass.
+   */
+  public static void resetSchema(SqoopOptions options) throws SQLException {
+    JobData jd = new JobData();
+    jd.setSqoopOptions(options);
+    DefaultManagerFactory dmf = new DefaultManagerFactory();
+    ConnManager manager = dmf.accept(jd);
+    Connection c = manager.getConnection();
+    Statement s = c.createStatement();
+    try {
+      String [] tables = manager.listTables();
+      for (String table : tables) {
+        if(table.equals("SQOOP_ROOT") || table.equals("SQOOP_SESSIONS")){
+          s.execute("DROP TABLE " + manager.escapeTableName(table));
+        }
+      }
+
+      c.commit();
+    } finally {
+      s.close();
+    }
+  }
+
+  public Configuration newConf() {
+    Configuration conf = new Configuration();
+    conf.set(META_CONNECT_KEY, metaConnect);
+    conf.set(META_USERNAME_KEY, metaUser);
+    conf.set(META_PASSWORD_KEY, metaPassword);
+    conf.set(META_DRIVER_KEY, driverClass);
+
+    return conf;
+  }
+
+  @Test
+  public void testReadJobDoesExistPasses() throws Exception{
+    storage.create(TEST_JOB, createTestJobData(TEST_TABLE_NAME));
+
+    assertEquals("Read did not return job data correctly",
+            storage.read(TEST_JOB).getSqoopOptions().getTableName(),
+            TEST_TABLE_NAME);
+  }
+
+  @Test
+  public void testUpdateJob() throws  Exception {
+    storage.create(TEST_JOB, createTestJobData(TEST_TABLE_NAME));
+
+    storage.update(TEST_JOB, createTestJobData(TEST_TABLE_NAME_2) );
+
+    assertEquals("Update did not change data correctly",
+            storage.read(TEST_JOB).getSqoopOptions().getTableName(),
+            TEST_TABLE_NAME_2);
+  }
+
+  @Test
+  public void testList() throws IOException {
+    storage.create(TEST_JOB, createTestJobData(TEST_TABLE_NAME));
+    storage.create(TEST_JOB_2, createTestJobData(TEST_TABLE_NAME_2));
+    storage.create(TEST_JOB_3, createTestJobData(TEST_TABLE_NAME_3));
+
+    assertThat("List did not return correct job data",
+            storage.list(), hasItems(TEST_JOB, TEST_JOB_2, TEST_JOB_3));
+  }
+
+  @Test
+  public void testCreateSameJob() throws IOException {
+
+    // Job list should start out empty.
+    List<String> jobs = storage.list();
+    assertEquals("Job list should start out empty", 0, jobs.size());
+
+    // Create a job that displays the version.
+    JobData data = new JobData(new SqoopOptions(), new VersionTool());
+    storage.create(TEST_JOB, data);
+
+    jobs = storage.list();
+    assertEquals("Test Job not created correctly",1, jobs.size());
+    assertEquals("Test Job data not returned correctly", TEST_JOB, jobs.get(0));
+
+    try {
+      // Try to create that same job name again. This should fail.
+      thrown.expect(IOException.class);
+      thrown.reportMissingExceptionWithMessage("Expected IOException since job already exists");
+      storage.create(TEST_JOB, data);
+    } finally {
+      jobs = storage.list();
+      assertEquals("Incorrect number of jobs present",1, jobs.size());
+
+      // Restore our job, check that it exists.
+      JobData outData = storage.read(TEST_JOB);
+      assertEquals("Test job does not exist", new VersionTool().getToolName(),
+          outData.getSqoopTool().getToolName());
+    }
+  }
+
+  @Test
+  public void testDeleteJob() throws IOException {
+    // Job list should start out empty.
+    List<String> jobs = storage.list();
+    assertEquals("Job List should start out empty", 0, jobs.size());
+
+    // Create a job that displays the version.
+    JobData data = new JobData(new SqoopOptions(), new VersionTool());
+    storage.create(TEST_JOB, data);
+
+    jobs = storage.list();
+    assertEquals("Incorrect number of jobs present",1, jobs.size());
+    assertEquals("Test Job created incorrectly", TEST_JOB, jobs.get(0));
+
+    // Now delete the job.
+    storage.delete(TEST_JOB);
+
+    // After delete, we should have no jobs.
+    jobs = storage.list();
+    assertEquals("Job was not deleted correctly", 0, jobs.size());
+  }
+
+  @Test
+  public void testRestoreNonExistingJob() throws IOException {
+      // Try to restore a job that doesn't exist. Watch it fail.
+      thrown.expect(IOException.class);
+      thrown.reportMissingExceptionWithMessage("Expected IOException since job doesn't exist");
+      storage.read("DoesNotExist");
+  }
+
+  @Test
+  public void testCreateJobWithExtraArgs() throws IOException {
+
+        // Job list should start out empty.
+        List<String> jobs = storage.list();
+        assertEquals("Job list should start out empty", 0, jobs.size());
+
+        // Create a job with extra args
+        com.cloudera.sqoop.SqoopOptions opts = new SqoopOptions();
+        String[] args = {"-schema", "test"};
+        opts.setExtraArgs(args);
+        JobData data = new JobData(opts, new VersionTool());
+        storage.create(TEST_JOB, data);
+
+        jobs = storage.list();
+        assertEquals("Incorrect number of jobs", 1, jobs.size());
+        assertEquals("Job not created properly", TEST_JOB, jobs.get(0));
+
+        // Restore our job, check that it exists.
+        JobData outData = storage.read(TEST_JOB);
+        assertEquals("Incorrect Tool in Test Job",
+                new VersionTool().getToolName(),
+                outData.getSqoopTool().getToolName());
+
+        String[] storedArgs = outData.getSqoopOptions().getExtraArgs();
+        for(int index = 0; index < args.length; ++index) {
+            assertEquals(args[index], storedArgs[index]);
+        }
+
+        // Now delete the job.
+        storage.delete(TEST_JOB);
+    }
+
+  @Test
+  public void testMultiConnections() throws IOException {
+
+    // Job list should start out empty.
+    List<String> jobs = storage.list();
+    assertEquals("Job list should start out empty", 0, jobs.size());
+
+    // Create a job that displays the version.
+    JobData data = new JobData(new SqoopOptions(), new VersionTool());
+    storage.create(TEST_JOB, data);
+
+    jobs = storage.list();
+    assertEquals("Incorrect number of jobs", 1, jobs.size());
+    assertEquals("Job not created correctly", TEST_JOB, jobs.get(0));
+
+    storage.close(); // Close the existing connection
+
+    // Now re-open the storage.
+    storage.open(descriptor);
+
+    jobs = storage.list();
+    assertEquals("Test Job did not persist through re-open", 1, jobs.size());
+    assertEquals("Job data not correct after re-open", TEST_JOB, jobs.get(0));
+
+    // Restore our job, check that it exists.
+    JobData outData = storage.read(TEST_JOB);
+    assertEquals("Incorrect Tool in Test Job",
+            new VersionTool().getToolName(),
+            outData.getSqoopTool().getToolName());
+  }
+
+  private com.cloudera.sqoop.metastore.JobData createTestJobData(String setTableName) throws IOException {
+    SqoopOptions testOpts = new SqoopOptions();
+    testOpts.setTableName(setTableName);
+    ImportTool testTool = new ImportTool();
+    return new com.cloudera.sqoop.metastore.JobData(testOpts,testTool);
+
+  }
+}
\ No newline at end of file
diff --git a/src/test/com/cloudera/sqoop/metastore/TestSavedJobs.java b/src/test/com/cloudera/sqoop/metastore/TestSavedJobs.java
deleted file mode 100644
index 61d8c97..0000000
--- a/src/test/com/cloudera/sqoop/metastore/TestSavedJobs.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop.metastore;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.manager.HsqldbManager;
-import com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage;
-import com.cloudera.sqoop.tool.VersionTool;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.sql.Connection;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test the metastore and job-handling features.
- *
- * These all make use of the auto-connect hsqldb-based metastore.
- * The metastore URL is configured to be in-memory, and drop all
- * state between individual tests.
- */
-public class TestSavedJobs {
-
-  public static final String TEST_AUTOCONNECT_URL =
-      "jdbc:hsqldb:mem:sqoopmetastore";
-  public static final String TEST_AUTOCONNECT_USER = "SA";
-  public static final String TEST_AUTOCONNECT_PASS = "";
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setUp() throws Exception {
-    // Delete db state between tests.
-    resetJobSchema();
-  }
-
-  public static void resetJobSchema() throws SQLException {
-    SqoopOptions options = new SqoopOptions();
-    options.setConnectString(TEST_AUTOCONNECT_URL);
-    options.setUsername(TEST_AUTOCONNECT_USER);
-    options.setPassword(TEST_AUTOCONNECT_PASS);
-
-    resetSchema(options);
-  }
-
-  /**
-   * Drop all tables in the configured HSQLDB-based schema/user/pass.
-   */
-  public static void resetSchema(SqoopOptions options) throws SQLException {
-    HsqldbManager manager = new HsqldbManager(options);
-    Connection c = manager.getConnection();
-    Statement s = c.createStatement();
-    try {
-      String [] tables = manager.listTables();
-      for (String table : tables) {
-        s.executeUpdate("DROP TABLE " + manager.escapeTableName(table));
-      }
-
-      c.commit();
-    } finally {
-      s.close();
-    }
-  }
-
-  public static Configuration newConf() {
-    Configuration conf = new Configuration();
-    conf.set(AutoHsqldbStorage.AUTO_STORAGE_USER_KEY, TEST_AUTOCONNECT_USER);
-    conf.set(AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY, TEST_AUTOCONNECT_PASS);
-    conf.set(AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
-        TEST_AUTOCONNECT_URL);
-
-    return conf;
-  }
-
-  @Test
-  public void testAutoConnect() throws IOException {
-    // By default, we should be able to auto-connect with an
-    // empty connection descriptor. We should see an empty
-    // job set.
-
-    Configuration conf = newConf();
-    JobStorageFactory ssf = new JobStorageFactory(conf);
-
-    Map<String, String> descriptor = new TreeMap<String, String>();
-    JobStorage storage = ssf.getJobStorage(descriptor);
-
-    storage.open(descriptor);
-    List<String> jobs = storage.list();
-    assertEquals(0, jobs.size());
-    storage.close();
-  }
-
-  @Test
-  public void testCreateSameJob() throws IOException {
-    Configuration conf = newConf();
-    JobStorageFactory ssf = new JobStorageFactory(conf);
-
-    Map<String, String> descriptor = new TreeMap<String, String>();
-    JobStorage storage = ssf.getJobStorage(descriptor);
-
-    storage.open(descriptor);
-
-    // Job list should start out empty.
-    List<String> jobs = storage.list();
-    assertEquals(0, jobs.size());
-
-    // Create a job that displays the version.
-    JobData data = new JobData(new SqoopOptions(), new VersionTool());
-    storage.create("versionJob", data);
-
-    jobs = storage.list();
-    assertEquals(1, jobs.size());
-    assertEquals("versionJob", jobs.get(0));
-
-    try {
-      // Try to create that same job name again. This should fail.
-      thrown.expect(IOException.class);
-      thrown.reportMissingExceptionWithMessage("Expected IOException since job already exists");
-      storage.create("versionJob", data);
-    } finally {
-      jobs = storage.list();
-      assertEquals(1, jobs.size());
-
-      // Restore our job, check that it exists.
-      JobData outData = storage.read("versionJob");
-      assertEquals(new VersionTool().getToolName(),
-          outData.getSqoopTool().getToolName());
-
-      storage.close();
-    }
-  }
-
-  @Test
-  public void testDeleteJob() throws IOException {
-    Configuration conf = newConf();
-    JobStorageFactory ssf = new JobStorageFactory(conf);
-
-    Map<String, String> descriptor = new TreeMap<String, String>();
-    JobStorage storage = ssf.getJobStorage(descriptor);
-
-    storage.open(descriptor);
-
-    // Job list should start out empty.
-    List<String> jobs = storage.list();
-    assertEquals(0, jobs.size());
-
-    // Create a job that displays the version.
-    JobData data = new JobData(new SqoopOptions(), new VersionTool());
-    storage.create("versionJob", data);
-
-    jobs = storage.list();
-    assertEquals(1, jobs.size());
-    assertEquals("versionJob", jobs.get(0));
-
-    // Now delete the job.
-    storage.delete("versionJob");
-
-    // After delete, we should have no jobs.
-    jobs = storage.list();
-    assertEquals(0, jobs.size());
-
-    storage.close();
-  }
-
-  @Test
-  public void testRestoreNonExistingJob() throws IOException {
-    Configuration conf = newConf();
-    JobStorageFactory ssf = new JobStorageFactory(conf);
-
-    Map<String, String> descriptor = new TreeMap<String, String>();
-    JobStorage storage = ssf.getJobStorage(descriptor);
-
-    storage.open(descriptor);
-
-    try {
-      // Try to restore a job that doesn't exist. Watch it fail.
-      thrown.expect(IOException.class);
-      thrown.reportMissingExceptionWithMessage("Expected IOException since job doesn't exist");
-      storage.read("DoesNotExist");
-    } finally {
-      storage.close();
-    }
-  }
-
-  @Test
-    public void testCreateJobWithExtraArgs() throws IOException {
-        Configuration conf = newConf();
-        JobStorageFactory ssf = new JobStorageFactory(conf);
-
-        Map<String, String> descriptor = new TreeMap<String, String>();
-        JobStorage storage = ssf.getJobStorage(descriptor);
-
-        storage.open(descriptor);
-
-        // Job list should start out empty.
-        List<String> jobs = storage.list();
-        assertEquals(0, jobs.size());
-
-        // Create a job with extra args
-        com.cloudera.sqoop.SqoopOptions opts = new SqoopOptions();
-        String[] args = {"-schema", "test"};
-        opts.setExtraArgs(args);
-        JobData data = new JobData(opts, new VersionTool());
-        storage.create("versionJob", data);
-
-        jobs = storage.list();
-        assertEquals(1, jobs.size());
-        assertEquals("versionJob", jobs.get(0));
-
-        // Restore our job, check that it exists.
-        JobData outData = storage.read("versionJob");
-        assertEquals(new VersionTool().getToolName(),
-                outData.getSqoopTool().getToolName());
-
-        String[] storedArgs = outData.getSqoopOptions().getExtraArgs();
-        for(int index = 0; index < args.length; ++index) {
-            assertEquals(args[index], storedArgs[index]);
-        }
-
-        // Now delete the job.
-        storage.delete("versionJob");
-
-        storage.close();
-    }
-
-  @Test
-  public void testMultiConnections() throws IOException {
-    // Ensure that a job can be retrieved when the storage is
-    // closed and reopened.
-
-    Configuration conf = newConf();
-    JobStorageFactory ssf = new JobStorageFactory(conf);
-
-    Map<String, String> descriptor = new TreeMap<String, String>();
-    JobStorage storage = ssf.getJobStorage(descriptor);
-
-    storage.open(descriptor);
-
-    // Job list should start out empty.
-    List<String> jobs = storage.list();
-    assertEquals(0, jobs.size());
-
-    // Create a job that displays the version.
-    JobData data = new JobData(new SqoopOptions(), new VersionTool());
-    storage.create("versionJob", data);
-
-    jobs = storage.list();
-    assertEquals(1, jobs.size());
-    assertEquals("versionJob", jobs.get(0));
-
-    storage.close(); // Close the existing connection
-
-    // Now re-open the storage.
-    ssf = new JobStorageFactory(newConf());
-    storage = ssf.getJobStorage(descriptor);
-    storage.open(descriptor);
-
-    jobs = storage.list();
-    assertEquals(1, jobs.size());
-    assertEquals("versionJob", jobs.get(0));
-
-    // Restore our job, check that it exists.
-    JobData outData = storage.read("versionJob");
-    assertEquals(new VersionTool().getToolName(),
-        outData.getSqoopTool().getToolName());
-
-    storage.close();
-  }
-
-}
-
diff --git a/src/test/com/cloudera/sqoop/metastore/db2/DB2JobToolTest.java b/src/test/com/cloudera/sqoop/metastore/db2/DB2JobToolTest.java
new file mode 100644
index 0000000..b92d36a
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/db2/DB2JobToolTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.db2;
+
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+
+/**
+ * Test that the Job Tool works in DB2
+ *
+ * This uses JDBC to store and retrieve metastore data from a DB2 server
+ *
+ * Since this requires a DB2 installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=DB2JobToolTest or -Dthirdparty=true.
+ *
+ * You need to put DB2 JDBC driver library (db2jcc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running DB2 database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.db2.connectstring.host_url, -Dsqoop.test.db2.connectstring.database,
+ *   -Dsqoop.test.db2.connectstring.username and -Dsqoop.test.db2.connectstring.password respectively
+ */
+
+public class DB2JobToolTest extends JobToolTestBase {
+
+    private static final String HOST_URL = System.getProperty(
+        "sqoop.test.db2.connectstring.host_url",
+        "jdbc:db2://db2host:50000");
+
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.db2.connectstring.database",
+        "SQOOP");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.db2.connectstring.username",
+        "SQOOP");
+    private static final String DATABASE_PASSWORD = System.getProperty(
+        "sqoop.test.db2.connectstring.password",
+        "SQOOP");
+    private static final String CONNECT_STRING = HOST_URL
+        + "/" + DATABASE_NAME
+        + ":currentSchema=" + DATABASE_USER +";";
+
+    public DB2JobToolTest() {
+        super(CONNECT_STRING, DATABASE_USER, DATABASE_PASSWORD);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/db2/DB2MetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/db2/DB2MetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..c1ae70c
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/db2/DB2MetaConnectIncrementalImportTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.db2;
+
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+
+/**
+ * Test that Incremental-Import values are stored correctly in DB2
+ *
+ * This uses JDBC to store and retrieve metastore data from a DB2 server
+ *
+ * Since this requires a DB2 installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=DB2MetaConnectIncrementalImportTest or -Dthirdparty=true.
+ *
+ * You need to put DB2 JDBC driver library (db2jcc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running DB2 database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.db2.connectstring.host_url, -Dsqoop.test.db2.connectstring.database,
+ *   -Dsqoop.test.db2.connectstring.username and -Dsqoop.test.db2.connectstring.password respectively
+ */
+
+public class DB2MetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    private static final String HOST_URL = System.getProperty(
+        "sqoop.test.db2.connectstring.host_url",
+        "jdbc:db2://db2host:50000");
+
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.db2.connectstring.database",
+        "SQOOP");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.db2.connectstring.username",
+        "SQOOP");
+    private static final String DATABASE_PASSWORD = System.getProperty(
+        "sqoop.test.db2.connectstring.password",
+        "SQOOP");
+    private static final String CONNECT_STRING = HOST_URL
+        + "/" + DATABASE_NAME
+        + ":currentSchema=" + DATABASE_USER +";";
+
+    public DB2MetaConnectIncrementalImportTest() {
+        super(CONNECT_STRING, DATABASE_USER, DATABASE_PASSWORD);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/db2/DB2SavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/db2/DB2SavedJobsTest.java
new file mode 100644
index 0000000..efeef62
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/db2/DB2SavedJobsTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.db2;
+
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+/**
+ * Test of GenericJobStorage compatibility with DB2
+ *
+ * This uses JDBC to store and retrieve metastore data from a DB2 server
+ *
+ * Since this requires a DB2 installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=DB2SavedJobsTest or -Dthirdparty=true.
+ *
+ * You need to put DB2 JDBC driver library (db2jcc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running DB2 database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.db2.connectstring.host_url, -Dsqoop.test.db2.connectstring.database,
+ *   -Dsqoop.test.db2.connectstring.username and -Dsqoop.test.db2.connectstring.password respectively
+ */
+
+public class DB2SavedJobsTest extends SavedJobsTestBase {
+
+    private static final String HOST_URL = System.getProperty(
+        "sqoop.test.db2.connectstring.host_url",
+        "jdbc:db2://db2host:50000");
+
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.db2.connectstring.database",
+        "SQOOP");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.db2.connectstring.username",
+        "SQOOP");
+    private static final String DATABASE_PASSWORD = System.getProperty(
+        "sqoop.test.db2.connectstring.password",
+        "SQOOP");
+    private static final String CONNECT_STRING = HOST_URL
+        + "/" + DATABASE_NAME
+        + ":currentSchema=" + DATABASE_USER +";";
+
+    public DB2SavedJobsTest() {
+        super(CONNECT_STRING, DATABASE_USER, DATABASE_PASSWORD, JdbcDrivers.DB2.getDriverClass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobToolTest.java b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobToolTest.java
new file mode 100644
index 0000000..07eefee
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobToolTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.hsqldb;
+
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+
+/**
+ * Test that the Job Tool works in Hsqldb
+ *
+ * This class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=HsqldbJobToolTest or -Dthirdparty=true.
+ *
+ * This uses JDBC to store and retrieve metastore data from a local Hsqldb server
+ */
+
+public class HsqldbJobToolTest extends JobToolTestBase {
+
+    public HsqldbJobToolTest() {
+        super( "jdbc:hsqldb:mem:sqoopmetastore", "SA" , "");
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..d302bfb
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbMetaConnectIncrementalImportTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.hsqldb;
+
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+
+/**
+ * Test that Incremental-Import values are stored correctly in Hsqldb
+ *
+ * This class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=HsqldbMetaConnectIncrementalImportTest or -Dthirdparty=true.
+ *
+ * This uses JDBC to store and retrieve metastore data from a local Hsqldb server
+ */
+
+public class HsqldbMetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    public HsqldbMetaConnectIncrementalImportTest() {
+        super( "jdbc:hsqldb:mem:sqoopmetastore", "SA" , "");
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbSavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbSavedJobsTest.java
new file mode 100644
index 0000000..398f1a0
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/hsqldb/HsqldbSavedJobsTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.hsqldb;
+
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+/**
+ * Test of GenericJobStorage compatibility with Hsqldb
+ *
+ * This class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=HsqldbSavedJobsTest or -Dthirdparty=true.
+ *
+ * This uses JDBC to store and retrieve metastore data from a local Hsqldb server
+ */
+
+public class HsqldbSavedJobsTest extends SavedJobsTestBase {
+
+    public HsqldbSavedJobsTest() {
+        super("jdbc:hsqldb:mem:sqoopmetastore",
+                "SA" , "", JdbcDrivers.HSQLDB.getDriverClass());
+    }
+}
\ No newline at end of file
diff --git a/src/test/com/cloudera/sqoop/metastore/mysql/MySqlJobToolTest.java b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlJobToolTest.java
new file mode 100644
index 0000000..6a6bae4
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlJobToolTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.mysql;
+
+import com.cloudera.sqoop.manager.MySQLTestUtils;
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+
+/**
+ * Test that the Job Tool works in MySql
+ *
+ * This uses JDBC to store and retrieve metastore data from a MySql server
+ *
+ * Since this requires a MySql installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=MySqlJobToolTest or -Dthirdparty=true.
+ *
+ * You need to put MySql JDBC driver library (mysql-connector-java-5.1.38-bin.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running MySql database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.mysql.connectstring.host_url, -Dsqoop.test.mysql.databasename,
+ *   -Dsqoop.test.mysql.username and -Dsqoop.test.mysql.password respectively
+ */
+
+public class MySqlJobToolTest extends JobToolTestBase {
+
+    private static MySQLTestUtils mySQLTestUtils = new MySQLTestUtils();
+
+    public MySqlJobToolTest() {
+        super(mySQLTestUtils.getMySqlConnectString(), mySQLTestUtils.getUserName(),
+                mySQLTestUtils.getUserPass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/mysql/MySqlMetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlMetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..3a97cfd
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlMetaConnectIncrementalImportTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.mysql;
+
+
+import com.cloudera.sqoop.manager.MySQLTestUtils;
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+
+/**
+ * Test that Incremental-Import values are stored correctly in MySql
+ *
+ * This uses JDBC to store and retrieve metastore data from a MySql server
+ *
+ * Since this requires a DB2 installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=MySqlMetaConnectIncrementalImportTest or -Dthirdparty=true.
+ *
+ * You need to put MySql JDBC driver library (mysql-connector-java-5.1.38-bin.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running MySql database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.mysql.connectstring.host_url, -Dsqoop.test.mysql.databasename,
+ *   -Dsqoop.test.mysql.username and -Dsqoop.test.mysql.password respectively
+ */
+
+public class MySqlMetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    private static MySQLTestUtils mySQLTestUtils = new MySQLTestUtils();
+
+    public MySqlMetaConnectIncrementalImportTest() {
+        super(mySQLTestUtils.getMySqlConnectString(), mySQLTestUtils.getUserName(),
+                mySQLTestUtils.getUserPass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/mysql/MySqlSavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlSavedJobsTest.java
new file mode 100644
index 0000000..febb7da
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/mysql/MySqlSavedJobsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.mysql;
+
+import com.cloudera.sqoop.manager.MySQLTestUtils;
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+/**
+ * Test of GenericJobStorage compatibility with MySql
+ *
+ * This uses JDBC to store and retrieve metastore data from a MySql server
+ *
+ * Since this requires a MySql installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=MySqlSavedJobsTest or -Dthirdparty=true.
+ *
+ * You need to put MySql JDBC driver library (mysql-connector-java-5.1.38-bin.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running MySql database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.mysql.connectstring.host_url, -Dsqoop.test.mysql.databasename,
+ *   -Dsqoop.test.mysql.username and -Dsqoop.test.mysql.password respectively
+ */
+
+public class MySqlSavedJobsTest extends SavedJobsTestBase {
+
+    private static MySQLTestUtils mySQLTestUtils = new MySQLTestUtils();
+
+    public MySqlSavedJobsTest() {
+        super(mySQLTestUtils.getMySqlConnectString(), mySQLTestUtils.getUserName(),
+                mySQLTestUtils.getUserPass(), JdbcDrivers.MYSQL.getDriverClass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/oracle/OracleJobToolTest.java b/src/test/com/cloudera/sqoop/metastore/oracle/OracleJobToolTest.java
new file mode 100644
index 0000000..4891b00
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/oracle/OracleJobToolTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.oracle;
+
+import com.cloudera.sqoop.manager.OracleUtils;
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+
+/**
+ * Test that the Job Tool works in Oracle
+ *
+ * This uses JDBC to store and retrieve metastore data from an Oracle server
+ *
+ * Since this requires an Oracle installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=OracleJobToolTest or -Dthirdparty=true.
+ *
+ * You need to put Oracle JDBC driver library (ojdbc6.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running Oracle database,
+ *   Set server URL, username, and password with system variables
+ *   -Dsqoop.test.oracle.connectstring, -Dsqoop.test.oracle.username
+ *   and -Dsqoop.test.oracle.password respectively
+ */
+
+public class OracleJobToolTest extends JobToolTestBase {
+
+    public OracleJobToolTest() {
+        super(OracleUtils.CONNECT_STRING,
+                OracleUtils.ORACLE_USER_NAME,
+                OracleUtils.ORACLE_USER_PASS);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/oracle/OracleMetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/oracle/OracleMetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..f916a13
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/oracle/OracleMetaConnectIncrementalImportTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.oracle;
+
+import com.cloudera.sqoop.manager.OracleUtils;
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+
+/**
+ * Test that Incremental-Import values are stored correctly in Oracle
+ *
+ * This uses JDBC to store and retrieve metastore data from an Oracle server
+ *
+ * Since this requires an Oracle installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=OracleMetaConnectIncrementalImportTest or -Dthirdparty=true.
+ *
+ * You need to put Oracle JDBC driver library (ojdbc6.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running Oracle database,
+ *   Set server URL, username, and password with system variables
+ *   -Dsqoop.test.oracle.connectstring, -Dsqoop.test.oracle.username
+ *   and -Dsqoop.test.oracle.password respectively
+ */
+
+public class OracleMetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    public OracleMetaConnectIncrementalImportTest() {
+        super(OracleUtils.CONNECT_STRING,
+                OracleUtils.ORACLE_USER_NAME,
+                OracleUtils.ORACLE_USER_PASS);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/oracle/OracleSavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/oracle/OracleSavedJobsTest.java
new file mode 100644
index 0000000..0f487d1
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/oracle/OracleSavedJobsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.oracle;
+
+import com.cloudera.sqoop.manager.OracleUtils;
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+/**
+ * Test of GenericJobStorage compatibility with Oracle
+ *
+ * This uses JDBC to store and retrieve metastore data from an Oracle server
+ *
+ * Since this requires an Oracle installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=OracleSavedJobsTest or -Dthirdparty=true.
+ *
+ * You need to put Oracle JDBC driver library (ojdbc6.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running Oracle database,
+ *   Set server URL, username, and password with system variables
+ *   -Dsqoop.test.oracle.connectstring, -Dsqoop.test.oracle.username
+ *   and -Dsqoop.test.oracle.password respectively
+ */
+
+public class OracleSavedJobsTest extends SavedJobsTestBase {
+
+    public OracleSavedJobsTest() {
+        super(OracleUtils.CONNECT_STRING,
+                OracleUtils.ORACLE_USER_NAME,
+                OracleUtils.ORACLE_USER_PASS,
+                JdbcDrivers.ORACLE.getDriverClass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/postgres/PostgresJobToolTest.java b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresJobToolTest.java
new file mode 100644
index 0000000..b596fc8
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresJobToolTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.postgres;
+
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+
+/**
+ * Test that the Job Tool works in PostgreSQL
+ *
+ * This uses JDBC to store and retrieve metastore data from a Postgres server
+ *
+ * Since this requires a Postgres installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=PostgresJobToolTest or -Dthirdparty=true.
+ *
+ *   Once you have a running Postgres database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.postgresql.connectstring.host_url, -Dsqoop.test.postgresql.database,
+ *   -Dsqoop.test.postgresql.username and -Dsqoop.test.postgresql.password respectively
+ */
+
+public class PostgresJobToolTest extends JobToolTestBase {
+
+    private static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+        "jdbc:postgresql://localhost/");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.postgresql.username", "sqooptest");
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.postgresql.database", "sqooptest");
+    private static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password");
+    private static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+    public PostgresJobToolTest() {
+        super(CONNECT_STRING, DATABASE_USER, PASSWORD);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/postgres/PostgresMetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresMetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..21f4938
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresMetaConnectIncrementalImportTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.postgres;
+
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+
+/**
+ * Test that Incremental-Import values are stored correctly in PostgreSQL
+ *
+ * This uses JDBC to store and retrieve metastore data from a Postgres server
+ *
+ * Since this requires a Postgres installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=PostgresMetaConnectIncrementalImportTest or -Dthirdparty=true.
+ *
+ *   Once you have a running Postgres database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.postgresql.connectstring.host_url, -Dsqoop.test.postgresql.database,
+ *   -Dsqoop.test.postgresql.username and -Dsqoop.test.postgresql.password respectively
+ */
+
+public class PostgresMetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    private static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+        "jdbc:postgresql://localhost/");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.postgresql.username", "sqooptest");
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.postgresql.database", "sqooptest");
+    private static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password");
+    private static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+    public PostgresMetaConnectIncrementalImportTest() {
+        super(CONNECT_STRING, DATABASE_USER, PASSWORD);
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/postgres/PostgresSavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresSavedJobsTest.java
new file mode 100644
index 0000000..ed06cb2
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/postgres/PostgresSavedJobsTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.postgres;
+
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+/**
+ * Test of GenericJobStorage compatibility with PostgreSQL
+ *
+ * This uses JDBC to store and retrieve metastore data from a Postgres server
+ *
+ * Since this requires a Postgres installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=PostgresSavedJobsTest or -Dthirdparty=true.
+ *
+ *   Once you have a running Postgres database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.postgresql.connectstring.host_url, -Dsqoop.test.postgresql.database,
+ *   -Dsqoop.test.postgresql.username and -Dsqoop.test.postgresql.password respectively
+ */
+
+public class PostgresSavedJobsTest extends SavedJobsTestBase {
+
+    private static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+        "jdbc:postgresql://localhost/");
+    private static final String DATABASE_USER = System.getProperty(
+        "sqoop.test.postgresql.username", "sqooptest");
+    private static final String DATABASE_NAME = System.getProperty(
+        "sqoop.test.postgresql.database", "sqooptest");
+    private static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password");
+    private static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+    public PostgresSavedJobsTest() {
+        super(CONNECT_STRING, DATABASE_USER, PASSWORD, JdbcDrivers.POSTGRES.getDriverClass());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerJobToolTest.java b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerJobToolTest.java
new file mode 100644
index 0000000..e3f8bde
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerJobToolTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.sqlserver;
+
+import com.cloudera.sqoop.metastore.JobToolTestBase;
+import org.apache.sqoop.manager.sqlserver.MSSQLTestUtils;
+
+/**
+ * Test that the Job Tool works in SQLServer
+ *
+ * This uses JDBC to store and retrieve metastore data from an SQLServer
+ *
+ * Since this requires an SQLServer installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=SqlServerJobToolTest or -Dthirdparty=true.
+ *
+ * You need to put SQL Server JDBC driver library (sqljdbc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running SQLServer database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.sqlserver.connectstring.host_url, -Dsqoop.test.sqlserver.database,
+ *   -Dms.sqlserver.username and -Dms.sqlserver.password respectively
+ */
+
+public class SqlServerJobToolTest extends JobToolTestBase {
+
+    private static MSSQLTestUtils msSQLTestUtils = new MSSQLTestUtils();
+
+    public SqlServerJobToolTest() {
+        super(msSQLTestUtils.getDBConnectString(),
+                msSQLTestUtils.getDBUserName(),
+                msSQLTestUtils.getDBPassWord());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerMetaConnectIncrementalImportTest.java b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerMetaConnectIncrementalImportTest.java
new file mode 100644
index 0000000..3c8ac5f
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerMetaConnectIncrementalImportTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.sqlserver;
+
+import com.cloudera.sqoop.metastore.MetaConnectIncrementalImportTestBase;
+import org.apache.sqoop.manager.sqlserver.MSSQLTestUtils;
+
+/**
+ * Test that Incremental-Import values are stored correctly in SQLServer
+ *
+ * This uses JDBC to store and retrieve metastore data from an SQLServer
+ *
+ * Since this requires an SQLServer installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=SqlServerJobToolTest or -Dthirdparty=true.
+ *
+ * You need to put SQL Server JDBC driver library (sqljdbc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running SQLServer database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.sqlserver.connectstring.host_url, -Dsqoop.test.sqlserver.database,
+ *   -Dms.sqlserver.username and -Dms.sqlserver.password respectively
+ */
+
+public class SqlServerMetaConnectIncrementalImportTest extends MetaConnectIncrementalImportTestBase {
+
+    private static MSSQLTestUtils msSQLTestUtils = new MSSQLTestUtils();
+
+    public SqlServerMetaConnectIncrementalImportTest() {
+        super(msSQLTestUtils.getDBConnectString(),
+                msSQLTestUtils.getDBUserName(),
+                msSQLTestUtils.getDBPassWord());
+    }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerSavedJobsTest.java b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerSavedJobsTest.java
new file mode 100644
index 0000000..5589f14
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/sqlserver/SqlServerSavedJobsTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore.sqlserver;
+
+import com.cloudera.sqoop.metastore.SavedJobsTestBase;
+import org.apache.sqoop.manager.JdbcDrivers;
+import org.apache.sqoop.manager.sqlserver.MSSQLTestUtils;
+
+/**
+ * Test of GenericJobStorage compatibility with SQLServer
+ *
+ * This uses JDBC to store and retrieve metastore data from an SQLServer
+ *
+ * Since this requires an SQLServer installation,
+ * this class is named in such a way that Sqoop's default QA process does
+ * not run it. You need to run this manually with
+ * -Dtestcase=SqlServerJobToolTest or -Dthirdparty=true.
+ *
+ * You need to put SQL Server JDBC driver library (sqljdbc4.jar) in a location
+ * where Sqoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons) and set it's path through -Dsqoop.thirdparty.lib.dir.
+ *
+ *   Once you have a running SQLServer database,
+ *   Set server URL, database name, username, and password with system variables
+ *   -Dsqoop.test.sqlserver.connectstring.host_url, -Dsqoop.test.sqlserver.database,
+ *   -Dms.sqlserver.username and -Dms.sqlserver.password respectively
+ */
+
+public class SqlServerSavedJobsTest extends SavedJobsTestBase {
+
+    private static MSSQLTestUtils msSQLTestUtils = new MSSQLTestUtils();
+
+    public SqlServerSavedJobsTest() {
+        super(msSQLTestUtils.getDBConnectString(),
+                msSQLTestUtils.getDBUserName(),
+                msSQLTestUtils.getDBPassWord(),
+                JdbcDrivers.SQLSERVER.getDriverClass());
+    }
+}
diff --git a/src/test/findbugsExcludeFile.xml b/src/test/findbugsExcludeFile.xml
index a27ec37..8aa4ed5 100644
--- a/src/test/findbugsExcludeFile.xml
+++ b/src/test/findbugsExcludeFile.xml
@@ -47,7 +47,7 @@
     <!-- createRootTable() allows a user-specified table name retrieved
          from properties. This since instance is allowed for now.
     -->
-    <Class name="com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage" />
+    <Class name="com.cloudera.sqoop.metastore.GenericJobStorage" />
     <Method name="createRootTable" />
     <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
   </Match>