support using mariadb connector with mysql extensions (#11402)

* support using mariadb connector with mysql extensions

* cleanup and more tests

* fix test

* javadocs, more tests, etc

* style and more test

* more test more better

* missing pom

* more pom
diff --git a/.travis.yml b/.travis.yml
index 2d2c304..e76ecf5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -610,6 +610,11 @@
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
 
+    - <<: *integration_query
+      name: "(Compile=openjdk8, Run=openjdk8) query integration test (mariaDB)"
+      jdk: openjdk8
+      env: TESTNG_GROUPS='-Dgroups=query' USE_INDEXER='middleManager' MYSQL_DRIVER_CLASSNAME='org.mariadb.jdbc.Driver'
+
     # END - Integration tests for Compile with Java 8 and Run with Java 8
 
     # START - Integration tests for Compile with Java 8 and Run with Java 11
@@ -683,6 +688,11 @@
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
 
+    - <<: *integration_query
+      name: "(Compile=openjdk8, Run=openjdk11) query integration test (mariaDB)"
+      jdk: openjdk8
+      env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' MYSQL_DRIVER_CLASSNAME='org.mariadb.jdbc.Driver'
+
     # END - Integration tests for Compile with Java 8 and Run with Java 11
 
     - &integration_batch_index_k8s
diff --git a/core/pom.xml b/core/pom.xml
index 70ca838..f316e8c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,10 +350,34 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mariadb.jdbc</groupId>
+      <artifactId>mariadb-java-client</artifactId>
+      <version>${mariadb.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java
index a665107..c985009 100644
--- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java
+++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java
@@ -49,16 +49,19 @@
   @JsonProperty("dbcp")
   private Properties dbcpProperties;
 
+  @JsonProperty
   public boolean isCreateTables()
   {
     return createTables;
   }
 
+  @JsonProperty
   public String getHost()
   {
     return host;
   }
 
+  @JsonProperty
   public int getPort()
   {
     return port;
@@ -73,16 +76,19 @@
     return connectURI;
   }
 
+  @JsonProperty
   public String getUser()
   {
     return user;
   }
 
+  @JsonProperty
   public String getPassword()
   {
     return passwordProvider == null ? null : passwordProvider.getPassword();
   }
 
+  @JsonProperty("dbcp")
   public Properties getDbcpProperties()
   {
     return dbcpProperties;
@@ -132,6 +138,7 @@
         : !getDbcpProperties().equals(that.getDbcpProperties())) {
       return false;
     }
+
     return passwordProvider != null ? passwordProvider.equals(that.passwordProvider) : that.passwordProvider == null;
 
   }
diff --git a/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java b/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java
index 21f47ec..0cd201d 100644
--- a/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java
@@ -19,18 +19,33 @@
 
 package org.apache.druid.utils;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.IAE;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Properties;
 import java.util.Set;
 
 public final class ConnectionUriUtils
 {
+  private static final String MARIADB_EXTRAS = "nonMappedOptions";
   // Note: MySQL JDBC connector 8 supports 7 other protocols than just `jdbc:mysql:`
   // (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html).
   // We should consider either expanding recognized mysql protocols or restricting allowed protocols to
   // just a basic one.
   public static final String MYSQL_PREFIX = "jdbc:mysql:";
   public static final String POSTGRES_PREFIX = "jdbc:postgresql:";
+  public static final String MARIADB_PREFIX = "jdbc:mariadb:";
+
+  public static final String POSTGRES_DRIVER = "org.postgresql.Driver";
+  public static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
+  public static final String MYSQL_NON_REGISTERING_DRIVER = "com.mysql.jdbc.NonRegisteringDriver";
+  public static final String MARIADB_DRIVER = "org.mariadb.jdbc.Driver";
 
   /**
    * This method checks {@param actualProperties} against {@param allowedProperties} if they are not system properties.
@@ -57,6 +72,211 @@
     }
   }
 
+  /**
+   * This method tries to determine the correct type of database for a given JDBC connection string URI, then load the
+   * driver using reflection to parse the uri parameters, returning the set of keys which can be used for JDBC
+   * parameter whitelist validation.
+   *
+   * uris starting with {@link #MYSQL_PREFIX} will first try to use the MySQL Connector/J driver (5.x), then fallback
+   * to MariaDB Connector/J (version 2.x) which also accepts jdbc:mysql uris. This method does not attempt to use
+   * MariaDB Connector/J 3.x alpha driver (at the time of these javadocs, it only handles the jdbc:mariadb prefix)
+   *
+   * uris starting with {@link #POSTGRES_PREFIX} will use the postgresql driver to parse the uri
+   *
+   * uris starting with {@link #MARIADB_PREFIX} will first try to use MariaDB Connector/J driver (2.x) then fallback to
+   * MariaDB Connector/J 3.x driver.
+   *
+   * If the uri does not match any of these schemes, this method will return an empty set if unknown uris are allowed,
+   * or throw an exception if not.
+   */
+  public static Set<String> tryParseJdbcUriParameters(String connectionUri, boolean allowUnknown)
+  {
+    if (connectionUri.startsWith(MYSQL_PREFIX)) {
+      try {
+        return tryParseMySqlConnectionUri(connectionUri);
+      }
+      catch (ClassNotFoundException notFoundMysql) {
+        try {
+          return tryParseMariaDb2xConnectionUri(connectionUri);
+        }
+        catch (ClassNotFoundException notFoundMaria2x) {
+          throw new RuntimeException(
+              "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
+              notFoundMysql
+          );
+        }
+        catch (IllegalArgumentException iaeMaria2x) {
+          throw iaeMaria2x;
+        }
+        catch (Throwable otherMaria2x) {
+          throw new RuntimeException(otherMaria2x);
+        }
+      }
+      catch (IllegalArgumentException iaeMySql) {
+        throw iaeMySql;
+      }
+      catch (Throwable otherMysql) {
+        throw new RuntimeException(otherMysql);
+      }
+    } else if (connectionUri.startsWith(MARIADB_PREFIX)) {
+      try {
+        return tryParseMariaDb2xConnectionUri(connectionUri);
+      }
+      catch (ClassNotFoundException notFoundMaria2x) {
+        try {
+          return tryParseMariaDb3xConnectionUri(connectionUri);
+        }
+        catch (ClassNotFoundException notFoundMaria3x) {
+          throw new RuntimeException(
+              "Failed to find MariaDB driver class. Please check the MariaDB connector version 2.7.3 is in the classpath",
+              notFoundMaria2x
+          );
+        }
+        catch (IllegalArgumentException iaeMaria3x) {
+          throw iaeMaria3x;
+        }
+        catch (Throwable otherMaria3x) {
+          throw new RuntimeException(otherMaria3x);
+        }
+      }
+      catch (IllegalArgumentException iaeMaria2x) {
+        throw iaeMaria2x;
+      }
+      catch (Throwable otherMaria2x) {
+        throw new RuntimeException(otherMaria2x);
+      }
+    } else if (connectionUri.startsWith(POSTGRES_PREFIX)) {
+      try {
+        return tryParsePostgresConnectionUri(connectionUri);
+      }
+      catch (IllegalArgumentException iaePostgres) {
+        throw iaePostgres;
+      }
+      catch (Throwable otherPostgres) {
+        // no special handling for class not found because postgres driver is in distribution and should be available.
+        throw new RuntimeException(otherPostgres);
+      }
+    } else {
+      if (!allowUnknown) {
+        throw new IAE("Unknown JDBC connection scheme: %s", connectionUri.split(":")[1]);
+      }
+      return Collections.emptySet();
+    }
+  }
+
+  public static Set<String> tryParsePostgresConnectionUri(String connectionUri)
+      throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException
+  {
+    Class<?> driverClass = Class.forName(POSTGRES_DRIVER);
+    Method parseUrl = driverClass.getMethod("parseURL", String.class, Properties.class);
+    Properties properties = (Properties) parseUrl.invoke(null, connectionUri, null);
+    if (properties == null) {
+      throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectionUri);
+    }
+    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
+    properties.forEach((k, v) -> keys.add((String) k));
+    return keys;
+  }
+
+  public static Set<String> tryParseMySqlConnectionUri(String connectionUri)
+      throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException,
+             InvocationTargetException
+  {
+    Class<?> driverClass = Class.forName(MYSQL_NON_REGISTERING_DRIVER);
+    Method parseUrl = driverClass.getMethod("parseURL", String.class, Properties.class);
+    // almost the same as postgres, but is an instance level method
+    Properties properties = (Properties) parseUrl.invoke(driverClass.getConstructor().newInstance(), connectionUri, null);
+
+    if (properties == null) {
+      throw new IAE("Invalid URL format for MySQL: [%s]", connectionUri);
+    }
+    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
+    properties.forEach((k, v) -> keys.add((String) k));
+    return keys;
+  }
+
+  public static Set<String> tryParseMariaDb2xConnectionUri(String connectionUri)
+      throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
+             NoSuchFieldException, InstantiationException
+  {
+    // these are a bit more complicated
+    Class<?> urlParserClass = Class.forName("org.mariadb.jdbc.UrlParser");
+    Class<?> optionsClass = Class.forName("org.mariadb.jdbc.util.Options");
+    Method parseUrl = urlParserClass.getMethod("parse", String.class);
+    Method getOptions = urlParserClass.getMethod("getOptions");
+
+    Object urlParser = parseUrl.invoke(null, connectionUri);
+
+    if (urlParser == null) {
+      throw new IAE("Invalid URL format for MariaDB: [%s]", connectionUri);
+    }
+
+    Object options = getOptions.invoke(urlParser);
+    Field nonMappedOptionsField = optionsClass.getField(MARIADB_EXTRAS);
+    Properties properties = (Properties) nonMappedOptionsField.get(options);
+
+    Field[] fields = optionsClass.getDeclaredFields();
+    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size() + fields.length);
+    properties.forEach((k, v) -> keys.add((String) k));
+
+    Object defaultOptions = optionsClass.getConstructor().newInstance();
+    for (Field field : fields) {
+      if (field.getName().equals(MARIADB_EXTRAS)) {
+        continue;
+      }
+      try {
+        if (!Objects.equal(field.get(options), field.get(defaultOptions))) {
+          keys.add(field.getName());
+        }
+      }
+      catch (IllegalAccessException ignored) {
+        // ignore stuff we aren't allowed to read
+      }
+    }
+
+    return keys;
+  }
+
+  public static Set<String> tryParseMariaDb3xConnectionUri(String connectionUri)
+      throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
+             InstantiationException
+  {
+    Class<?> configurationClass = Class.forName("org.mariadb.jdbc.Configuration");
+    Class<?> configurationBuilderClass = Class.forName("org.mariadb.jdbc.Configuration$Builder");
+    Method parseUrl = configurationClass.getMethod("parse", String.class);
+    Method buildMethod = configurationBuilderClass.getMethod("build");
+    Object configuration = parseUrl.invoke(null, connectionUri);
+
+    if (configuration == null) {
+      throw new IAE("Invalid URL format for MariaDB: [%s]", connectionUri);
+    }
+
+    Method nonMappedOptionsGetter = configurationClass.getMethod(MARIADB_EXTRAS);
+    Properties properties = (Properties) nonMappedOptionsGetter.invoke(configuration);
+
+    Field[] fields = configurationClass.getDeclaredFields();
+    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size() + fields.length);
+    properties.forEach((k, v) -> keys.add((String) k));
+
+    Object defaultConfiguration = buildMethod.invoke(configurationBuilderClass.getConstructor().newInstance());
+    for (Field field : fields) {
+      if (field.getName().equals(MARIADB_EXTRAS)) {
+        continue;
+      }
+      try {
+        final Method fieldGetter = configurationClass.getMethod(field.getName());
+        if (!Objects.equal(fieldGetter.invoke(configuration), fieldGetter.invoke(defaultConfiguration))) {
+          keys.add(field.getName());
+        }
+      }
+      catch (IllegalAccessException | NoSuchMethodException ignored) {
+        // ignore stuff we aren't allowed to read
+      }
+    }
+
+    return keys;
+  }
+
   private ConnectionUriUtils()
   {
   }
