HBASE-28436 Use connection url to specify the connection registry information (#5770)
Signed-off-by: Istvan Toth <stoty@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Reviewed-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index d4ee67f..ea99023 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -171,6 +171,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 716fb48..f4ef449 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Map;
@@ -89,41 +90,55 @@
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection();
- * Table table = connection.getTable(TableName.valueOf("mytable"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
* }
* </pre>
*
* @return Connection object for <code>conf</code>
*/
public static Connection createConnection() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- return createConnection(conf, null, AuthUtil.loginClient(conf));
+ return createConnection(HBaseConfiguration.create());
+ }
+
+ /**
+ * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
+ * housekeeping for a connection to the cluster. All tables and interfaces created from returned
+ * connection share zookeeper connection, meta cache, and connections to region servers and
+ * masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @return Connection object for the given connection uri
+ */
+ public static Connection createConnection(URI connectionUri) throws IOException {
+ return createConnection(connectionUri, HBaseConfiguration.create());
}
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
- * created from returned connection share zookeeper connection, meta cache, and connections to
- * region servers and masters. <br>
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection(conf);
- * Table table = connection.getTable(TableName.valueOf("mytable"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
* }
* </pre>
*
@@ -137,20 +152,41 @@
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
- * created from returned connection share zookeeper connection, meta cache, and connections to
- * region servers and masters. <br>
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection(conf);
- * Table table = connection.getTable(TableName.valueOf("mytable"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @return Connection object for <code>conf</code>
+ */
+ public static Connection createConnection(URI connectionUri, Configuration conf)
+ throws IOException {
+ return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf));
+ }
+
+ /**
+ * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+ * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
* }
* </pre>
*
@@ -166,20 +202,42 @@
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
- * created from returned connection share zookeeper connection, meta cache, and connections to
- * region servers and masters. <br>
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection(conf);
- * Table table = connection.getTable(TableName.valueOf("table1"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param pool the thread pool to use for batch operations
+ * @return Connection object for <code>conf</code>
+ */
+ public static Connection createConnection(URI connectionUri, Configuration conf,
+ ExecutorService pool) throws IOException {
+ return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf));
+ }
+
+ /**
+ * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+ * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
* }
* </pre>
*
@@ -194,20 +252,42 @@
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
- * created from returned connection share zookeeper connection, meta cache, and connections to
- * region servers and masters. <br>
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection(conf);
- * Table table = connection.getTable(TableName.valueOf("table1"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param user the user the connection is for
+ * @return Connection object for <code>conf</code>
+ */
+ public static Connection createConnection(URI connectionUri, Configuration conf, User user)
+ throws IOException {
+ return createConnection(connectionUri, conf, null, user);
+ }
+
+ /**
+ * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+ * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
* }
* </pre>
*
@@ -224,20 +304,43 @@
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
- * created from returned connection share zookeeper connection, meta cache, and connections to
- * region servers and masters. <br>
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
- * Connection connection = ConnectionFactory.createConnection(conf);
- * Table table = connection.getTable(TableName.valueOf("table1"));
- * try {
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
* table.get(...);
* ...
- * } finally {
- * table.close();
- * connection.close();
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param pool the thread pool to use for batch operations
+ * @param user the user the connection is for
+ * @return Connection object for <code>conf</code>
+ */
+ public static Connection createConnection(URI connectionUri, Configuration conf,
+ ExecutorService pool, User user) throws IOException {
+ return createConnection(connectionUri, conf, pool, user, Collections.emptyMap());
+ }
+
+ /**
+ * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+ * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * try (Connection connection = ConnectionFactory.createConnection(conf);
+ * Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
* }
* </pre>
*
@@ -249,6 +352,37 @@
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
final User user, Map<String, byte[]> connectionAttributes) throws IOException {
+ return createConnection(null, conf, pool, user, connectionAttributes);
+ }
+
+ /**
+ * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+ * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+ * created from returned connection share zookeeper connection (if used), meta cache, and
+ * connections to region servers and masters. <br>
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection
+ * instance. Typical usage:
+ *
+ * <pre>
+ * URI uri = new URI("hbase+zk://zk1:2181,zk2:2181/hbase");
+ * try (Connection connection =
+ *     ConnectionFactory.createConnection(uri, conf, pool, user, connectionAttributes);
+ *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+ * table.get(...);
+ * ...
+ * }
+ * </pre>
+ *
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param pool the thread pool to use for batch operations
+ * @param user the user the connection is for
+ * @param connectionAttributes attributes to be sent along to server during connection establish
+ * @return Connection object for <code>conf</code>
+ */
+ public static Connection createConnection(URI connectionUri, Configuration conf,
+ ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes)
+ throws IOException {
Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionOverAsyncConnection.class, Connection.class);
if (clazz != ConnectionOverAsyncConnection.class) {
@@ -263,7 +397,7 @@
throw new IOException(e);
}
} else {
- return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
+ return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
.toConnection();
}
}
@@ -278,6 +412,16 @@
}
/**
+ * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration.
+ * @param connectionUri the connection uri for the hbase cluster
+ * @see #createAsyncConnection(URI, Configuration)
+ * @return AsyncConnection object wrapped by CompletableFuture
+ */
+ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) {
+ return createAsyncConnection(connectionUri, HBaseConfiguration.create());
+ }
+
+ /**
* Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
* User object created by {@link UserProvider}. The given {@code conf} will also be used to
* initialize the {@link UserProvider}.
@@ -287,6 +431,21 @@
* @see UserProvider
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
+ return createAsyncConnection(null, conf);
+ }
+
+ /**
+ * Call {@link #createAsyncConnection(URI, Configuration, User)} using the given
+ * {@code connectionUri}, {@code conf} and a User object created by {@link UserProvider}. The
+ * given {@code conf} will also be used to initialize the {@link UserProvider}.
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @return AsyncConnection object wrapped by CompletableFuture
+ * @see #createAsyncConnection(URI, Configuration, User)
+ * @see UserProvider
+ */
+ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+ Configuration conf) {
User user;
try {
user = AuthUtil.loginClient(conf);
@@ -295,7 +454,7 @@
future.completeExceptionally(e);
return future;
}
- return createAsyncConnection(conf, user);
+ return createAsyncConnection(connectionUri, conf, user);
}
/**
@@ -315,7 +474,28 @@
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) {
- return createAsyncConnection(conf, user, null);
+ return createAsyncConnection(null, conf, user);
+ }
+
+ /**
+ * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
+ * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
+ * All tables and interfaces created from returned connection share zookeeper connection (if used),
+ * meta cache, and connections to region servers and masters.
+ * <p>
+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+ * connection instance.
+ * <p>
+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+ * as it is thread safe.
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param user the user the asynchronous connection is for
+ * @return AsyncConnection object wrapped by CompletableFuture
+ */
+ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+ Configuration conf, final User user) {
+ return createAsyncConnection(connectionUri, conf, user, null);
}
/**
@@ -336,9 +516,38 @@
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user, Map<String, byte[]> connectionAttributes) {
+ return createAsyncConnection(null, conf, user, connectionAttributes);
+ }
+
+ /**
+ * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
+ * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
+ * All tables and interfaces created from returned connection share zookeeper connection (if used),
+ * meta cache, and connections to region servers and masters.
+ * <p>
+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+ * connection instance.
+ * <p>
+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+ * as it is thread safe.
+ * @param connectionUri the connection uri for the hbase cluster
+ * @param conf configuration
+ * @param user the user the asynchronous connection is for
+ * @param connectionAttributes attributes to be sent along to server during connection establish
+ * @return AsyncConnection object wrapped by CompletableFuture
+ */
+ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
+ Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
return TraceUtil.tracedFuture(() -> {
+ ConnectionRegistry registry;
+ try {
+ registry = connectionUri != null
+ ? ConnectionRegistryFactory.create(connectionUri, conf, user)
+ : ConnectionRegistryFactory.create(conf, user);
+ } catch (Exception e) {
+ return FutureUtils.failedFuture(e);
+ }
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
- ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user);
addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) {
registry.close();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 415d463..5eef2c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -17,27 +17,77 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
+import java.util.ServiceLoader;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
- * Factory class to get the instance of configured connection registry.
+ * The entry point for creating a {@link ConnectionRegistry}.
*/
@InterfaceAudience.Private
final class ConnectionRegistryFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);
+
+ private static final ImmutableMap<String, ConnectionRegistryURIFactory> CREATORS;
+ static {
+ ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder();
+ for (ConnectionRegistryURIFactory factory : ServiceLoader
+ .load(ConnectionRegistryURIFactory.class)) {
+ builder.put(factory.getScheme().toLowerCase(), factory);
+ }
+ // throw IllegalArgumentException if there are duplicated keys
+ CREATORS = builder.buildOrThrow();
+ }
+
private ConnectionRegistryFactory() {
}
- /** Returns The connection registry implementation to use. */
- static ConnectionRegistry getRegistry(Configuration conf, User user) {
+ /**
+ * Returns the connection registry implementation to use, for the given connection url
+ * {@code uri}.
+ * <p/>
+ * We use {@link ServiceLoader} to load different implementations, and use the scheme of the given
+ * {@code uri} to select. And if there is no protocol specified, or we can not find a
+ * {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to
+ * use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the
+ * specified connection url {@code uri} will not take effect, we will load all the related
+ * configurations from the given Configuration instance {@code conf}
+ */
+ static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+ if (StringUtils.isBlank(uri.getScheme())) {
+ LOG.warn("No scheme specified for {}, fallback to use old way", uri);
+ return create(conf, user);
+ }
+ ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase());
+ if (creator == null) {
+ LOG.warn("No creator registered for {}, fallback to use old way", uri);
+ return create(conf, user);
+ }
+ return creator.create(uri, conf, user);
+ }
+
+ /**
+ * Returns the connection registry implementation to use.
+ * <p/>
+ * This is used when we do not have a connection url, we will use the old way to load the
+ * connection registry, by checking the
+ * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration.
+ */
+ static ConnectionRegistry create(Configuration conf, User user) {
Class<? extends ConnectionRegistry> clazz =
- conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
- ConnectionRegistry.class);
+ conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ RpcConnectionRegistry.class, ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf, user);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
new file mode 100644
index 0000000..ab2037a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * For creating different {@link ConnectionRegistry} implementation.
+ */
+@InterfaceAudience.Private
+public interface ConnectionRegistryURIFactory {
+
+ /**
+ * Instantiate the {@link ConnectionRegistry} using the given parameters.
+ */
+ ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException;
+
+ /**
+ * The scheme for this implementation. Used to register this URI factory to the
+ * {@link ConnectionRegistryFactory}.
+ */
+ String getScheme();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
new file mode 100644
index 0000000..cb2338b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection registry creator implementation for creating {@link RpcConnectionRegistry}.
+ */
+@InterfaceAudience.Private
+public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class);
+
+ @Override
+ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+ assert getScheme().equals(uri.getScheme());
+ LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority());
+ Configuration c = new Configuration(conf);
+ c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority());
+ return new RpcConnectionRegistry(c, user);
+ }
+
+ @Override
+ public String getScheme() {
+ return "hbase+rpc";
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
new file mode 100644
index 0000000..8aa51e0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection registry creator implementation for creating {@link ZKConnectionRegistry}.
+ */
+@InterfaceAudience.Private
+public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class);
+
+ @Override
+ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+ assert getScheme().equals(uri.getScheme());
+ LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(),
+ uri.getPath());
+ Configuration c = new Configuration(conf);
+ c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority());
+ c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath());
+ return new ZKConnectionRegistry(c, user);
+ }
+
+ @Override
+ public String getScheme() {
+ return "hbase+zk";
+ }
+}
diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
new file mode 100644
index 0000000..b25a569
--- /dev/null
+++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator
+org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator
\ No newline at end of file
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
new file mode 100644
index 0000000..4dabd89
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+
+import java.net.URI;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+/**
+ * Make sure we can successfully parse the URI component
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestConnectionRegistryCreatorUriParsing {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestConnectionRegistryCreatorUriParsing.class);
+
+ private Configuration conf;
+
+ private User user;
+
+ private MockedConstruction<RpcConnectionRegistry> mockedRpcRegistry;
+
+ private MockedConstruction<ZKConnectionRegistry> mockedZkRegistry;
+
+ private MockedStatic<ReflectionUtils> mockedReflectionUtils;
+
+ private List<?> args;
+
+ @Before
+ public void setUp() {
+ conf = HBaseConfiguration.create();
+ user = mock(User.class);
+ args = null;
+ mockedRpcRegistry = mockConstruction(RpcConnectionRegistry.class, (mock, context) -> {
+ args = context.arguments();
+ });
+ mockedZkRegistry = mockConstruction(ZKConnectionRegistry.class, (mock, context) -> {
+ args = context.arguments();
+ });
+ mockedReflectionUtils = mockStatic(ReflectionUtils.class);
+ }
+
+ @After
+ public void tearDown() {
+ mockedRpcRegistry.closeOnDemand();
+ mockedZkRegistry.closeOnDemand();
+ mockedReflectionUtils.closeOnDemand();
+ }
+
+ @Test
+ public void testParseRpcSingle() throws Exception {
+ ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123"), conf, user);
+ assertEquals(1, mockedRpcRegistry.constructed().size());
+ assertSame(user, args.get(1));
+ Configuration conf = (Configuration) args.get(0);
+ assertEquals("server1:123", conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+ }
+
+ @Test
+ public void testParseRpcMultiple() throws Exception {
+ ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123,server2:456,server3:789"),
+ conf, user);
+ assertEquals(1, mockedRpcRegistry.constructed().size());
+ assertSame(user, args.get(1));
+ Configuration conf = (Configuration) args.get(0);
+ assertEquals("server1:123,server2:456,server3:789",
+ conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+ }
+
+ @Test
+ public void testParseZkSingle() throws Exception {
+ ConnectionRegistryFactory.create(new URI("hbase+zk://server1:123/root"), conf, user);
+ assertEquals(1, mockedZkRegistry.constructed().size());
+ assertSame(user, args.get(1));
+ Configuration conf = (Configuration) args.get(0);
+ assertEquals("server1:123", conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+ assertEquals("/root", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ }
+
+ @Test
+ public void testParseZkMultiple() throws Exception {
+ ConnectionRegistryFactory
+ .create(new URI("hbase+zk://server1:123,server2:456,server3:789/root/path"), conf, user);
+ assertEquals(1, mockedZkRegistry.constructed().size());
+ assertSame(user, args.get(1));
+ Configuration conf = (Configuration) args.get(0);
+ assertEquals("server1:123,server2:456,server3:789",
+ conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+ assertEquals("/root/path", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ }
+
+ @Test
+ public void testFallbackNoScheme() throws Exception {
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+ ConnectionRegistry.class);
+ ConnectionRegistryFactory.create(new URI("server1:2181/path"), conf, user);
+ ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+ ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+ mockedReflectionUtils
+ .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+ assertEquals(ZKConnectionRegistry.class, clazzCaptor.getValue());
+ assertSame(conf, argsCaptor.getValue()[0]);
+ assertSame(user, argsCaptor.getValue()[1]);
+ }
+
+ @Test
+ public void testFallbackNoCreator() throws Exception {
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
+ ConnectionRegistry.class);
+ ConnectionRegistryFactory.create(new URI("hbase+tls://server1:123/path"), conf, user);
+ ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+ ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+ mockedReflectionUtils
+ .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+ assertEquals(RpcConnectionRegistry.class, clazzCaptor.getValue());
+ assertSame(conf, argsCaptor.getValue()[0]);
+ assertSame(user, argsCaptor.getValue()[1]);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 7225f92..ed90863 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -64,7 +64,7 @@
*/
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException {
- return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user),
+ return createAsyncClusterConnection(conf, ConnectionRegistryFactory.create(conf, user),
localAddress, user);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index 0ff1057..031dff7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -60,7 +60,7 @@
UTIL.getAdmin().createTable(td, SPLIT_KEYS);
UTIL.waitTableAvailable(TABLE_NAME);
try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) {
+ ConnectionRegistryFactory.create(UTIL.getConfiguration(), User.getCurrent())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
}
UTIL.getAdmin().balancerSwitch(false, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index da400f2..bb0eb31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -56,7 +56,7 @@
TestAsyncAdminBase.setUpBeforeClass();
HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) {
+ ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 90d2cb5..e14cd32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -107,8 +107,7 @@
testUtil = miniClusterRule.getTestingUtility();
HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
testUtil.waitUntilNoRegionsInTransition();
- registry =
- ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent());
+ registry = ConnectionRegistryFactory.create(testUtil.getConfiguration(), User.getCurrent());
RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry);
admin.balancerSwitch(false).get();
locator = new AsyncMetaRegionLocator(registry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index a6d0ab8..6a5230b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -128,7 +128,7 @@
// Enable meta replica LoadBalance mode for this connection.
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
conn =
new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent());
locator = new AsyncNonMetaRegionLocator(conn);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 50c9ab9..439d527 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
index bacd7bb..2291c28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -100,7 +100,7 @@
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = CONN.getLocator();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 3c83271..baa4ee7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 0de59a4..2803db2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -95,8 +95,7 @@
FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
- try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) {
+ try (ConnectionRegistry registry = ConnectionRegistryFactory.create(conf, User.getCurrent())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
}
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
new file mode 100644
index 0000000..5746ffa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic read write operation with different {@link ConnectionRegistry} implementations.
+ */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBasicReadWriteWithDifferentConnectionRegistries {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ public enum RegistryImpl {
+ ZK,
+ RPC,
+ ZK_URI,
+ RPC_URI
+ }
+
+ @Parameter
+ public RegistryImpl impl;
+
+ @Rule
+ public final TableNameTestRule name = new TableNameTestRule();
+
+ private byte[] FAMILY = Bytes.toBytes("family");
+
+ private Connection conn;
+
+ @Parameters(name = "{index}: impl={0}")
+ public static List<Object[]> data() {
+ List<Object[]> data = new ArrayList<Object[]>();
+ for (RegistryImpl impl : RegistryImpl.values()) {
+ data.add(new Object[] { impl });
+ }
+ return data;
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ switch (impl) {
+ case ZK: {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ ZKConnectionRegistry.class, ConnectionRegistry.class);
+ String quorum = UTIL.getZkCluster().getAddress().toString();
+ String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path);
+ LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path);
+ conn = ConnectionFactory.createConnection(conf);
+ break;
+ }
+ case RPC: {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ RpcConnectionRegistry.class, ConnectionRegistry.class);
+ String bootstrapServers =
+ UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString();
+ conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers);
+ LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers);
+ conn = ConnectionFactory.createConnection(conf);
+ break;
+ }
+ case ZK_URI: {
+ String quorum = UTIL.getZkCluster().getAddress().toString();
+ String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ URI connectionUri = new URI("hbase+zk://" + quorum + path);
+ LOG.info("connect to cluster through connection url: {}", connectionUri);
+ conn = ConnectionFactory.createConnection(connectionUri);
+ break;
+ }
+ case RPC_URI: {
+ URI connectionUri = new URI("hbase+rpc://"
+ + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString());
+ LOG.info("connect to cluster through connection url: {}", connectionUri);
+ conn = ConnectionFactory.createConnection(connectionUri);
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown impl: " + impl);
+ }
+ try (Admin admin = conn.getAdmin()) {
+ admin.createTable(TableDescriptorBuilder.newBuilder(name.getTableName())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TableName tableName = name.getTableName();
+ try (Admin admin = conn.getAdmin()) {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+ conn.close();
+ }
+
+ @Test
+ public void testReadWrite() throws Exception {
+ byte[] row = Bytes.toBytes("row");
+ byte[] qualifier = Bytes.toBytes("qualifier");
+ byte[] value = Bytes.toBytes("value");
+ try (Table table = conn.getTable(name.getTableName())) {
+ Put put = new Put(row).addColumn(FAMILY, qualifier, value);
+ table.put(put);
+ Result result = table.get(new Get(row));
+ assertArrayEquals(value, result.getValue(FAMILY, qualifier));
+ table.delete(new Delete(row));
+ assertFalse(table.exists(new Get(row)));
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
index 5c78e53..12f278e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
@@ -77,8 +77,7 @@
() -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
>= numOfMetaReplica);
- registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ registry = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null,
User.getCurrent());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index beb054e..29223de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -64,8 +64,7 @@
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
- REGISTRY =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+ REGISTRY = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
}