diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 10de69a..d18141c 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.accumulo.AccumuloConstants;
 import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
-import org.apache.sqoop.metastore.GenericJobStorage;
 import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
@@ -1093,25 +1092,6 @@
 
     // set escape column mapping to true
     this.escapeColumnMappingEnabled = true;
-
-    this.metaConnectStr =
-            System.getProperty(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY, getLocalAutoConnectString());
-    this.metaUsername =
-            System.getProperty(GenericJobStorage.AUTO_STORAGE_USER_KEY, GenericJobStorage.DEFAULT_AUTO_USER);
-    this.metaPassword =
-            System.getProperty(GenericJobStorage.AUTO_STORAGE_PASS_KEY, GenericJobStorage.DEFAULT_AUTO_PASSWORD);
-  }
-
-  private String getLocalAutoConnectString() {
-    String homeDir = System.getProperty("user.home");
-
-    File homeDirObj = new File(homeDir);
-    File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
-    File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
-
-    String dbFileStr = databaseFileObj.toString();
-    return "jdbc:hsqldb:file:" + dbFileStr
-            + ";hsqldb.write_delay=false;shutdown=true";
   }
 
   /**
@@ -2826,12 +2806,26 @@
   public String getMetaConnectStr() {
     return metaConnectStr;
   }
+
+  public void setMetaConnectStr(String metaConnectStr) {
+    this.metaConnectStr = metaConnectStr;
+  }
+
   public String getMetaUsername() {
     return metaUsername;
   }
 
+  public void setMetaUsername(String metaUsername) {
+    this.metaUsername = metaUsername;
+  }
+
   public String getMetaPassword() {
     return metaPassword;
   }
+
+  public void setMetaPassword(String metaPassword) {
+    this.metaPassword = metaPassword;
+  }
+
 }
 
diff --git a/src/java/org/apache/sqoop/metastore/AutoGenericJobStorage.java b/src/java/org/apache/sqoop/metastore/AutoGenericJobStorage.java
new file mode 100644
index 0000000..a565850
--- /dev/null
+++ b/src/java/org/apache/sqoop/metastore/AutoGenericJobStorage.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import java.io.File;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class AutoGenericJobStorage extends GenericJobStorage {
+
+  public static final Log LOG = LogFactory.getLog(AutoGenericJobStorage.class.getName());
+
+  /**
+   * Configuration key specifying whether this storage agent is active.
+   * Defaults to "on" to allow zero-conf local users.
+   */
+  public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
+      "sqoop.metastore.client.enable.autoconnect";
+
+  /**
+   * Configuration key specifying the connect string used by this
+   * storage agent.
+   */
+  public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
+      "sqoop.metastore.client.autoconnect.url";
+
+  /**
+   * Configuration key specifying the username to bind with.
+   */
+  public static final String AUTO_STORAGE_USER_KEY =
+      "sqoop.metastore.client.autoconnect.username";
+
+
+  /** HSQLDB default user is named 'SA'. */
+  private static final String DEFAULT_AUTO_USER = "SA";
+
+  /**
+   * Configuration key specifying the password to bind with.
+   */
+  public static final String AUTO_STORAGE_PASS_KEY =
+      "sqoop.metastore.client.autoconnect.password";
+
+  /** HSQLDB default user has an empty password. */
+  public static final String DEFAULT_AUTO_PASSWORD = "";
+
+  @Override
+  /** {@inheritDoc} */
+  public boolean canAccept(Map<String, String> descriptor) {
+    Configuration conf = this.getConf();
+    return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
+  }
+
+  /**
+   * Determine the user's home directory and return a connect
+   * string to HSQLDB that uses ~/.sqoop/ as the storage location
+   * for the metastore database.
+   */
+  private String getHomeDirFileConnectStr() {
+    String homeDir = System.getProperty("user.home");
+
+    File homeDirObj = new File(homeDir);
+    File sqoopDataDirObj = new File(homeDirObj, ".sqoop");
+    File databaseFileObj = new File(sqoopDataDirObj, "metastore.db");
+
+    String dbFileStr = databaseFileObj.toString();
+    return "jdbc:hsqldb:file:" + dbFileStr
+        + ";hsqldb.write_delay=false;shutdown=true";
+  }
+
+  @Override
+  protected void setConnectionParameters(Map<String, String> descriptor) {
+    Configuration conf = getConf();
+    setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY, getHomeDirFileConnectStr()));
+    setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
+    setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, DEFAULT_AUTO_PASSWORD));
+  }
+}
diff --git a/src/java/org/apache/sqoop/metastore/GenericJobStorage.java b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java
index e4ffde2..4117d7a 100644
--- a/src/java/org/apache/sqoop/metastore/GenericJobStorage.java
+++ b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java
@@ -25,9 +25,11 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import com.cloudera.sqoop.manager.ConnManager;
 import org.apache.commons.logging.Log;
