HBASE-28547 Support specifying connection configuration through queries of the connection uri (#5853)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index b9b156b..144a790 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -569,10 +570,16 @@
     Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
       ConnectionRegistry registry;
+      Configuration appliedConf;
       try {
-        registry = connectionUri != null
-          ? ConnectionRegistryFactory.create(connectionUri, conf, user)
-          : ConnectionRegistryFactory.create(conf, user);
+        if (connectionUri != null) {
+          appliedConf = new Configuration(conf);
+          Strings.applyURIQueriesToConf(connectionUri, appliedConf);
+          registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user);
+        } else {
+          appliedConf = conf;
+          registry = ConnectionRegistryFactory.create(appliedConf, user);
+        }
       } catch (Exception e) {
         return FutureUtils.failedFuture(e);
       }
@@ -588,12 +595,12 @@
           future.completeExceptionally(new IOException("clusterid came back null"));
           return;
         }
-        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
-          AsyncConnectionImpl.class, AsyncConnection.class);
+        Class<? extends AsyncConnection> clazz = appliedConf.getClass(
+          HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
         try {
-          future.complete(
-            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
-              .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
+          future.complete(user.runAs((PrivilegedExceptionAction<
+            ? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf,
+              registry, clusterId, null, user, connectionAttributes)));
         } catch (Exception e) {
           registry.close();
           future.completeExceptionally(e);
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java
new file mode 100644
index 0000000..806c5ed
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionFactoryApplyURIQueries.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestConnectionFactoryApplyURIQueries {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestConnectionFactoryApplyURIQueries.class);
+
+  private Configuration conf;
+
+  private MockedStatic<ConnectionRegistryFactory> mockedConnectionRegistryFactory;
+
+  private ConnectionRegistry registry;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    mockedConnectionRegistryFactory = mockStatic(ConnectionRegistryFactory.class);
+    registry = mock(ConnectionRegistry.class);
+    mockedConnectionRegistryFactory
+      .when(() -> ConnectionRegistryFactory.create(any(), any(), any())).thenReturn(registry);
+    when(registry.getClusterId()).thenReturn(CompletableFuture.completedFuture("cluster"));
+  }
+
+  @After
+  public void tearDown() {
+    mockedConnectionRegistryFactory.closeOnDemand();
+  }
+
+  @Test
+  public void testApplyURIQueries() throws Exception {
+    ConnectionFactory.createConnection(new URI("hbase+rpc://server:16010?a=1&b=2&c"), conf);
+    ArgumentCaptor<Configuration> captor = ArgumentCaptor.forClass(Configuration.class);
+    mockedConnectionRegistryFactory
+      .verify(() -> ConnectionRegistryFactory.create(any(), captor.capture(), any()));
+    Configuration c = captor.getValue();
+    assertEquals("1", c.get("a"));
+    assertEquals("2", c.get("b"));
+    assertEquals("", c.get("c"));
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
index 3baab9c..b5d760b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
@@ -17,7 +17,15 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
@@ -94,4 +102,37 @@
     int numPaddingCharacters = length - input.length();
     return StringUtils.repeat(padding, numPaddingCharacters) + input;
   }
+
+  /**
+   * Parse the query string of an URI to a key value map. If a single key occurred multiple times,
+   * only the first one will take effect.
+   */
+  public static Map<String, String> parseURIQueries(URI uri) {
+    if (StringUtils.isBlank(uri.getRawQuery())) {
+      return Collections.emptyMap();
+    }
+    return Splitter.on('&').trimResults().splitToStream(uri.getRawQuery()).map(kv -> {
+      int idx = kv.indexOf('=');
+      try {
+        if (idx > 0) {
+          return Pair.newPair(
+            URLDecoder.decode(kv.substring(0, idx), StandardCharsets.UTF_8.name()),
+            URLDecoder.decode(kv.substring(idx + 1), StandardCharsets.UTF_8.name()));
+        } else {
+          return Pair.newPair(URLDecoder.decode(kv, StandardCharsets.UTF_8.name()), "");
+        }
+      } catch (UnsupportedEncodingException e) {
+        // should not happen
+        throw new AssertionError(e);
+      }
+    }).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1));
+  }
+
+  /**
+   * Apply the key value pairs in the query string of the given URI to the given Configuration. If a
+   * single key occurred multiple times, only the first one will take effect.
+   */
+  public static void applyURIQueriesToConf(URI uri, Configuration conf) {
+    parseURIQueries(uri).forEach(conf::set);
+  }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
index e280341..8528fd8 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestStrings.java
@@ -17,51 +17,79 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
 
 @Category({ SmallTests.class })
 public class TestStrings {
 
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestStrings.class);
 
   @Test
   public void testAppendKeyValue() {
-    Assert.assertEquals("foo, bar=baz",
+    assertEquals("foo, bar=baz",
       Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz").toString());
-    Assert.assertEquals("bar->baz",
+    assertEquals("bar->baz",
       Strings.appendKeyValue(new StringBuilder(), "bar", "baz", "->", "| ").toString());
-    Assert.assertEquals("foo, bar=baz",
+    assertEquals("foo, bar=baz",
       Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "=", ", ").toString());
-    Assert.assertEquals("foo| bar->baz",
+    assertEquals("foo| bar->baz",
       Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "->", "| ").toString());
   }
 
   @Test
   public void testDomainNamePointerToHostName() {
-    Assert.assertNull(Strings.domainNamePointerToHostName(null));
-    Assert.assertEquals("foo", Strings.domainNamePointerToHostName("foo"));
-    Assert.assertEquals("foo.com", Strings.domainNamePointerToHostName("foo.com"));
-    Assert.assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com"));
-    Assert.assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com."));
+    assertNull(Strings.domainNamePointerToHostName(null));
+    assertEquals("foo", Strings.domainNamePointerToHostName("foo"));
+    assertEquals("foo.com", Strings.domainNamePointerToHostName("foo.com"));
+    assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com"));
+    assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com."));
   }
 
   @Test
   public void testPadFront() {
-    Assert.assertEquals("ddfoo", Strings.padFront("foo", 'd', 5));
+    assertEquals("ddfoo", Strings.padFront("foo", 'd', 5));
+    assertThrows(IllegalArgumentException.class, () -> Strings.padFront("foo", 'd', 1));
+  }
 
-    thrown.expect(IllegalArgumentException.class);
-    Strings.padFront("foo", 'd', 1);
+  @Test
+  public void testParseURIQueries() throws Exception {
+    Map<String,
+      String> queries = Strings.parseURIQueries(new URI("hbase+rpc://server01:123?a=1&b=2&a=3&"
+        + URLEncoder.encode("& ?", StandardCharsets.UTF_8.name()) + "=&"
+        + URLEncoder.encode("===", StandardCharsets.UTF_8.name())));
+    assertEquals("1", queries.get("a"));
+    assertEquals("2", queries.get("b"));
+    assertEquals("", queries.get("& ?"));
+    assertEquals("", queries.get("==="));
+    assertEquals(4, queries.size());
+
+    assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/")).isEmpty());
+    assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?")).isEmpty());
+    assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?#anchor")).isEmpty());
+  }
+
+  @Test
+  public void testApplyURIQueriesToConf() throws Exception {
+    Configuration conf = new Configuration();
+    Strings.applyURIQueriesToConf(new URI("hbase+zk://aaa:2181/root?a=1&b=2&c"), conf);
+    assertEquals("1", conf.get("a"));
+    assertEquals("2", conf.get("b"));
+    assertEquals("", conf.get("c"));
   }
 }