HBASE-28547 Support specifying connection configuration through queries of the connection uri (#5853)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index b9b156b..144a790 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -569,10 +570,16 @@
Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
return TraceUtil.tracedFuture(() -> {
ConnectionRegistry registry;
+ Configuration appliedConf;
try {
- registry = connectionUri != null
- ? ConnectionRegistryFactory.create(connectionUri, conf, user)
- : ConnectionRegistryFactory.create(conf, user);
+ if (connectionUri != null) {
+ appliedConf = new Configuration(conf);
+ Strings.applyURIQueriesToConf(connectionUri, appliedConf);
+ registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user);
+ } else {
+ appliedConf = conf;
+ registry = ConnectionRegistryFactory.create(appliedConf, user);
+ }
} catch (Exception e) {
return FutureUtils.failedFuture(e);
}
@@ -588,12 +595,12 @@
future.completeExceptionally(new IOException("clusterid came back null"));
return;
}
- Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
- AsyncConnectionImpl.class, AsyncConnection.class);
+ Class<? extends AsyncConnection> clazz = appliedConf.getClass(
+ HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
try {
- future.complete(
- user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
- .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
+ future.complete(user.runAs((PrivilegedExceptionAction<
+ ? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf,
+ registry, clusterId, null, user, connectionAttributes)));
} catch (Exception e) {
registry.close();
future.completeExceptionally(e);
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java
new file mode 100644
index 0000000..806c5ed
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestConnectionFactoryApplyURIQueries {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestConnectionFactoryApplyURIQueries.class);
+
+ private Configuration conf;
+
+ private MockedStatic<ConnectionRegistryFactory> mockedConnectionRegistryFactory;
+
+ private ConnectionRegistry registry;
+
+ @Before
+ public void setUp() {
+ conf = HBaseConfiguration.create();
+ mockedConnectionRegistryFactory = mockStatic(ConnectionRegistryFactory.class);
+ registry = mock(ConnectionRegistry.class);
+ mockedConnectionRegistryFactory
+ .when(() -> ConnectionRegistryFactory.create(any(), any(), any())).thenReturn(registry);
+ when(registry.getClusterId()).thenReturn(CompletableFuture.completedFuture("cluster"));
+ }
+
+ @After
+ public void tearDown() {
+ mockedConnectionRegistryFactory.closeOnDemand();
+ }
+
+ @Test
+ public void testApplyURIQueries() throws Exception {
+ ConnectionFactory.createConnection(new URI("hbase+rpc://server:16010?a=1&b=2&c"), conf);
+ ArgumentCaptor<Configuration> captor = ArgumentCaptor.forClass(Configuration.class);
+ mockedConnectionRegistryFactory
+ .verify(() -> ConnectionRegistryFactory.create(any(), captor.capture(), any()));
+ Configuration c = captor.getValue();
+ assertEquals("1", c.get("a"));
+ assertEquals("2", c.get("b"));
+ assertEquals("", c.get("c"));
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
index 3baab9c..b5d760b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
@@ -17,7 +17,15 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
@@ -94,4 +102,37 @@
int numPaddingCharacters = length - input.length();
return StringUtils.repeat(padding, numPaddingCharacters) + input;
}
+
+ /**
+ * Parse the query string of an URI to a key value map. If a single key occurred multiple times,
+ * only the first one will take effect.
+ */
+ public static Map<String, String> parseURIQueries(URI uri) {
+ if (StringUtils.isBlank(uri.getRawQuery())) {
+ return Collections.emptyMap();
+ }
+ return Splitter.on('&').trimResults().splitToStream(uri.getRawQuery()).map(kv -> {
+ int idx = kv.indexOf('=');
+ try {
+ if (idx > 0) {
+ return Pair.newPair(
+ URLDecoder.decode(kv.substring(0, idx), StandardCharsets.UTF_8.name()),
+ URLDecoder.decode(kv.substring(idx + 1), StandardCharsets.UTF_8.name()));
+ } else {
+ return Pair.newPair(URLDecoder.decode(kv, StandardCharsets.UTF_8.name()), "");
+ }
+ } catch (UnsupportedEncodingException e) {
+ // should not happen
+ throw new AssertionError(e);
+ }
+ }).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1));
+ }
+
+ /**
+ * Apply the key value pairs in the query string of the given URI to the given Configuration. If a
+ * single key occurred multiple times, only the first one will take effect.
+ */
+ public static void applyURIQueriesToConf(URI uri, Configuration conf) {
+ parseURIQueries(uri).forEach(conf::set);
+ }
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
index e280341..8528fd8 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
@@ -17,51 +17,79 @@
*/
package org.apache.hadoop.hbase.util;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
import org.junit.ClassRule;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
@Category({ SmallTests.class })
public class TestStrings {
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestStrings.class);
@Test
public void testAppendKeyValue() {
- Assert.assertEquals("foo, bar=baz",
+ assertEquals("foo, bar=baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz").toString());
- Assert.assertEquals("bar->baz",
+ assertEquals("bar->baz",
Strings.appendKeyValue(new StringBuilder(), "bar", "baz", "->", "| ").toString());
- Assert.assertEquals("foo, bar=baz",
+ assertEquals("foo, bar=baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "=", ", ").toString());
- Assert.assertEquals("foo| bar->baz",
+ assertEquals("foo| bar->baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "->", "| ").toString());
}
@Test
public void testDomainNamePointerToHostName() {
- Assert.assertNull(Strings.domainNamePointerToHostName(null));
- Assert.assertEquals("foo", Strings.domainNamePointerToHostName("foo"));
- Assert.assertEquals("foo.com", Strings.domainNamePointerToHostName("foo.com"));
- Assert.assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com"));
- Assert.assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com."));
+ assertNull(Strings.domainNamePointerToHostName(null));
+ assertEquals("foo", Strings.domainNamePointerToHostName("foo"));
+ assertEquals("foo.com", Strings.domainNamePointerToHostName("foo.com"));
+ assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com"));
+ assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com."));
}
@Test
public void testPadFront() {
- Assert.assertEquals("ddfoo", Strings.padFront("foo", 'd', 5));
+ assertEquals("ddfoo", Strings.padFront("foo", 'd', 5));
+ assertThrows(IllegalArgumentException.class, () -> Strings.padFront("foo", 'd', 1));
+ }
- thrown.expect(IllegalArgumentException.class);
- Strings.padFront("foo", 'd', 1);
+ @Test
+ public void testParseURIQueries() throws Exception {
+ Map<String,
+ String> queries = Strings.parseURIQueries(new URI("hbase+rpc://server01:123?a=1&b=2&a=3&"
+ + URLEncoder.encode("& ?", StandardCharsets.UTF_8.name()) + "=&"
+ + URLEncoder.encode("===", StandardCharsets.UTF_8.name())));
+ assertEquals("1", queries.get("a"));
+ assertEquals("2", queries.get("b"));
+ assertEquals("", queries.get("& ?"));
+ assertEquals("", queries.get("==="));
+ assertEquals(4, queries.size());
+
+ assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/")).isEmpty());
+ assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?")).isEmpty());
+ assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?#anchor")).isEmpty());
+ }
+
+ @Test
+ public void testApplyURIQueriesToConf() throws Exception {
+ Configuration conf = new Configuration();
+ Strings.applyURIQueriesToConf(new URI("hbase+zk://aaa:2181/root?a=1&b=2&c"), conf);
+ assertEquals("1", conf.get("a"));
+ assertEquals("2", conf.get("b"));
+ assertEquals("", conf.get("c"));
}
}