diff --git a/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java b/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java
index f5edd9b..f264e0a 100644
--- a/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java
@@ -20,17 +20,28 @@
 package org.apache.druid.utils;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.IAE;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Set;
 
 @RunWith(Enclosed.class)
 public class ConnectionUriUtilsTest
 {
   public static class ThrowIfURLHasNotAllowedPropertiesTest
   {
+    private static final String MYSQL_URI = "jdbc:mysql://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
+    private static final String MARIA_URI = "jdbc:mariadb://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
+    private static final String POSTGRES_URI = "jdbc:postgresql://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
+    private static final String UNKNOWN_URI = "jdbc:druid://localhost:8888/query/v2/sql/avatica?user=druid&password=diurd&keyonly&otherOptions=wat";
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -86,5 +97,180 @@
           ImmutableSet.of("valid_key1", "valid_key2")
       );
     }
+
+    @Test
+    public void testTryParses()
+    {
+      Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(POSTGRES_URI, false);
+      Assert.assertEquals(7, props.size());
+
+      props = ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
+      // though this would be 4 if mysql wasn't loaded in classpath because it would fall back to mariadb
+      Assert.assertEquals(9, props.size());
+
+      props = ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false);
+      Assert.assertEquals(4, props.size());
+    }
+
+    @Test
+    public void testTryParseUnknown()
+    {
+      Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(UNKNOWN_URI, true);
+      Assert.assertEquals(0, props.size());
+
+      expectedException.expect(IAE.class);
+      ConnectionUriUtils.tryParseJdbcUriParameters(UNKNOWN_URI, false);
+    }
+
+    @Test
+    public void tryParseInvalidPostgres()
+    {
+      expectedException.expect(IAE.class);
+      ConnectionUriUtils.tryParseJdbcUriParameters("jdbc:postgresql://bad:1234&param", true);
+    }
+
+    @Test
+    public void tryParseInvalidMySql()
+    {
+      expectedException.expect(IAE.class);
+      ConnectionUriUtils.tryParseJdbcUriParameters("jdbc:mysql:/bad", true);
+    }
+
+    @Test
+    public void testMySqlFallbackMySqlMaria2x()
+    {
+      MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
+      utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false)).thenCallRealMethod();
+      utils.when(() -> ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
+      utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI)).thenCallRealMethod();
+
+      Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
+      // this would be 9 if didn't fall back to mariadb
+      Assert.assertEquals(4, props.size());
+      utils.close();
+    }
+
+    @Test
+    public void testMariaFallbackMaria3x()
+    {
+      MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
+      utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false)).thenCallRealMethod();
+      utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MARIA_URI)).thenThrow(ClassNotFoundException.class);
+      utils.when(() -> ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MARIA_URI)).thenCallRealMethod();
+
+      try {
+        Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false);
+        // this would be 4 if didn't fall back to mariadb 3x
+        Assert.assertEquals(8, props.size());
+      }
+      catch (RuntimeException e) {
+
+        Assert.assertTrue(e.getMessage().contains("Failed to find MariaDB driver class"));
+      }
+      utils.close();
+    }
+
+    @Test
+    public void testMySqlFallbackMySqlNoDrivers()
+    {
+      MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
+      utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false)).thenCallRealMethod();
+      utils.when(() -> ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
+      utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
+
+      try {
+        ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
+      }
+      catch (RuntimeException e) {
+        Assert.assertTrue(e.getMessage().contains("Failed to find MySQL driver class"));
+      }
+      utils.close();
+    }
+
+    @Test
+    public void testPosgresDriver() throws Exception
+    {
+      Set<String> props = ConnectionUriUtils.tryParsePostgresConnectionUri(POSTGRES_URI);
+      Assert.assertEquals(7, props.size());
+      // postgres adds a few extra system properties, PGDBNAME, PGHOST, PGPORT
+      Assert.assertTrue(props.contains("user"));
+      Assert.assertTrue(props.contains("password"));
+      Assert.assertTrue(props.contains("otherOptions"));
+      Assert.assertTrue(props.contains("keyonly"));
+    }
+
+    @Test
+    public void testMySQLDriver() throws Exception
+    {
+      Set<String> props = ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI);
+      // mysql actually misses 'keyonly', but spits out several keys that are not actually uri parameters
+      // DBNAME, HOST, PORT, HOST.1, PORT.1, NUM_HOSTS
+      Assert.assertEquals(9, props.size());
+      Assert.assertTrue(props.contains("user"));
+      Assert.assertTrue(props.contains("password"));
+      Assert.assertTrue(props.contains("otherOptions"));
+      Assert.assertFalse(props.contains("keyonly"));
+    }
+
+    @Test
+    public void testMariaDb2xDriver() throws Throwable
+    {
+      Set<String> props = ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI);
+      // mariadb doesn't spit out any extras other than what the user specified
+      Assert.assertEquals(4, props.size());
+      Assert.assertTrue(props.contains("user"));
+      Assert.assertTrue(props.contains("password"));
+      Assert.assertTrue(props.contains("otherOptions"));
+      Assert.assertTrue(props.contains("keyonly"));
+      props = ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MARIA_URI);
+      Assert.assertEquals(4, props.size());
+      Assert.assertTrue(props.contains("user"));
+      Assert.assertTrue(props.contains("password"));
+      Assert.assertTrue(props.contains("otherOptions"));
+      Assert.assertTrue(props.contains("keyonly"));
+    }
+
+    @Test(expected = ClassNotFoundException.class)
+    public void testMariaDb3xDriver() throws Exception
+    {
+      // at the time of adding this test, mariadb connector/j 3.x does not actually parse jdbc:mysql uris
+      // so this would throw an IAE.class instead of ClassNotFoundException.class if the connector is swapped out
+      // in maven dependencies
+      ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MYSQL_URI);
+    }
+
+    @Test(expected = ClassNotFoundException.class)
+    public void testMariaDb3xDriverMariaUri() throws Exception
+    {
+      // mariadb 3.x driver cannot be loaded alongside 2.x, so this will fail with class not found
+      // however, if we swap out version in pom then we end up with 8 keys where
+      // "database", "addresses", "codecs", and "initialUrl" are added as extras
+      // we should perhaps consider adding them to built-in allowed lists in the future when this driver is no longer
+      // an alpha release
+      Set<String> props = ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MARIA_URI);
+      Assert.assertEquals(8, props.size());
+      Assert.assertTrue(props.contains("user"));
+      Assert.assertTrue(props.contains("password"));
+      Assert.assertTrue(props.contains("otherOptions"));
+      Assert.assertTrue(props.contains("keyonly"));
+    }
+
+    @Test(expected = IAE.class)
+    public void testPostgresInvalidArgs() throws Exception
+    {
+      ConnectionUriUtils.tryParsePostgresConnectionUri(MYSQL_URI);
+    }
+
+    @Test(expected = IAE.class)
+    public void testMySqlInvalidArgs() throws Exception
+    {
+      ConnectionUriUtils.tryParseMySqlConnectionUri(POSTGRES_URI);
+    }
+
+    @Test(expected = IAE.class)
+    public void testMariaDbInvalidArgs() throws Exception
+    {
+      ConnectionUriUtils.tryParseMariaDb2xConnectionUri(POSTGRES_URI);
+    }
   }
 }
