HBASE-28436 Use connection url to specify the connection registry information (#5770)

Signed-off-by: Istvan Toth <stoty@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Reviewed-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index d4ee67f..ea99023 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -171,6 +171,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 716fb48..f4ef449 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Map;
@@ -89,41 +90,55 @@
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection();
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
    * @return Connection object for <code>conf</code>
    */
   public static Connection createConnection() throws IOException {
-    Configuration conf = HBaseConfiguration.create();
-    return createConnection(conf, null, AuthUtil.loginClient(conf));
+    return createConnection(HBaseConfiguration.create());
+  }
+
+  /**
+   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
+   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
+   * connection share zookeeper connection, meta cache, and connections to region servers and
+   * masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri) throws IOException {
+    return createConnection(connectionUri, HBaseConfiguration.create());
   }
 
   /**
    * Create a new Connection instance using the passed <code>conf</code> instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -137,20 +152,41 @@
   /**
    * Create a new Connection instance using the passed <code>conf</code> instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration conf)
+    throws IOException {
+    return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf));
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
    * }
    * </pre>
    *
@@ -166,20 +202,42 @@
   /**
    * Create a new Connection instance using the passed <code>conf</code> instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param pool          the thread pool to use for batch operations
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration conf,
+    ExecutorService pool) throws IOException {
+    return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf));
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
    * }
    * </pre>
    *
@@ -194,20 +252,42 @@
   /**
    * Create a new Connection instance using the passed <code>conf</code> instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the connection is for
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration conf, User user)
+    throws IOException {
+    return createConnection(connectionUri, conf, null, user);
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
    * }
    * </pre>
    *
@@ -224,20 +304,43 @@
   /**
    * Create a new Connection instance using the passed <code>conf</code> instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the connection is for
+   * @param pool          the thread pool to use for batch operations
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration conf,
+    ExecutorService pool, User user) throws IOException {
+    return createConnection(connectionUri, conf, pool, user, Collections.emptyMap());
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
    * }
    * </pre>
    *
@@ -249,6 +352,37 @@
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool,
     final User user, Map<String, byte[]> connectionAttributes) throws IOException {
+    return createConnection(null, conf, pool, user, connectionAttributes);
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection(if used), meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri        the connection uri for the hbase cluster
+   * @param conf                 configuration
+   * @param user                 the user the connection is for
+   * @param pool                 the thread pool to use for batch operations
+   * @param connectionAttributes attributes to be sent along to server during connection establish
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration conf,
+    ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes)
+    throws IOException {
     Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
       ConnectionOverAsyncConnection.class, Connection.class);
     if (clazz != ConnectionOverAsyncConnection.class) {
@@ -263,7 +397,7 @@
         throw new IOException(e);
       }
     } else {
-      return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
+      return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
         .toConnection();
     }
   }
@@ -278,6 +412,16 @@
   }
 
   /**
+   * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @see #createAsyncConnection(URI, Configuration)
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) {
+    return createAsyncConnection(connectionUri, HBaseConfiguration.create());
+  }
+
+  /**
    * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
    * User object created by {@link UserProvider}. The given {@code conf} will also be used to
    * initialize the {@link UserProvider}.
@@ -287,6 +431,21 @@
    * @see UserProvider
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
+    return createAsyncConnection(null, conf);
+  }
+
+  /**
+   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri},
+   * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will
+   * also be used to initialize the {@link UserProvider}.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @return AsyncConnection object wrapped by CompletableFuture
+   * @see #createAsyncConnection(Configuration, User)
+   * @see UserProvider
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+    Configuration conf) {
     User user;
     try {
       user = AuthUtil.loginClient(conf);
@@ -295,7 +454,7 @@
       future.completeExceptionally(e);
       return future;
     }
-    return createAsyncConnection(conf, user);
+    return createAsyncConnection(connectionUri, conf, user);
   }
 
   /**
@@ -315,7 +474,28 @@
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
     final User user) {
-    return createAsyncConnection(conf, user, null);
+    return createAsyncConnection(null, conf, user);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
+   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
+   * All tables and interfaces created from returned connection share zookeeper connection(if used),
+   * meta cache, and connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+   * as it is thread safe.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the asynchronous connection is for
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+    Configuration conf, final User user) {
+    return createAsyncConnection(connectionUri, conf, user, null);
   }
 
   /**
@@ -336,9 +516,38 @@
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
     final User user, Map<String, byte[]> connectionAttributes) {
+    return createAsyncConnection(null, conf, user, connectionAttributes);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
+   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
+   * All tables and interfaces created from returned connection share zookeeper connection(if used),
+   * meta cache, and connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+   * as it is thread safe.
+   * @param connectionUri        the connection uri for the hbase cluster
+   * @param conf                 configuration
+   * @param user                 the user the asynchronous connection is for
+   * @param connectionAttributes attributes to be sent along to server during connection establish
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+    Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
+      ConnectionRegistry registry;
+      try {
+        registry = connectionUri != null
+          ? ConnectionRegistryFactory.create(connectionUri, conf, user)
+          : ConnectionRegistryFactory.create(conf, user);
+      } catch (Exception e) {
+        return FutureUtils.failedFuture(e);
+      }
       CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
-      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user);
       addListener(registry.getClusterId(), (clusterId, error) -> {
         if (error != null) {
           registry.close();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 415d463..5eef2c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -17,27 +17,77 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
+import java.util.ServiceLoader;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * The entry point for creating a {@link ConnectionRegistry}.
  */
 @InterfaceAudience.Private
 final class ConnectionRegistryFactory {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);
+
+  private static final ImmutableMap<String, ConnectionRegistryURIFactory> CREATORS;
+  static {
+    ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder();
+    for (ConnectionRegistryURIFactory factory : ServiceLoader
+      .load(ConnectionRegistryURIFactory.class)) {
+      builder.put(factory.getScheme().toLowerCase(), factory);
+    }
+    // throw IllegalArgumentException if there are duplicated keys
+    CREATORS = builder.buildOrThrow();
+  }
+
   private ConnectionRegistryFactory() {
   }
 
-  /** Returns The connection registry implementation to use. */
-  static ConnectionRegistry getRegistry(Configuration conf, User user) {
+  /**
+   * Returns the connection registry implementation to use, for the given connection url
+   * {@code uri}.
+   * <p/>
+   * We use {@link ServiceLoader} to load different implementations, and use the scheme of the given
+   * {@code uri} to select. And if there is no protocol specified, or we can not find a
+   * {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to
+   * use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the
+   * specified connection url {@code uri} will not take effect, we will load all the related
+   * configurations from the given Configuration instance {@code conf}
+   */
+  static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+    if (StringUtils.isBlank(uri.getScheme())) {
+      LOG.warn("No scheme specified for {}, fallback to use old way", uri);
+      return create(conf, user);
+    }
+    ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase());
+    if (creator == null) {
+      LOG.warn("No creator registered for {}, fallback to use old way", uri);
+      return create(conf, user);
+    }
+    return creator.create(uri, conf, user);
+  }
+
+  /**
+   * Returns the connection registry implementation to use.
+   * <p/>
+   * This is used when we do not have a connection url, we will use the old way to load the
+   * connection registry, by checking the
+   * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration.
+   */
+  static ConnectionRegistry create(Configuration conf, User user) {
     Class<? extends ConnectionRegistry> clazz =
-      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
-        ConnectionRegistry.class);
+      conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        RpcConnectionRegistry.class, ConnectionRegistry.class);
     return ReflectionUtils.newInstance(clazz, conf, user);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