@@ -40,6 +42,15 @@
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.tool.SqoopTool;
 import org.apache.sqoop.manager.DefaultManagerFactory;
+import org.apache.sqoop.manager.JdbcDrivers;
+
+import static org.apache.commons.lang3.StringUtils.startsWith;
+import static org.apache.sqoop.manager.JdbcDrivers.DB2;
+import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB;
+import static org.apache.sqoop.manager.JdbcDrivers.MYSQL;
+import static org.apache.sqoop.manager.JdbcDrivers.ORACLE;
+import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES;
+import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;
 
 /**
  * JobStorage implementation that uses a database to
@@ -63,9 +74,6 @@
    */
   public static final String META_PASSWORD_KEY = "metastore.password";
 
-  /** descriptor key identifying the class name of the jdbc driver */
-  public static final String META_DRIVER_KEY = "metastore.driver.class";
-
   /** Default name for the root metadata table. */
   private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT";
 
@@ -119,49 +127,16 @@
   private static final String PROPERTY_CLASS_CONFIG = "config";
 
   /**
-   * Configuration key specifying whether this storage agent is active.
-   * Defaults to "on" to allow zero-conf local users.
-   */
-  public static final String AUTO_STORAGE_IS_ACTIVE_KEY =
-          "sqoop.metastore.client.enable.autoconnect";
-
-  /**
-   * Configuration key specifying the connect string used by this
-   * storage agent.
-   */
-  public static final String AUTO_STORAGE_CONNECT_STRING_KEY =
-          "sqoop.metastore.client.autoconnect.url";
-
-  /**
-   * Configuration key specifying the username to bind with.
-   */
-  public static final String AUTO_STORAGE_USER_KEY =
-          "sqoop.metastore.client.autoconnect.username";
-
-
-  /** HSQLDB default user is named 'SA'. */
-  public static final String DEFAULT_AUTO_USER = "SA";
-
-  /**
-   * Configuration key specifying the password to bind with.
-   */
-  public static final String AUTO_STORAGE_PASS_KEY =
-          "sqoop.metastore.client.autoconnect.password";
-
-  /** HSQLDB default user has an empty password. */
-  public static final String DEFAULT_AUTO_PASSWORD = "";
-
-  /**
    * Per-job key with propClass 'schema' that specifies the SqoopTool
    * to load.
    */
   private static final String SQOOP_TOOL_KEY = "sqoop.tool";
+  private static final Set<JdbcDrivers> SUPPORTED_DRIVERS = EnumSet.of(HSQLDB, MYSQL, ORACLE, POSTGRES, DB2, SQLSERVER);
   private Map<String, String> connectedDescriptor;
   private String metastoreConnectStr;
   private String metastoreUser;
   private String metastorePassword;
   private Connection connection;
-  private String driverClass;
   private ConnManager connManager;
 
   protected Connection getConnection() {
@@ -184,9 +159,6 @@
     this.metastorePassword = pass;
   }
 