diff --git a/distribution/docker/Dockerfile.mariadb b/distribution/docker/Dockerfile.mariadb
new file mode 100644
index 0000000..68a2ca7
--- /dev/null
+++ b/distribution/docker/Dockerfile.mariadb
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ARG DRUID_RELEASE
+FROM $DRUID_RELEASE
+
+WORKDIR /opt/druid/extensions/mysql-metadata-storage
+
+ARG MARIA_URL=https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/2.7.3/mariadb-java-client-2.7.3.jar
+ARG MARIA_JAR=mariadb-java-client-2.7.3.jar
+ARG MARIA_SHA=4a2edc05bd882ad19371d2615c2635dccf8d74f0
+
+RUN wget -q ${MARIA_URL} \
+ && echo "${MARIA_SHA}  ${MARIA_JAR}" | sha1sum -c \
+ && ln -s ../extensions/mysql-metadata-storage/${MARIA_JAR} /opt/druid/lib
+
+WORKDIR /opt/druid
diff --git a/extensions-core/lookups-cached-global/pom.xml b/extensions-core/lookups-cached-global/pom.xml
index cdb0c1c..d9dda04 100644
--- a/extensions-core/lookups-cached-global/pom.xml
+++ b/extensions-core/lookups-cached-global/pom.xml
@@ -110,16 +110,6 @@
       <artifactId>jsr311-api</artifactId>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>${mysql.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.postgresql</groupId>
-      <artifactId>postgresql</artifactId>
-    </dependency>
 
     <!-- Tests -->
     <dependency>
@@ -177,5 +167,17 @@
       <artifactId>powermock-api-easymock</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
index 932191c..57ba41e 100644
--- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
+++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
@@ -24,23 +24,15 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.mysql.jdbc.NonRegisteringDriver;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.utils.ConnectionUriUtils;
-import org.apache.druid.utils.Throwables;
 import org.joda.time.Period;
-import org.postgresql.Driver;
 
 import javax.annotation.Nullable;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
-import java.sql.SQLException;
 import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
 
 /**
  *
@@ -92,13 +84,8 @@
   /**
    * Check the given URL whether it contains non-allowed properties.
    *
-   * This method should be in sync with the following methods:
-   *
-   * - {@code org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()}
-   * - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
-   * - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
-   *
    * @see JdbcAccessSecurityConfig#getAllowedProperties()
+   * @see ConnectionUriUtils#tryParseJdbcUriParameters(String, boolean)
    */
   private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
   {
@@ -109,64 +96,8 @@
       return;
     }
 
-    @Nullable final Properties properties; // null when url has an invalid format
-
-    if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
-      try {
-        NonRegisteringDriver driver = new NonRegisteringDriver();
-        properties = driver.parseURL(url, null);
-      }
-      catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-      catch (Throwable e) {
-        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
-            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
-          if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
-            throw new RuntimeException(
-                "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
-                e
-            );
-          }
-        }
-        throw new RuntimeException(e);
-      }
-    } else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
-      try {
-        properties = Driver.parseURL(url, null);
-      }
-      catch (Throwable e) {
-        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
-            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
-          if (e.getMessage().contains("org/postgresql/Driver")) {
-            throw new RuntimeException(
-                "Failed to find PostgreSQL driver class. "
-                + "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
-                e
-            );
-          }
-        }
-        throw new RuntimeException(e);
-      }
-    } else {
-      if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
-        properties = new Properties();
-      } else {
-        // unknown format but it is not allowed
-        throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
-      }
-    }
-
-    if (properties == null) {
-      // There is something wrong with the URL format.
-      throw new IAE("Invalid URL format [%s]", url);
-    }
-
-    final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
-    properties.forEach((k, v) -> propertyKeys.add((String) k));
-
     ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