new file mode 100644
index 0000000..ab2037a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * For creating different {@link ConnectionRegistry} implementation.
+ */
+@InterfaceAudience.Private
+public interface ConnectionRegistryURIFactory {
+
+  /**
+   * Instantiate the {@link ConnectionRegistry} using the given parameters.
+   */
+  ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException;
+
+  /**
+   * The scheme for this implementation. Used to register this URI factory to the
+   * {@link ConnectionRegistryFactory}.
+   */
+  String getScheme();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
new file mode 100644
index 0000000..cb2338b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection registry creator implementation for creating {@link RpcConnectionRegistry}.
+ */
+@InterfaceAudience.Private
+public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class);
+
+  @Override
+  public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+    assert getScheme().equals(uri.getScheme());
+    LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority());
+    Configuration c = new Configuration(conf);
+    c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority());
+    return new RpcConnectionRegistry(c, user);
+  }
+
+  @Override
+  public String getScheme() {
+    return "hbase+rpc";
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
new file mode 100644
index 0000000..8aa51e0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection registry creator implementation for creating {@link ZKConnectionRegistry}.
+ */
+@InterfaceAudience.Private
+public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class);
+
+  @Override
+  public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+    assert getScheme().equals(uri.getScheme());
+    LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(),
+      uri.getPath());
+    Configuration c = new Configuration(conf);
+    c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority());
+    c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath());
+    return new ZKConnectionRegistry(c, user);
+  }
+
+  @Override
+  public String getScheme() {
+    return "hbase+zk";
+  }
+}
diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
new file mode 100644
index 0000000..b25a569
--- /dev/null
+++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator
+org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator
\ No newline at end of file
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
new file mode 100644
index 0000000..4dabd89
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+
+import java.net.URI;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+/**
+ * Make sure we can successfully parse the URI component
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestConnectionRegistryCreatorUriParsing {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestConnectionRegistryCreatorUriParsing.class);
+
+  private Configuration conf;
+
+  private User user;
+
+  private MockedConstruction<RpcConnectionRegistry> mockedRpcRegistry;
+
+  private MockedConstruction<ZKConnectionRegistry> mockedZkRegistry;
+
+  private MockedStatic<ReflectionUtils> mockedReflectionUtils;
+
+  private List<?> args;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    user = mock(User.class);
+    args = null;
+    mockedRpcRegistry = mockConstruction(RpcConnectionRegistry.class, (mock, context) -> {
+      args = context.arguments();
+    });
+    mockedZkRegistry = mockConstruction(ZKConnectionRegistry.class, (mock, context) -> {
+      args = context.arguments();
+    });
+    mockedReflectionUtils = mockStatic(ReflectionUtils.class);
+  }
+
+  @After
+  public void tearDown() {
+    mockedRpcRegistry.closeOnDemand();
+    mockedZkRegistry.closeOnDemand();
+    mockedReflectionUtils.closeOnDemand();
+  }
+
+  @Test
+  public void testParseRpcSingle() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123"), conf, user);
+    assertEquals(1, mockedRpcRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123", conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+  }
+
+  @Test
+  public void testParseRpcMultiple() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123,server2:456,server3:789"),
+      conf, user);
+    assertEquals(1, mockedRpcRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123,server2:456,server3:789",
+      conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+  }
+
+  @Test
+  public void testParseZkSingle() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+zk://server1:123/root"), conf, user);
+    assertEquals(1, mockedZkRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123", conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+    assertEquals("/root", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  @Test
+  public void testParseZkMultiple() throws Exception {
+    ConnectionRegistryFactory
+      .create(new URI("hbase+zk://server1:123,server2:456,server3:789/root/path"), conf, user);
+    assertEquals(1, mockedZkRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123,server2:456,server3:789",
+      conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+    assertEquals("/root/path", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  @Test
+  public void testFallbackNoScheme() throws Exception {
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+      ConnectionRegistry.class);
+    ConnectionRegistryFactory.create(new URI("server1:2181/path"), conf, user);
+    ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+    ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+    mockedReflectionUtils
+      .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+    assertEquals(ZKConnectionRegistry.class, clazzCaptor.getValue());
+    assertSame(conf, argsCaptor.getValue()[0]);
+    assertSame(user, argsCaptor.getValue()[1]);
+  }
+
+  @Test
+  public void testFallbackNoCreator() throws Exception {
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
+      ConnectionRegistry.class);
+    ConnectionRegistryFactory.create(new URI("hbase+tls://server1:123/path"), conf, user);
+    ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+    ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+    mockedReflectionUtils
+      .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+    assertEquals(RpcConnectionRegistry.class, clazzCaptor.getValue());
+    assertSame(conf, argsCaptor.getValue()[0]);
+    assertSame(user, argsCaptor.getValue()[1]);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 7225f92..ed90863 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -64,7 +64,7 @@
    */
   public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
     SocketAddress localAddress, User user) throws IOException {
-    return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user),
+    return createAsyncClusterConnection(conf, ConnectionRegistryFactory.create(conf, user),
       localAddress, user);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index 0ff1057..031dff7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -60,7 +60,7 @@
     UTIL.getAdmin().createTable(td, SPLIT_KEYS);
     UTIL.waitTableAvailable(TABLE_NAME);
     try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) {
+      ConnectionRegistryFactory.create(UTIL.getConfiguration(), User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     UTIL.getAdmin().balancerSwitch(false, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index da400f2..bb0eb31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -56,7 +56,7 @@
     TestAsyncAdminBase.setUpBeforeClass();
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
     try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) {
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 90d2cb5..e14cd32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -107,8 +107,7 @@
       testUtil = miniClusterRule.getTestingUtility();
       HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
       testUtil.waitUntilNoRegionsInTransition();
-      registry =
-        ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent());
+      registry = ConnectionRegistryFactory.create(testUtil.getConfiguration(), User.getCurrent());
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry);
       admin.balancerSwitch(false).get();
       locator = new AsyncMetaRegionLocator(registry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index a6d0ab8..6a5230b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -128,7 +128,7 @@
     // Enable meta replica LoadBalance mode for this connection.
     c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     conn =
       new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent());
     locator = new AsyncNonMetaRegionLocator(conn);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 50c9ab9..439d527 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
index bacd7bb..2291c28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -100,7 +100,7 @@
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = CONN.getLocator();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 3c83271..baa4ee7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 0de59a4..2803db2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -95,8 +95,7 @@
       FailPrimaryMetaScanCp.class.getName());
     UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) {
+    try (ConnectionRegistry registry = ConnectionRegistryFactory.create(conf, User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
new file mode 100644
index 0000000..5746ffa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic read write operation with different {@link ConnectionRegistry} implementations.
+ */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBasicReadWriteWithDifferentConnectionRegistries {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  public enum RegistryImpl {
+    ZK,
+    RPC,
+    ZK_URI,
+    RPC_URI
+  }
+
+  @Parameter
+  public RegistryImpl impl;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  private byte[] FAMILY = Bytes.toBytes("family");
+
+  private Connection conn;
+
+  @Parameters(name = "{index}: impl={0}")
+  public static List<Object[]> data() {
+    List<Object[]> data = new ArrayList<Object[]>();
+    for (RegistryImpl impl : RegistryImpl.values()) {
+      data.add(new Object[] { impl });
+    }
+    return data;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    switch (impl) {
+      case ZK: {
+        Configuration conf = HBaseConfiguration.create();
+        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+          ZKConnectionRegistry.class, ConnectionRegistry.class);
+        String quorum = UTIL.getZkCluster().getAddress().toString();
+        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+        conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path);
+        LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path);
+        conn = ConnectionFactory.createConnection(conf);
+        break;
+      }
+      case RPC: {
+        Configuration conf = HBaseConfiguration.create();
+        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+          RpcConnectionRegistry.class, ConnectionRegistry.class);
+        String bootstrapServers =
+          UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString();
+        conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers);
+        LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers);
+        conn = ConnectionFactory.createConnection(conf);
+        break;
+      }
+      case ZK_URI: {
+        String quorum = UTIL.getZkCluster().getAddress().toString();
+        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+        URI connectionUri = new URI("hbase+zk://" + quorum + path);
+        LOG.info("connect to cluster through connection url: {}", connectionUri);
+        conn = ConnectionFactory.createConnection(connectionUri);
+        break;
+      }
+      case RPC_URI: {
+        URI connectionUri = new URI("hbase+rpc://"
+          + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString());
+        LOG.info("connect to cluster through connection url: {}", connectionUri);
+        conn = ConnectionFactory.createConnection(connectionUri);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown impl: " + impl);
+    }
+    try (Admin admin = conn.getAdmin()) {
+      admin.createTable(TableDescriptorBuilder.newBuilder(name.getTableName())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TableName tableName = name.getTableName();
+    try (Admin admin = conn.getAdmin()) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    conn.close();
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    byte[] row = Bytes.toBytes("row");
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    byte[] value = Bytes.toBytes("value");
+    try (Table table = conn.getTable(name.getTableName())) {
+      Put put = new Put(row).addColumn(FAMILY, qualifier, value);
+      table.put(put);
+      Result result = table.get(new Get(row));
+      assertArrayEquals(value, result.getValue(FAMILY, qualifier));
+      table.delete(new Delete(row));
+      assertFalse(table.exists(new Get(row)));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
index 5c78e53..12f278e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
@@ -77,8 +77,7 @@
       () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
           >= numOfMetaReplica);
 
-    registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+    registry = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null,
       User.getCurrent());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index beb054e..29223de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -64,8 +64,7 @@
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    REGISTRY =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+    REGISTRY = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
   }