-  protected void setDriverClass(String driverClass) {
-    this.driverClass = driverClass;
-  }
   /**
    * Set the descriptor used to open() this storage.
    */
@@ -199,15 +171,27 @@
    * Initialize the connection to the database.
    */
   public void open(Map<String, String> descriptor) throws IOException {
-    setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
-    setMetastoreUser(descriptor.get(META_USERNAME_KEY));
-    setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
-    setDriverClass(descriptor.get(META_DRIVER_KEY));
+    setConnectionParameters(descriptor);
+    validateMetastoreConnectionString(metastoreConnectStr);
     setConnectedDescriptor(descriptor);
 
     init();
   }
 
+  protected void validateMetastoreConnectionString(String metastoreConnectStr) {
+    if (!isDbSupported(metastoreConnectStr)) {
+      String errorMessage = metastoreConnectStr + " is an invalid connection string or the required RDBMS is not supported." +
+          "Supported RDBMSs are: " + SUPPORTED_DRIVERS.toString();
+      throw new RuntimeException(errorMessage);
+    }
+  }
+
+  protected void setConnectionParameters(Map<String, String> descriptor) {
+    setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
+    setMetastoreUser(descriptor.get(META_USERNAME_KEY));
+    setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
+  }
+
   protected void init() throws IOException {
     try {
       connManager = createConnManager();
@@ -263,8 +247,10 @@
   @Override
   public void close() throws IOException {
       try {
-          LOG.debug("Closing connection manager");
-          connManager.close();
+          if (connManager != null) {
+            LOG.debug("Closing connection manager");
+            connManager.close();
+          }
       } catch (SQLException sqlE) {
           throw new IOException("Exception closing connection manager", sqlE);
       } finally {
@@ -275,12 +261,7 @@
   @Override
   /** {@inheritDoc} */
   public boolean canAccept(Map<String, String> descriptor) {
-    // We return true if the desciptor contains a connect string to find
-    // the database or auto-connect is enabled
-    Configuration conf = this.getConf();
-    boolean metaConnectTrue = descriptor.get(META_CONNECT_KEY) != null;
-    boolean autoConnectEnabled = conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
-    return metaConnectTrue || autoConnectEnabled;
+    return descriptor.get(META_CONNECT_KEY) != null;
   }
 
   @Override
@@ -854,5 +835,13 @@
     return dmf.accept(jd);
   }
 
-}
+  protected boolean isDbSupported(String metaConnectString) {
+    for (JdbcDrivers driver : SUPPORTED_DRIVERS) {
+      if (startsWith(metaConnectString, driver.getSchemePrefix())) {
+        return true;
+      }
+    }
+    return false;
+  }
 
+}
diff --git a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
index 9a348d5..2163f2c 100644
--- a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
+++ b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java
@@ -42,7 +42,8 @@
 
   /** The default list of available JobStorage implementations. */
   private static final String DEFAULT_AVAILABLE_STORAGES =
-      "com.cloudera.sqoop.metastore.GenericJobStorage";
+      "com.cloudera.sqoop.metastore.GenericJobStorage,"
+      + "org.apache.sqoop.metastore.AutoGenericJobStorage";
 
   public JobStorageFactory(Configuration config) {
     this.conf = config;
diff --git a/src/java/org/apache/sqoop/tool/JobTool.java b/src/java/org/apache/sqoop/tool/JobTool.java
index 5b95c7d..72234ba 100644
--- a/src/java/org/apache/sqoop/tool/JobTool.java
+++ b/src/java/org/apache/sqoop/tool/JobTool.java
@@ -18,21 +18,16 @@
 
 package org.apache.sqoop.tool;
 
-import static org.apache.sqoop.manager.JdbcDrivers.DB2;
-import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB;
-import static org.apache.sqoop.manager.JdbcDrivers.MYSQL;
-import static org.apache.sqoop.manager.JdbcDrivers.ORACLE;
-import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES;
-import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;
+import static com.cloudera.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY;
+import static com.cloudera.sqoop.metastore.GenericJobStorage.META_PASSWORD_KEY;
+import static com.cloudera.sqoop.metastore.GenericJobStorage.META_USERNAME_KEY;
 
 import java.io.IOException;
 
 import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.cli.CommandLine;
@@ -47,11 +42,9 @@
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
 import com.cloudera.sqoop.cli.ToolOptions;
-import com.cloudera.sqoop.metastore.GenericJobStorage;
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
-import org.apache.sqoop.manager.JdbcDrivers;
 import org.apache.sqoop.metastore.PasswordRedactor;
 import org.apache.sqoop.util.LoggingUtils;
 
@@ -64,8 +57,6 @@
       JobTool.class.getName());
   private static final String DASH_STR  = "--";
 
-  private static Set<JdbcDrivers> SUPPORTED_DRIVERS = EnumSet.of(HSQLDB, MYSQL, ORACLE, POSTGRES, DB2, SQLSERVER);
-
   private enum JobOp {
     JobCreate,
     JobDelete,
@@ -356,9 +347,8 @@
       throw new InvalidOptionsException("");
     }
 
-    this.storageDescriptor = new TreeMap<String, String>();
-
     applyMetastoreOptions(in, out);
+
     // These are generated via an option group; exactly one
     // of this exhaustive list will always be selected.
     if (in.hasOption(JOB_CMD_CREATE_ARG)) {
@@ -376,46 +366,30 @@
       this.operation = JobOp.JobShow;
       this.jobName = in.getOptionValue(JOB_CMD_SHOW_ARG);
     }
+
+    initializeStorageDescriptor(out);
+  }
+
+  private void initializeStorageDescriptor(SqoopOptions options) throws InvalidOptionsException {
+    storageDescriptor = new TreeMap<>();
+
+    storageDescriptor.put(META_CONNECT_KEY, options.getMetaConnectStr());
+    storageDescriptor.put(META_USERNAME_KEY, options.getMetaUsername());
+    storageDescriptor.put(META_PASSWORD_KEY, options.getMetaPassword());
   }
 
   private void applyMetastoreOptions(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
-    String metaConnectString;
-    String metaUsernameString;
-    String metaPasswordString;
     if (in.hasOption(STORAGE_METASTORE_ARG)) {
-      metaConnectString = in.getOptionValue(STORAGE_METASTORE_ARG);
-      this.storageDescriptor.put(GenericJobStorage.META_DRIVER_KEY, chooseDriverType(metaConnectString));
-      this.storageDescriptor.put(GenericJobStorage.META_CONNECT_KEY, metaConnectString);
-    } else {
-      metaConnectString = out.getMetaConnectStr();
-      this.storageDescriptor.put(GenericJobStorage.META_DRIVER_KEY, chooseDriverType(metaConnectString));
-      this.storageDescriptor.put(GenericJobStorage.META_CONNECT_KEY, metaConnectString);
+      out.setMetaConnectStr(in.getOptionValue(STORAGE_METASTORE_ARG));
     }
     if (in.hasOption(METASTORE_USER_ARG)) {
-      metaUsernameString = in.getOptionValue(METASTORE_USER_ARG);
-      this.storageDescriptor.put(GenericJobStorage.META_USERNAME_KEY, metaUsernameString);
-    } else {
-      metaUsernameString = out.getMetaUsername();
-      this.storageDescriptor.put(GenericJobStorage.META_USERNAME_KEY, metaUsernameString);
+      out.setMetaUsername(in.getOptionValue(METASTORE_USER_ARG));
     }
     if (in.hasOption(METASTORE_PASS_ARG)) {
-      metaPasswordString = in.getOptionValue(METASTORE_PASS_ARG);
-      this.storageDescriptor.put(GenericJobStorage.META_PASSWORD_KEY, metaPasswordString);
-    } else {
-      metaPasswordString = out.getMetaPassword();
-      this.storageDescriptor.put(GenericJobStorage.META_PASSWORD_KEY, metaPasswordString);
+      out.setMetaPassword(in.getOptionValue(METASTORE_PASS_ARG));
     }
   }
 