-        propertyKeys,
+        ConnectionUriUtils.tryParseJdbcUriParameters(url, securityConfig.isAllowUnknownJdbcUrlFormat()),
         securityConfig.getSystemPropertyPrefixes(),
         securityConfig.getAllowedProperties()
     );
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
index 03141dd..9bdb76e 100644
--- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
@@ -155,7 +155,7 @@
     public void testWhenInvalidUrlFormat()
     {
       expectedException.expect(IllegalArgumentException.class);
-      expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
+      expectedException.expectMessage("Invalid URL format for MySQL: [jdbc:mysql:/invalid-url::3006]");
       new JdbcExtractionNamespace(
           new MetadataStorageConnectorConfig()
           {
@@ -305,7 +305,7 @@
     public void testWhenInvalidUrlFormat()
     {
       expectedException.expect(IllegalArgumentException.class);
-      expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
+      expectedException.expectMessage("Invalid URL format for PostgreSQL: [jdbc:postgresql://invalid-url::3006]");
       new JdbcExtractionNamespace(
           new MetadataStorageConnectorConfig()
           {
diff --git a/extensions-core/lookups-cached-single/pom.xml b/extensions-core/lookups-cached-single/pom.xml
index 8c801d2..a75cb5a 100644
--- a/extensions-core/lookups-cached-single/pom.xml
+++ b/extensions-core/lookups-cached-single/pom.xml
@@ -96,16 +96,6 @@
       <artifactId>guava</artifactId>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>${mysql.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.postgresql</groupId>
-      <artifactId>postgresql</artifactId>
-    </dependency>
 
     <!-- Tests -->
     <dependency>
@@ -139,6 +129,17 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
index ddcb5c6..f8d50db 100644
--- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
+++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
@@ -23,10 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.mysql.jdbc.NonRegisteringDriver;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -34,8 +31,6 @@
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.DataFetcher;
 import org.apache.druid.utils.ConnectionUriUtils;
-import org.apache.druid.utils.Throwables;
-import org.postgresql.Driver;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
@@ -47,8 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
 import java.util.function.Supplier;
 
 public class JdbcDataFetcher implements DataFetcher<String, String>
@@ -124,13 +117,8 @@
   /**
    * Check the given URL whether it contains non-allowed properties.
    *
-   * This method should be in sync with the following methods:
-   *
-   * - {@code org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()}
-   * - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
-   * - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
-   *
    * @see JdbcAccessSecurityConfig#getAllowedProperties()
+   * @see ConnectionUriUtils#tryParseJdbcUriParameters(String, boolean) 
    */
   private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
   {
@@ -141,64 +129,8 @@
       return;
     }
 
-    @Nullable final Properties properties;
-
-    if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
-      try {
-        NonRegisteringDriver driver = new NonRegisteringDriver();
-        properties = driver.parseURL(url, null);
-      }
-      catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-      catch (Throwable e) {
-        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
-            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
-          if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
-            throw new RuntimeException(
-                "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
-                e
-            );
-          }
-        }
-        throw new RuntimeException(e);
-      }
-    } else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
-      try {
-        properties = Driver.parseURL(url, null);
-      }
-      catch (Throwable e) {
-        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
-            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
-          if (e.getMessage().contains("org/postgresql/Driver")) {
-            throw new RuntimeException(
-                "Failed to find PostgreSQL driver class. "
-                + "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
-                e
-            );
-          }
-        }
-        throw new RuntimeException(e);
-      }
-    } else {
-      if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
-        properties = new Properties();
-      } else {
-        // unknown format but it is not allowed
-        throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
-      }
-    }
-
-    if (properties == null) {
-      // There is something wrong with the URL format.
-      throw new IAE("Invalid URL format [%s]", url);
-    }
-
-    final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
-    properties.forEach((k, v) -> propertyKeys.add((String) k));
-
     ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
-        propertyKeys,
+        ConnectionUriUtils.tryParseJdbcUriParameters(url, securityConfig.isAllowUnknownJdbcUrlFormat()),
         securityConfig.getSystemPropertyPrefixes(),
         securityConfig.getAllowedProperties()
     );
diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java
index 1dad8e9..8f7f2e9 100644
--- a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java
+++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java
@@ -147,7 +147,7 @@
     public void testWhenInvalidUrlFormat()
     {
       expectedException.expect(IllegalArgumentException.class);
-      expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
+      expectedException.expectMessage("Invalid URL format for MySQL: [jdbc:mysql:/invalid-url::3006]");
       new JdbcDataFetcher(
           new MetadataStorageConnectorConfig()
           {
@@ -289,7 +289,7 @@
     public void testWhenInvalidUrlFormat()
     {
       expectedException.expect(IllegalArgumentException.class);
-      expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
+      expectedException.expectMessage("Invalid URL format for PostgreSQL: [jdbc:postgresql://invalid-url::3006]");
       new JdbcDataFetcher(
           new MetadataStorageConnectorConfig()
           {
diff --git a/extensions-core/mysql-metadata-storage/pom.xml b/extensions-core/mysql-metadata-storage/pom.xml
index 0f89163..0795507 100644
--- a/extensions-core/mysql-metadata-storage/pom.xml
+++ b/extensions-core/mysql-metadata-storage/pom.xml
@@ -89,10 +89,38 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <version>${mariadb.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
index 5b622a6..a88a9ed 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
@@ -23,18 +23,15 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Sets;
-import com.mysql.jdbc.NonRegisteringDriver;
 import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
-import org.apache.druid.utils.Throwables;
+import org.apache.druid.utils.ConnectionUriUtils;
 import org.skife.jdbi.v2.DBI;
 
-import java.sql.SQLException;
-import java.util.Properties;
+import javax.annotation.Nullable;
+import java.util.Objects;
 import java.util.Set;
 
 
@@ -43,17 +40,27 @@
 {
   private final DBI dbi;
   private final MetadataStorageConnectorConfig connectorConfig;
+  @Nullable
+  private final String driverClassName;
 
   @JsonCreator
   public MySQLFirehoseDatabaseConnector(
       @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
+      @JsonProperty("driverClassName") @Nullable String driverClassName,
       @JacksonInject JdbcAccessSecurityConfig securityConfig
   )
   {
     this.connectorConfig = connectorConfig;
+    this.driverClassName = driverClassName;
     final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
     datasource.setDriverClassLoader(getClass().getClassLoader());
-    datasource.setDriverClassName("com.mysql.jdbc.Driver");
+    if (driverClassName != null) {
+      datasource.setDriverClassName(driverClassName);
+    } else if (connectorConfig.getConnectURI().startsWith(ConnectionUriUtils.MARIADB_PREFIX)) {
+      datasource.setDriverClassName(ConnectionUriUtils.MARIADB_DRIVER);
+    } else {
+      datasource.setDriverClassName(ConnectionUriUtils.MYSQL_DRIVER);
+    }
     this.dbi = new DBI(datasource);
   }
 
@@ -63,6 +70,13 @@
     return connectorConfig;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getDriverClassName()
+  {
+    return driverClassName;
+  }
+
   @Override
   public DBI getDBI()
   {
@@ -70,37 +84,30 @@
   }
 
   @Override
-  public Set<String> findPropertyKeysFromConnectURL(String connectUrl)
+  public Set<String> findPropertyKeysFromConnectURL(String connectUrl, boolean allowUnknown)
   {
-    // This method should be in sync with
-    // - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
-    // - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
-    Properties properties;
-    try {
-      NonRegisteringDriver driver = new NonRegisteringDriver();
-      properties = driver.parseURL(connectUrl, null);
-    }
-    catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-    catch (Throwable e) {
-      if (Throwables.isThrowable(e, NoClassDefFoundError.class)
-          || Throwables.isThrowable(e, ClassNotFoundException.class)) {
-        if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
-          throw new RuntimeException(
-              "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
-              e
-          );
-        }
-      }
-      throw new RuntimeException(e);
-    }
+    return ConnectionUriUtils.tryParseJdbcUriParameters(connectUrl, allowUnknown);
+  }
 
-    if (properties == null) {
-      throw new IAE("Invalid URL format for MySQL: [%s]", connectUrl);
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
     }
-    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
-    properties.forEach((k, v) -> keys.add((String) k));
-    return keys;
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MySQLFirehoseDatabaseConnector that = (MySQLFirehoseDatabaseConnector) o;
+    return connectorConfig.equals(that.connectorConfig) && Objects.equals(
+        driverClassName,
+        that.driverClassName
+    );
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(connectorConfig, driverClassName);
   }
 }
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java
index 715a7f2..df91208 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java
@@ -33,7 +33,6 @@
 import org.apache.druid.metadata.SQLMetadataConnector;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.StringMapper;
 
 import java.io.File;
@@ -46,7 +45,6 @@
   private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
   private static final String QUOTE_STRING = "`";
   private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
-  private static final String MYSQL_JDBC_DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
 
   private final DBI dbi;
 
@@ -54,20 +52,20 @@
   public MySQLConnector(
       Supplier<MetadataStorageConnectorConfig> config,
       Supplier<MetadataStorageTablesConfig> dbTables,
-      MySQLConnectorConfig connectorConfig
+      MySQLConnectorSslConfig connectorSslConfig,
+      MySQLConnectorDriverConfig driverConfig
   )
   {
     super(config, dbTables);
-
     try {
-      Class.forName(MYSQL_JDBC_DRIVER_CLASS_NAME, false, getClass().getClassLoader());
+      Class.forName(driverConfig.getDriverClassName(), false, getClass().getClassLoader());
     }
     catch (ClassNotFoundException e) {
       throw new ISE(e, "Could not find %s on the classpath. The MySQL Connector library is not included in the Druid "
                    + "distribution but is required to use MySQL. Please download a compatible library (for example "
                    + "'mysql-connector-java-5.1.48.jar') and place it under 'extensions/mysql-metadata-storage/'. See "
                    + "https://druid.apache.org/downloads for more details.",
-                MYSQL_JDBC_DRIVER_CLASS_NAME
+                    driverConfig.getDriverClassName()
       );
     }
 
@@ -75,67 +73,67 @@
     // MySQL driver is classloader isolated as part of the extension
     // so we need to help JDBC find the driver
     datasource.setDriverClassLoader(getClass().getClassLoader());
-    datasource.setDriverClassName(MYSQL_JDBC_DRIVER_CLASS_NAME);
-    datasource.addConnectionProperty("useSSL", String.valueOf(connectorConfig.isUseSSL()));
-    if (connectorConfig.isUseSSL()) {
+    datasource.setDriverClassName(driverConfig.getDriverClassName());
+    datasource.addConnectionProperty("useSSL", String.valueOf(connectorSslConfig.isUseSSL()));
+    if (connectorSslConfig.isUseSSL()) {
       log.info("SSL is enabled on this MySQL connection. ");
 
       datasource.addConnectionProperty(
           "verifyServerCertificate",
-          String.valueOf(connectorConfig.isVerifyServerCertificate())
+          String.valueOf(connectorSslConfig.isVerifyServerCertificate())
       );
-      if (connectorConfig.isVerifyServerCertificate()) {
+      if (connectorSslConfig.isVerifyServerCertificate()) {
         log.info("Server certificate verification is enabled. ");
 
-        if (connectorConfig.getTrustCertificateKeyStoreUrl() != null) {
+        if (connectorSslConfig.getTrustCertificateKeyStoreUrl() != null) {
           datasource.addConnectionProperty(
               "trustCertificateKeyStoreUrl",
-              new File(connectorConfig.getTrustCertificateKeyStoreUrl()).toURI().toString()
+              new File(connectorSslConfig.getTrustCertificateKeyStoreUrl()).toURI().toString()
           );
         }
-        if (connectorConfig.getTrustCertificateKeyStoreType() != null) {
+        if (connectorSslConfig.getTrustCertificateKeyStoreType() != null) {
           datasource.addConnectionProperty(
               "trustCertificateKeyStoreType",
-              connectorConfig.getTrustCertificateKeyStoreType()
+              connectorSslConfig.getTrustCertificateKeyStoreType()
           );
         }
-        if (connectorConfig.getTrustCertificateKeyStorePassword() == null) {
+        if (connectorSslConfig.getTrustCertificateKeyStorePassword() == null) {
           log.warn(
               "Trust store password is empty. Ensure that the trust store has been configured with an empty password.");
         } else {
           datasource.addConnectionProperty(
               "trustCertificateKeyStorePassword",
-              connectorConfig.getTrustCertificateKeyStorePassword()
+              connectorSslConfig.getTrustCertificateKeyStorePassword()
           );
         }
       }
-      if (connectorConfig.getClientCertificateKeyStoreUrl() != null) {
+      if (connectorSslConfig.getClientCertificateKeyStoreUrl() != null) {
         datasource.addConnectionProperty(
             "clientCertificateKeyStoreUrl",
-            new File(connectorConfig.getClientCertificateKeyStoreUrl()).toURI().toString()
+            new File(connectorSslConfig.getClientCertificateKeyStoreUrl()).toURI().toString()
         );
       }
-      if (connectorConfig.getClientCertificateKeyStoreType() != null) {
+      if (connectorSslConfig.getClientCertificateKeyStoreType() != null) {
         datasource.addConnectionProperty(
             "clientCertificateKeyStoreType",
-            connectorConfig.getClientCertificateKeyStoreType()
+            connectorSslConfig.getClientCertificateKeyStoreType()
         );
       }
-      if (connectorConfig.getClientCertificateKeyStorePassword() != null) {
+      if (connectorSslConfig.getClientCertificateKeyStorePassword() != null) {
         datasource.addConnectionProperty(
             "clientCertificateKeyStorePassword",
-            connectorConfig.getClientCertificateKeyStorePassword()
+            connectorSslConfig.getClientCertificateKeyStorePassword()
         );
       }
       Joiner joiner = Joiner.on(",").skipNulls();
-      if (connectorConfig.getEnabledSSLCipherSuites() != null) {
+      if (connectorSslConfig.getEnabledSSLCipherSuites() != null) {
         datasource.addConnectionProperty(
             "enabledSSLCipherSuites",
-            joiner.join(connectorConfig.getEnabledSSLCipherSuites())
+            joiner.join(connectorSslConfig.getEnabledSSLCipherSuites())
         );
       }
-      if (connectorConfig.getEnabledTLSProtocols() != null) {
-        datasource.addConnectionProperty("enabledTLSProtocols", joiner.join(connectorConfig.getEnabledTLSProtocols()));
+      if (connectorSslConfig.getEnabledTLSProtocols() != null) {
+        datasource.addConnectionProperty("enabledTLSProtocols", joiner.join(connectorSslConfig.getEnabledTLSProtocols()));
       }
     }
 
@@ -220,24 +218,19 @@
   )
   {
     return getDBI().withHandle(
-        new HandleCallback<Void>()
-        {
-          @Override
-          public Void withHandle(Handle handle)
-          {
-            handle.createStatement(
-                StringUtils.format(
-                    "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
-                    tableName,
-                    keyColumn,
-                    valueColumn
-                )
-            )
-                  .bind("key", key)
-                  .bind("value", value)
-                  .execute();
-            return null;
-          }
+        handle -> {
+          handle.createStatement(
+              StringUtils.format(
+                  "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
+                  tableName,
+                  keyColumn,
+                  valueColumn
+              )
+          )
+                .bind("key", key)
+                .bind("value", value)
+                .execute();
+          return null;
         }
     );
   }
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfig.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfig.java
new file mode 100644
index 0000000..0f6fa03
--- /dev/null
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.storage.mysql;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class MySQLConnectorDriverConfig
+{
+  @JsonProperty
+  private String driverClassName = "com.mysql.jdbc.Driver";
+
+  @JsonProperty
+  public String getDriverClassName()
+  {
+    return driverClassName;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MySQLConnectorDriverConfig that = (MySQLConnectorDriverConfig) o;
+    return driverClassName.equals(that.driverClassName);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(driverClassName);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "MySQLConnectorDriverConfig{" +
+           "driverClassName='" + driverClassName + '\'' +
+           '}';
+  }
+}
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorConfig.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorSslConfig.java
similarity index 97%
rename from extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorConfig.java
rename to extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorSslConfig.java
index 7b6bf9a..1ead8b5 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorConfig.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorSslConfig.java
@@ -24,7 +24,7 @@
 
 import java.util.List;
 
-public class MySQLConnectorConfig
+public class MySQLConnectorSslConfig
 {
   @JsonProperty
   private boolean useSSL = false;
@@ -109,7 +109,7 @@
   @Override
   public String toString()
   {
-    return "MySQLConnectorConfig{" +
+    return "MySQLConnectorSslConfig{" +
            "useSSL='" + useSSL + '\'' +
            ", clientCertificateKeyStoreUrl='" + clientCertificateKeyStoreUrl + '\'' +
            ", clientCertificateKeyStoreType='" + clientCertificateKeyStoreType + '\'' +
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
index 9a37888..2cb8cc7 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
@@ -65,7 +65,8 @@
   {
     super.configure(binder);
 
-    JsonConfigProvider.bind(binder, "druid.metadata.mysql.ssl", MySQLConnectorConfig.class);
+    JsonConfigProvider.bind(binder, "druid.metadata.mysql.ssl", MySQLConnectorSslConfig.class);
+    JsonConfigProvider.bind(binder, "druid.metadata.mysql.driver", MySQLConnectorDriverConfig.class);
 
     PolyBind
         .optionBinder(binder, Key.get(MetadataStorageProvider.class))
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
index 150f3ca..b46cb26 100644
--- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
@@ -19,10 +19,17 @@
 
 package org.apache.druid.firehose.sql;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.metadata.storage.mysql.MySQLMetadataStorageModule;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -31,10 +38,57 @@
 
 public class MySQLFirehoseDatabaseConnectorTest
 {
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
+
+  static {
+    MAPPER.registerModules(new MySQLMetadataStorageModule().getJacksonModules());
+    MAPPER.setInjectableValues(new InjectableValues.Std().addValue(JdbcAccessSecurityConfig.class, INJECTED_CONF));
+  }
+
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
   @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test";
+      }
+    };
+    MySQLFirehoseDatabaseConnector connector = new MySQLFirehoseDatabaseConnector(
+        connectorConfig,
+        null,
+        INJECTED_CONF
+    );
+    MySQLFirehoseDatabaseConnector andBack = MAPPER.readValue(MAPPER.writeValueAsString(connector), MySQLFirehoseDatabaseConnector.class);
+    Assert.assertEquals(connector, andBack);
+
+    // test again with classname
+    connector = new MySQLFirehoseDatabaseConnector(
+        connectorConfig,
+        "some.class.name.Driver",
+        INJECTED_CONF
+    );
+    andBack = MAPPER.readValue(MAPPER.writeValueAsString(connector), MySQLFirehoseDatabaseConnector.class);
+    Assert.assertEquals(connector, andBack);
+  }
+
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    EqualsVerifier.forClass(MySQLFirehoseDatabaseConnector.class)
+                  .usingGetClass()
+                  .withNonnullFields("connectorConfig")
+                  .withIgnoredFields("dbi")
+                  .verify();
+  }
+
+  @Test
   public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
   {
     MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
@@ -50,6 +104,7 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -70,6 +125,7 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -93,6 +149,7 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -115,11 +172,37 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
 
   @Test
+  public void testSuccessOnlyValidPropertyMariaDb()
+  {
+    MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mariadb://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(
+        ImmutableSet.of("user", "password", "keyonly", "etc")
+    );
+
+    new MySQLFirehoseDatabaseConnector(
+        connectorConfig,
+        null,
+        securityConfig
+    );
+  }
+
+
+
+  @Test
   public void testFailOnlyInvalidProperty()
   {
     MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
@@ -138,6 +221,7 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -161,6 +245,31 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
+        securityConfig
+    );
+  }
+
+  @Test
+  public void testFailValidAndInvalidPropertyMariadb()
+  {
+    MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mariadb://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
+
+    expectedException.expectMessage("The property [password] is not in the allowed list");
+    expectedException.expect(IllegalArgumentException.class);
+
+    new MySQLFirehoseDatabaseConnector(
+        connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -194,6 +303,7 @@
 
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         securityConfig
     );
   }
@@ -211,10 +321,11 @@
       }
     };
 
-    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expect(RuntimeException.class);
     expectedException.expectMessage(StringUtils.format("Invalid URL format for MySQL: [%s]", url));
     new MySQLFirehoseDatabaseConnector(
         connectorConfig,
+        null,
         new JdbcAccessSecurityConfig()
     );
   }
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfigTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfigTest.java
new file mode 100644
index 0000000..c1d2e4e
--- /dev/null
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorDriverConfigTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.storage.mysql;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class MySQLConnectorDriverConfigTest
+{
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    EqualsVerifier.simple()
+                  .forClass(MySQLConnectorDriverConfig.class)
+                  .usingGetClass()
+                  .withNonnullFields("driverClassName")
+                  .verify();
+  }
+}
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java
new file mode 100644
index 0000000..4c39506
--- /dev/null
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.storage.mysql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.MetadataConfigModule;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class MySQLMetadataStorageModuleTest
+{
+  @Test
+  public void testSslConfig()
+  {
+    final Injector injector = createInjector();
+    final String propertyPrefix = "druid.metadata.mysql.ssl";
+    final JsonConfigProvider<MySQLConnectorSslConfig> provider = JsonConfigProvider.of(
+        propertyPrefix,
+        MySQLConnectorSslConfig.class
+    );
+    final Properties properties = new Properties();
+    properties.setProperty(propertyPrefix + ".useSSL", "true");
+    properties.setProperty(propertyPrefix + ".trustCertificateKeyStoreUrl", "url");
+    properties.setProperty(propertyPrefix + ".trustCertificateKeyStoreType", "type");
+    properties.setProperty(propertyPrefix + ".trustCertificateKeyStorePassword", "secret");
+    properties.setProperty(propertyPrefix + ".clientCertificateKeyStoreUrl", "url");
+    properties.setProperty(propertyPrefix + ".clientCertificateKeyStoreType", "type");
+    properties.setProperty(propertyPrefix + ".clientCertificateKeyStorePassword", "secret");
+    properties.setProperty(propertyPrefix + ".enabledSSLCipherSuites", "[\"some\", \"ciphers\"]");
+    properties.setProperty(propertyPrefix + ".enabledTLSProtocols", "[\"some\", \"protocols\"]");
+    properties.setProperty(propertyPrefix + ".verifyServerCertificate", "true");
+    provider.inject(properties, injector.getInstance(JsonConfigurator.class));
+    final MySQLConnectorSslConfig config = provider.get().get();
+    Assert.assertTrue(config.isUseSSL());
+    Assert.assertEquals("url", config.getTrustCertificateKeyStoreUrl());
+    Assert.assertEquals("type", config.getTrustCertificateKeyStoreType());
+    Assert.assertEquals("secret", config.getTrustCertificateKeyStorePassword());
+    Assert.assertEquals("url", config.getClientCertificateKeyStoreUrl());
+    Assert.assertEquals("type", config.getClientCertificateKeyStoreType());
+    Assert.assertEquals("secret", config.getClientCertificateKeyStorePassword());
+    Assert.assertEquals(ImmutableList.of("some", "ciphers"), config.getEnabledSSLCipherSuites());
+    Assert.assertEquals(ImmutableList.of("some", "protocols"), config.getEnabledTLSProtocols());
+    Assert.assertTrue(config.isVerifyServerCertificate());
+  }
+
+  @Test
+  public void testDriverConfigDefault()
+  {
+    final Injector injector = createInjector();
+    final String propertyPrefix = "druid.metadata.mysql.driver";
+    final JsonConfigProvider<MySQLConnectorDriverConfig> provider = JsonConfigProvider.of(
+        propertyPrefix,
+        MySQLConnectorDriverConfig.class
+    );
+    final Properties properties = new Properties();
+    provider.inject(properties, injector.getInstance(JsonConfigurator.class));
+    final MySQLConnectorDriverConfig config = provider.get().get();
+    Assert.assertEquals(new MySQLConnectorDriverConfig().getDriverClassName(), config.getDriverClassName());
+  }
+
+  @Test
+  public void testDriverConfig()
+  {
+    final Injector injector = createInjector();
+    final String propertyPrefix = "druid.metadata.mysql.driver";
+    final JsonConfigProvider<MySQLConnectorDriverConfig> provider = JsonConfigProvider.of(
+        propertyPrefix,
+        MySQLConnectorDriverConfig.class
+    );
+    final Properties properties = new Properties();
+    properties.setProperty(propertyPrefix + ".driverClassName", "some.driver.classname");
+    provider.inject(properties, injector.getInstance(JsonConfigurator.class));
+    final MySQLConnectorDriverConfig config = provider.get().get();
+    Assert.assertEquals("some.driver.classname", config.getDriverClassName());
+  }
+
+  private Injector createInjector()
+  {
+    MySQLMetadataStorageModule module = new MySQLMetadataStorageModule();
+    Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
+        ImmutableList.of(
+            new MetadataConfigModule(),
+            new LifecycleModule(),
+            module,
+            new Module()
+            {
+              @Override
+              public void configure(Binder binder)
+              {
+                module.createBindingChoices(binder, "mysql");
+              }
+
+              @Provides
+              public ServiceEmitter getEmitter()
+              {
+                return new ServiceEmitter("test", "localhost", new NoopEmitter());
+              }
+            }
+        )
+    );
+    ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
+    mapper.registerModules(module.getJacksonModules());
+    return injector;
+  }
+}
diff --git a/extensions-core/postgresql-metadata-storage/pom.xml b/extensions-core/postgresql-metadata-storage/pom.xml
index 32a2819..b5ddcce 100644
--- a/extensions-core/postgresql-metadata-storage/pom.xml
+++ b/extensions-core/postgresql-metadata-storage/pom.xml
@@ -86,12 +86,34 @@
             <artifactId>commons-dbcp2</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
index bcf3e58..d9880d7 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
@@ -23,16 +23,14 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Sets;
 import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
-import org.postgresql.Driver;
+import org.apache.druid.utils.ConnectionUriUtils;
 import org.skife.jdbi.v2.DBI;
 
-import java.util.Properties;
+import java.util.Objects;
 import java.util.Set;
 
 
@@ -68,19 +66,35 @@
   }
 
   @Override
-  public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+  public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
   {
-    // This method should be in sync with
-    // - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
-    // - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
+    return ConnectionUriUtils.tryParseJdbcUriParameters(connectUri, allowUnknown);
+  }
 
-    // Postgresql JDBC driver is embedded and thus must be loaded.
-    Properties properties = Driver.parseURL(connectUri, null);
-    if (properties == null) {
-      throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectUri);
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
     }
-    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
-    properties.forEach((k, v) -> keys.add((String) k));
-    return keys;
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PostgresqlFirehoseDatabaseConnector that = (PostgresqlFirehoseDatabaseConnector) o;
+    return connectorConfig.equals(that.connectorConfig);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(connectorConfig);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "PostgresqlFirehoseDatabaseConnector{" +
+           "connectorConfig=" + connectorConfig +
+           '}';
   }
 }
diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
index b1a50bb..9b93f01 100644
--- a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
+++ b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
@@ -19,9 +19,16 @@
 
 package org.apache.druid.firehose;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -30,9 +37,50 @@
 
 public class PostgresqlFirehoseDatabaseConnectorTest
 {
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
+
+  static {
+    MAPPER.registerModules(new PostgreSQLMetadataStorageModule().getJacksonModules());
+    MAPPER.setInjectableValues(new InjectableValues.Std().addValue(JdbcAccessSecurityConfig.class, INJECTED_CONF));
+  }
+
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test";
+      }
+    };
+    PostgresqlFirehoseDatabaseConnector connector = new PostgresqlFirehoseDatabaseConnector(
+        connectorConfig,
+        INJECTED_CONF
+    );
+    PostgresqlFirehoseDatabaseConnector andBack = MAPPER.readValue(
+        MAPPER.writeValueAsString(connector),
+        PostgresqlFirehoseDatabaseConnector.class
+    );
+    Assert.assertEquals(connector, andBack);
+  }
+
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    EqualsVerifier.forClass(PostgresqlFirehoseDatabaseConnector.class)
+                  .usingGetClass()
+                  .withNonnullFields("connectorConfig")
+                  .withIgnoredFields("dbi")
+                  .verify();
+  }
+
   @Test
   public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
   {
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 8076240..845bf21 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -29,6 +29,8 @@
 
 FROM druidbase
 ARG MYSQL_VERSION
+ARG MARIA_VERSION
+ARG MYSQL_DRIVER_CLASSNAME=com.mysql.jdbc.Driver
 ARG CONFLUENT_VERSION
 
 # Verify Java version
@@ -47,16 +49,23 @@
 
 # Download the MySQL Java connector
 # target path must match the exact path referenced in environment-configs/common
-RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-    -O /usr/local/druid/lib/mysql-connector-java.jar
+# alternatively: Download the MariaDB Java connector, and pretend it is the mysql connector
+RUN if [ "$MYSQL_DRIVER_CLASSNAME" = "com.mysql.jdbc.Driver" ] ; \
+    then wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
+    -O /usr/local/druid/lib/mysql-connector-java.jar; \
+    elif [ "$MYSQL_DRIVER_CLASSNAME" = "org.mariadb.jdbc.Driver" ] ; \
+    then wget -q "https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/$MARIA_VERSION/mariadb-java-client-$MARIA_VERSION.jar" \
+    -O /usr/local/druid/lib/mysql-connector-java.jar; \
+    fi
 
+# download kafka protobuf provider
 RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
     -O /usr/local/druid/lib/kafka-protobuf-provider.jar
 
 # Add sample data
 # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
 RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
-      && java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \
+      && java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql -Ddruid.metadata.mysql.driver.driverClassName=$MYSQL_DRIVER_CLASSNAME org.apache.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \
       && /etc/init.d/mysql stop
 ADD test-data /test-data
 
@@ -105,8 +114,9 @@
 EXPOSE 8300 8301 8302 8303 8304 8305
 EXPOSE 9092 9093
 
+ENV MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME
 WORKDIR /var/lib/druid
-ENTRYPOINT  /tls/generate-server-certs-and-keystores.sh \
+ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
             && . /druid.sh \
             # Create druid service config files with all the config variables
             && setupConfig \
diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh
index baa0e97..adece18 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -101,6 +101,9 @@
     export AWS_REGION=us-east-1
   fi
 
+  if [ "$MYSQL_DRIVER_CLASSNAME" != "com.mysql.jdbc.Driver" ] ; then
+    setKey $DRUID_SERVICE druid.metadata.mysql.driver.driverClassName $MYSQL_DRIVER_CLASSNAME
+  fi
 
   # The SqlInputSource tests in the "input-source" test group require data to be setup in MySQL before running the tests.
   if [ "$DRUID_INTEGRATION_TEST_GROUP" = "input-source" ] ; then
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 99e0ffd..188e9e4 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -491,6 +491,7 @@
                                         <DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
                                         <DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
                                         <MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
+                                        <MARIA_VERSION>2.7.3</MARIA_VERSION>
                                         <CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
                                         <KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
                                         <ZK_VERSION>${zookeeper.version}</ZK_VERSION>
diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh
index 8e93d4f..7a18d77 100755
--- a/integration-tests/script/docker_build_containers.sh
+++ b/integration-tests/script/docker_build_containers.sh
@@ -22,21 +22,21 @@
 if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
 then
   echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
-  docker build -t druid/cluster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
+  docker build -t druid/cluster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME $SHARED_DIR/docker
 else
   echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
   case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
   8)
     echo "Build druid-cluster with Java 8"
-    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   11)
     echo "Build druid-cluster with Java 11"
-    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   15)
     echo "Build druid-cluster with Java 15"
-    docker build -t druid/cluster --build-arg JDK_VERSION=15-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=15-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg USE_MARIA --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   *)
     echo "Invalid JVM Runtime given. Stopping"