-  private String chooseDriverType(String metaConnectString) throws InvalidOptionsException {
-    for (JdbcDrivers driver : SUPPORTED_DRIVERS) {
-      if (metaConnectString.startsWith(driver.getSchemePrefix())) {
-        return driver.getDriverClass();
-      }
-    }
-    throw new InvalidOptionsException("current meta-connect scheme not compatible with metastore");
-  }
-
   @Override
   /** {@inheritDoc} */
   public void validateOptions(SqoopOptions options)
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 166792b..1faa52b 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -48,7 +48,7 @@
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.tool.JobTool;
-import org.apache.sqoop.metastore.GenericJobStorage;
+import org.apache.sqoop.metastore.AutoGenericJobStorage;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,11 +80,6 @@
 
   @Before
   public void setUp() throws Exception {
-    // Delete db state between tests
-    System.setProperty(GenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
-    System.setProperty(GenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
-    System.setProperty(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
-            SOURCE_DB_URL);
     resetSourceDataSchema();
   }
 
@@ -98,9 +93,9 @@
 
   public static Configuration newConf() {
     Configuration conf = new Configuration();
-    conf.set(GenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
-    conf.set(GenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
-    conf.set(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME);
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD);
+    conf.set(AutoGenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY,
             SOURCE_DB_URL);
     return conf;
   }
diff --git a/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java b/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java
index 81789e7..3a414ea 100644
--- a/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java
+++ b/src/test/com/cloudera/sqoop/metastore/SavedJobsTestBase.java
@@ -19,7 +19,6 @@
 package com.cloudera.sqoop.metastore;
 
 import static org.apache.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY;
-import static org.apache.sqoop.metastore.GenericJobStorage.META_DRIVER_KEY;
 import static org.apache.sqoop.metastore.GenericJobStorage.META_PASSWORD_KEY;
 import static org.apache.sqoop.metastore.GenericJobStorage.META_USERNAME_KEY;
 
@@ -91,7 +90,6 @@
     descriptor.put(META_CONNECT_KEY, metaConnect);
     descriptor.put(META_USERNAME_KEY, metaUser);
     descriptor.put(META_PASSWORD_KEY, metaPassword);
-    descriptor.put(META_DRIVER_KEY, driverClass);
 
     JobStorageFactory ssf = new JobStorageFactory(conf);
     storage = ssf.getJobStorage(descriptor);
@@ -144,7 +142,6 @@
     conf.set(META_CONNECT_KEY, metaConnect);
     conf.set(META_USERNAME_KEY, metaUser);
     conf.set(META_PASSWORD_KEY, metaPassword);
-    conf.set(META_DRIVER_KEY, driverClass);
 
     return conf;
   }
diff --git a/src/test/com/cloudera/sqoop/metastore/TestMetastoreConfigurationParameters.java b/src/test/com/cloudera/sqoop/metastore/TestMetastoreConfigurationParameters.java
new file mode 100644
index 0000000..a485b9a
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/metastore/TestMetastoreConfigurationParameters.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.metastore;
+
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import org.apache.sqoop.Sqoop;
+import org.apache.sqoop.testutil.Argument;
+import org.apache.sqoop.tool.JobTool;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static org.apache.sqoop.testutil.Argument.from;
+import static org.apache.sqoop.testutil.Argument.fromPair;
+import static org.apache.sqoop.testutil.ArgumentUtils.createArgumentArray;
+import static org.apache.sqoop.testutil.ArgumentUtils.createArgumentArrayFromProperties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestMetastoreConfigurationParameters {
+
+    private static final int STATUS_FAILURE = 1;
+    private static final int STATUS_SUCCESS = 0;
+    private static final String TEST_USER = "sqoop";
+    private static final String TEST_PASSWORD = "sqoop";
+    private static final String DEFAULT_HSQLDB_USER = "SA";
+    private static final String NON_DEFAULT_PASSWORD = "NOT_DEFAULT";
+    private static HsqldbTestServer testHsqldbServer;
+
+    private Sqoop sqoop;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        testHsqldbServer = new HsqldbTestServer();
+        testHsqldbServer.start();
+        setupUsersForTesting();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        testHsqldbServer.stop();
+    }
+
+    @Before
+    public void before() {
+        sqoop = new Sqoop(new JobTool());
+    }
+
+    @Test
+    public void testJobToolWithAutoConnectDisabledFails() throws IOException {
+        Argument autoConnectProperty = fromPair("sqoop.metastore.client.enable.autoconnect", "false");
+        String[] arguments = createArgumentArrayFromProperties(singleton(autoConnectProperty));
+        assertEquals(STATUS_FAILURE, Sqoop.runSqoop(sqoop, arguments));
+    }
+
+    @Test
+    public void testJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecifiedSuccessfullyRuns() {
+        int status = runJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecified();
+        assertEquals(STATUS_SUCCESS, status);
+    }
+
+    @Test
+    public void testJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecifiedInitializesSpecifiedDatabase() throws SQLException {
+        runJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecified();
+        verifyMetastoreIsInitialized();
+    }
+
+    private int runJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecified() {
+        Argument url = fromPair("sqoop.metastore.client.autoconnect.url", HsqldbTestServer.getUrl());
+        Argument user = fromPair("sqoop.metastore.client.autoconnect.username", TEST_USER);
+        Argument password = fromPair("sqoop.metastore.client.autoconnect.password", TEST_PASSWORD);
+        Argument listJob = from("list");
+
+        Iterable<Argument> properties = asList(url, user, password);
+        Iterable<Argument> options = singleton(listJob);
+
+        String[] arguments = createArgumentArray(properties, options);
+        return Sqoop.runSqoop(sqoop, arguments);
+    }
+
+    private static void setupUsersForTesting() throws SQLException {
+        // We create a new user and change the password of SA to make sure that Sqoop does not connect to metastore with the default user and password.
+        testHsqldbServer.createNewUser(TEST_USER, TEST_PASSWORD);
+        testHsqldbServer.changePasswordForUser(DEFAULT_HSQLDB_USER, NON_DEFAULT_PASSWORD);
+    }
+
+    private void verifyMetastoreIsInitialized() throws SQLException {
+        try (Connection connection = testHsqldbServer.getConnection(TEST_USER, TEST_PASSWORD); Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery("SELECT * FROM SQOOP_ROOT");
+            assertTrue(resultSet.next());
+        }
+    }
+
+}
diff --git a/src/test/com/cloudera/sqoop/testutil/HsqldbTestServer.java b/src/test/com/cloudera/sqoop/testutil/HsqldbTestServer.java
index 8d0a30d..ad68b61 100644
--- a/src/test/com/cloudera/sqoop/testutil/HsqldbTestServer.java
+++ b/src/test/com/cloudera/sqoop/testutil/HsqldbTestServer.java
@@ -113,7 +113,19 @@
     }
   }
 