diff --git a/pom.xml b/pom.xml
index fd31c0f..47d6e2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
         <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
         <log4j.version>2.8.2</log4j.version>
         <mysql.version>5.1.48</mysql.version>
+        <mariadb.version>2.7.3</mariadb.version>
         <netty3.version>3.10.6.Final</netty3.version>
         <netty4.version>4.1.63.Final</netty4.version>
         <postgresql.version>42.2.14</postgresql.version>
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
index 5a14c02..11d4673 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
@@ -95,7 +95,7 @@
       // You don't want to do anything with properties.
       return;
     }
-    final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString);
+    final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString, securityConfig.isAllowUnknownJdbcUrlFormat());
     ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
         propertyKeyFromConnectURL,
         securityConfig.getSystemPropertyPrefixes(),
@@ -113,5 +113,5 @@
   /**
    * Extract property keys from the given JDBC URL.
    */
-  public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri);
+  public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown);
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
index ee56039..7edf954 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
@@ -298,7 +298,7 @@
     }
 
     @Override
-    public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+    public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
     {
       return ImmutableSet.of("user", "create");
     }
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
index c21b5cc..462df23 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
@@ -82,7 +82,7 @@
     }
 
     @Override
-    public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+    public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
     {
       return ImmutableSet.of("user", "create");
     }