+  public void stop() {
+    if (null == server) {
+      return;
+    }
+    server.stop();
+    server = null;
+  }
+
   public Connection getConnection() throws SQLException {
+    return getConnection(null, null);
+  }
+
+  public Connection getConnection(String user, String password) throws SQLException {
     try {
       Class.forName(DRIVER_CLASS);
     } catch (ClassNotFoundException cnfe) {
@@ -122,7 +134,7 @@
       return null;
     }
 
-    Connection connection = DriverManager.getConnection(DB_URL);
+    Connection connection = DriverManager.getConnection(DB_URL, user, password);
     connection.setAutoCommit(false);
     return connection;
   }
@@ -267,5 +279,16 @@
     return new HsqldbManager(getSqoopOptions());
   }
 
+  public void createNewUser(String username, String password) throws SQLException {
+    try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+      statement.executeUpdate(String.format("CREATE USER %s PASSWORD %s ADMIN", username, password));
+    }
+  }
+
+  public void changePasswordForUser(String username, String newPassword) throws SQLException {
+    try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+      statement.executeUpdate(String.format("ALTER USER %s SET PASSWORD %s", username, newPassword));
+    }
+  }
 
 }
diff --git a/src/test/org/apache/sqoop/metastore/TestAutoGenericJobStorage.java b/src/test/org/apache/sqoop/metastore/TestAutoGenericJobStorage.java
new file mode 100644
index 0000000..d5424c6
--- /dev/null
+++ b/src/test/org/apache/sqoop/metastore/TestAutoGenericJobStorage.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.sqoop.metastore.AutoGenericJobStorage.AUTO_STORAGE_IS_ACTIVE_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestAutoGenericJobStorage {
+
+  private AutoGenericJobStorage jobStorage;
+
+  private Configuration jobStorageConfiguration;
+
+  private Map<String, String> descriptor;
+
+  @Before
+  public void before() {
+    jobStorage = new AutoGenericJobStorage();
+    jobStorageConfiguration = new Configuration();
+    descriptor = new HashMap<>();
+
+    jobStorage.setConf(jobStorageConfiguration);
+  }
+
+  @Test
+  public void testCanAcceptWithAutoStorageDisabledReturnsFalse() {
+    jobStorageConfiguration.setBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, false);
+    assertFalse(jobStorage.canAccept(descriptor));
+  }
+
+  @Test
+  public void testCanAcceptWithAutoStorageEnabledReturnsTrue() {
+    jobStorageConfiguration.setBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true);
+    assertTrue(jobStorage.canAccept(descriptor));
+  }
+
+  @Test
+  public void testCanAcceptWithAutoStorageDefaultValueReturnsTrue() {
+    assertTrue(jobStorage.canAccept(descriptor));
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/metastore/TestGenericJobStorage.java b/src/test/org/apache/sqoop/metastore/TestGenericJobStorage.java
new file mode 100644
index 0000000..026fbde
--- /dev/null
+++ b/src/test/org/apache/sqoop/metastore/TestGenericJobStorage.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestGenericJobStorage {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private GenericJobStorage jobStorage;
+
+  private Map<String, String> descriptor;
+
+  @Before
+  public void before() {
+    jobStorage = new GenericJobStorage();
+    descriptor = new HashMap<>();
+  }
+
+  @Test
+  public void testCanAcceptWithMetaConnectStringSetReturnsTrue() {
+    descriptor.put(META_CONNECT_KEY, "anyvalue");
+    assertTrue(jobStorage.canAccept(descriptor));
+  }
+
+  @Test
+  public void testCanAcceptWithoutMetaConnectStringSetReturnsFalse() {
+    assertFalse(jobStorage.canAccept(descriptor));
+  }
+
+  /**
+   * This method validates that the public open() method invokes the connection string validation before connecting.
+   * For detailed testing of the validation check TestGenericJobStorageValidate test class.
+   * @see org.apache.sqoop.metastore.TestGenericJobStorageValidate
+   * @throws IOException
+   */
+  @Test
+  public void testOpenWithInvalidConnectionStringThrows() throws IOException {
+    String invalidConnectionString = "invalidConnectionString";
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(invalidConnectionString + " is an invalid connection string or the required RDBMS is not supported.");
+    descriptor.put(META_CONNECT_KEY, invalidConnectionString);
+
+    jobStorage.open(descriptor);
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/metastore/TestGenericJobStorageValidate.java b/src/test/org/apache/sqoop/metastore/TestGenericJobStorageValidate.java
new file mode 100644
index 0000000..9995a42
--- /dev/null
+++ b/src/test/org/apache/sqoop/metastore/TestGenericJobStorageValidate.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.metastore;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+
+@RunWith(Parameterized.class)
+public class TestGenericJobStorageValidate {
+
+  @Parameters(name = "metastoreConnectionString = {0}, validationShouldFail = {1}")
+  public static Iterable<? extends Object> parameters() {
+    return Arrays.asList(
+        new Object[]{"jdbc:mysql://localhost/", false},
+        new Object[]{"jdbc:oracle://localhost/", false},
+        new Object[]{"jdbc:hsqldb://localhost/", false},
+        new Object[]{"jdbc:postgresql://localhost/", false},
+        new Object[]{"jdbc:sqlserver://localhost/", false},
+        new Object[]{"jdbc:db2://localhost/", false},
+        new Object[]{"jdbc:dummy://localhost/", true},
+        new Object[]{null, true},
+        new Object[]{"", true});
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final String connectionString;
+
+  private final boolean expectedToFail;
+
+  private GenericJobStorage jobStorage;
+
+  public TestGenericJobStorageValidate(String connectionString, boolean expectedToFail) {
+    this.connectionString = connectionString;
+    this.expectedToFail = expectedToFail;
+  }
+
+  @Before
+  public void before() {
+    jobStorage = new GenericJobStorage();
+  }
+
+  @Test
+  public void testValidateMetastoreConnectionStringWithParameters() {
+    if (expectedToFail) {
+      expectedException.expect(RuntimeException.class);
+      expectedException.expectMessage(connectionString + " is an invalid connection string or the required RDBMS is not supported.");
+    }
+    jobStorage.validateMetastoreConnectionString(connectionString);
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/testutil/Argument.java b/src/test/org/apache/sqoop/testutil/Argument.java
new file mode 100644
index 0000000..0832ed0
--- /dev/null
+++ b/src/test/org/apache/sqoop/testutil/Argument.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import java.util.Objects;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class Argument {
+
+  private final String name;
+
+  private final String value;
+
+  public Argument(String name) {
+    this(name, "");
+  }
+
+  public Argument(String name, String value) {
+    Objects.requireNonNull(name);
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public static Argument fromPair(String name, String value) {
+    return new Argument(name, value);
+  }
+
+  public static Argument from(String name) {
+    return new Argument(name);
+  }
+
+  @Override
+  public String toString() {
+    if (isEmpty(value)) {
+      return name;
+    } else {
+      return String.format("%s=%s", name, value);
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/testutil/ArgumentUtils.java b/src/test/org/apache/sqoop/testutil/ArgumentUtils.java
new file mode 100644
index 0000000..2f95e45
--- /dev/null
+++ b/src/test/org/apache/sqoop/testutil/ArgumentUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public final class ArgumentUtils {
+
+  private static final String PROPERTY_PREFIX = "-D";
+
+  private static final String OPTION_PREFIX = "--";
+
+  public static String[] createArgumentArrayFromProperties(Iterable<Argument> properties) {
+    List<String> result = new ArrayList<>();
+    for (Argument property : properties) {
+      result.add(PROPERTY_PREFIX);
+      result.add(property.toString());
+    }
+
+    return result.toArray(new String[result.size()]);
+  }
+
+  public static String[] createArgumentArrayFromOptions(Iterable<Argument> options) {
+    List<String> result = new ArrayList<>();
+    for (Argument option : options) {
+      result.add(OPTION_PREFIX + option.getName());
+      if (!isEmpty(option.getValue())) {
+        result.add(option.getValue());
+      }
+    }
+
+    return result.toArray(new String[result.size()]);
+  }
+
+  public static String[] createArgumentArray(Iterable<Argument> properties, Iterable<Argument> options) {
+    List<String> result = new ArrayList<>();
+    Collections.addAll(result, createArgumentArrayFromProperties(properties));
+    Collections.addAll(result, createArgumentArrayFromOptions(options));
+
+    return result.toArray(new String[result.size()]);
+  }
+
+}
