[CALCITE-5581] Implement Basic client side load balancing in Avatica Driver
diff --git a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
index 62c5702..50ddedd 100644
--- a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
@@ -16,9 +16,12 @@
*/
package org.apache.calcite.avatica;
+import org.apache.calcite.avatica.ha.ShuffledRoundRobinLBStrategy;
import org.apache.calcite.avatica.remote.AvaticaHttpClientFactoryImpl;
import org.apache.calcite.avatica.remote.HostnameVerificationConfigurable.HostnameVerification;
+import org.apache.hc.core5.util.Timeout;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
@@ -33,6 +36,7 @@
* Enumeration of Avatica's built-in connection properties.
*/
public enum BuiltInConnectionProperty implements ConnectionProperty {
+
/** Factory. */
FACTORY("factory", Type.PLUGIN, null, false),
@@ -91,7 +95,36 @@
TRANSPARENT_RECONNECTION("transparent_reconnection", Type.BOOLEAN, Boolean.FALSE, false),
/** Number of rows to fetch per call. */
- FETCH_SIZE("fetch_size", Type.NUMBER, AvaticaStatement.DEFAULT_FETCH_SIZE, false);
+ FETCH_SIZE("fetch_size", Type.NUMBER, AvaticaStatement.DEFAULT_FETCH_SIZE, false),
+
+ /** Avatica connection HA property - use client side load balancing **/
+ USE_CLIENT_SIDE_LB("use_client_side_lb", Type.BOOLEAN, Boolean.FALSE, false),
+
+ /** Avatica connection HA property - Load balanced URLs **/
+ LB_URLS("lb_urls", Type.STRING, "", false),
+
+ /** Avatica connection HA property - Load balancing strategy **/
+ LB_STRATEGY("lb_strategy", Type.PLUGIN,
+ ShuffledRoundRobinLBStrategy.class.getName(), false),
+
+ /**
+ * The number of retries we need for failover during client side load balancing.
+ */
+ LB_CONNECTION_FAILOVER_RETRIES("lb_connection_failover_retries",
+ Type.NUMBER, 3, false),
+
+ /**
+ * The amount of time in millis that the driver should wait before attempting
+ * connection failover
+ */
+ LB_CONNECTION_FAILOVER_SLEEP_TIME("lb_connection_failover_sleep_time",
+ Type.NUMBER, 1000, false),
+
+ /**
+ * HTTP Connection Timeout in milliseconds.
+ */
+ HTTP_CONNECTION_TIMEOUT("http_connection_timeout",
+ Type.NUMBER, Timeout.ofMinutes(3).toMilliseconds(), false);
private final String camelName;
private final Type type;
@@ -117,6 +150,7 @@
for (BuiltInConnectionProperty p : BuiltInConnectionProperty.values()) {
LOCAL_PROPS.add(p.camelName());
}
+
}
BuiltInConnectionProperty(String camelName, Type type, Object defaultValue,
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
index bc06f8a..9388dba 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.avatica;
+import org.apache.calcite.avatica.ha.LBStrategy;
import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory;
import org.apache.calcite.avatica.remote.HostnameVerificationConfigurable.HostnameVerification;
import org.apache.calcite.avatica.remote.Service;
@@ -66,6 +67,18 @@
boolean transparentReconnectionEnabled();
/** @see BuiltInConnectionProperty#FETCH_SIZE */
int fetchSize();
+ /** @see BuiltInConnectionProperty#USE_CLIENT_SIDE_LB #**/
+ boolean useClientSideLb();
+ /** @see BuiltInConnectionProperty#LB_URLS **/
+ String getLbURLs();
+ /** @see BuiltInConnectionProperty#LB_STRATEGY **/
+ LBStrategy getLBStrategy();
+ /** @see BuiltInConnectionProperty#LB_CONNECTION_FAILOVER_RETRIES **/
+ int getLBConnectionFailoverRetries();
+ /** @see BuiltInConnectionProperty#LB_CONNECTION_FAILOVER_SLEEP_TIME **/
+ long getLBConnectionFailoverSleepTime();
+ /** @see BuiltInConnectionProperty#HTTP_CONNECTION_TIMEOUT **/
+ long getHttpConnectionTimeout();
}
// End ConnectionConfig.java
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
index 0bb3677..eb79da6 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.avatica;
+import org.apache.calcite.avatica.ha.LBStrategy;
import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory;
import org.apache.calcite.avatica.remote.HostnameVerificationConfigurable.HostnameVerification;
import org.apache.calcite.avatica.remote.Service;
@@ -29,6 +30,7 @@
/** Implementation of {@link ConnectionConfig}. */
public class ConnectionConfigImpl implements ConnectionConfig {
+
protected final Properties properties;
public ConnectionConfigImpl(Properties properties) {
@@ -137,6 +139,31 @@
return BuiltInConnectionProperty.FETCH_SIZE.wrap(properties).getInt();
}
+ @Override public boolean useClientSideLb() {
+ return BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.wrap(properties).getBoolean();
+ }
+
+ public String getLbURLs() {
+ return BuiltInConnectionProperty.LB_URLS.wrap(properties).getString();
+ }
+
+ public LBStrategy getLBStrategy() {
+ return BuiltInConnectionProperty.LB_STRATEGY.wrap(properties)
+ .getPlugin(LBStrategy.class, null);
+ }
+
+ public int getLBConnectionFailoverRetries() {
+ return BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_RETRIES.wrap(properties).getInt();
+ }
+
+ public long getLBConnectionFailoverSleepTime() {
+ return BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_SLEEP_TIME.wrap(properties).getLong();
+ }
+
+ public long getHttpConnectionTimeout() {
+ return BuiltInConnectionProperty.HTTP_CONNECTION_TIMEOUT.wrap(properties).getLong();
+ }
+
/** Converts a {@link Properties} object containing (name, value)
* pairs into a map whose keys are
* {@link org.apache.calcite.avatica.InternalProperty} objects.
diff --git a/core/src/main/java/org/apache/calcite/avatica/ha/LBStrategy.java b/core/src/main/java/org/apache/calcite/avatica/ha/LBStrategy.java
new file mode 100644
index 0000000..a867489
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/avatica/ha/LBStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+
+public interface LBStrategy {
+ /**
+ * Get load balanced URL given the connection configuration.
+ */
+ String getLbURL(ConnectionConfig config);
+}
diff --git a/core/src/main/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategy.java b/core/src/main/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategy.java
new file mode 100644
index 0000000..171051d
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Random Select strategy for client side load balancing.
+ */
+public class RandomSelectLBStrategy implements LBStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RandomSelectLBStrategy.class);
+ public static final String URL_SEPERATOR_CHAR = ",";
+ private final Random random = new Random();
+ @Override
+ public String getLbURL(ConnectionConfig config) {
+ String[] urls = config.getLbURLs().split(URL_SEPERATOR_CHAR);
+ String url = urls[random.nextInt(urls.length)];
+ LOG.info("Selected URL:{}", url);
+ return url;
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategy.java b/core/src/main/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategy.java
new file mode 100644
index 0000000..44451df
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Round Robin strategy for client side load balancing.
+ * Its implemented it as a singleton so that we can maintain state
+ * i.e. which URL was last used from the list of URLs specified.
+ */
+public class RoundRobinLBStrategy implements LBStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RoundRobinLBStrategy.class);
+
+ public static final RoundRobinLBStrategy INSTANCE = new RoundRobinLBStrategy();
+ private RoundRobinLBStrategy() { }
+ public static final String URL_SEPERATOR_CHAR = ",";
+
+ Map<String, Integer> configToIndexServedMap = new HashMap<>();
+ Map<String, String[]> configToUrlListMap = new HashMap<>();
+
+ @Override
+ public synchronized String getLbURL(ConnectionConfig config) {
+ String key = getKey(config);
+ if (!configToIndexServedMap.containsKey(key)) {
+ configToIndexServedMap.put(key, 0);
+ configToUrlListMap.put(key, config.getLbURLs().split(URL_SEPERATOR_CHAR));
+ }
+ String[] urls = configToUrlListMap.get(key);
+ int urlIndex = configToIndexServedMap.get(key);
+ configToIndexServedMap.put(key, (urlIndex + 1) % urls.length);
+ String url = urls[urlIndex];
+ LOG.info("Selected URL:{}", url);
+ return url;
+ }
+ private static String getKey(ConnectionConfig config) {
+ return config.getLbURLs();
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategy.java b/core/src/main/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategy.java
new file mode 100644
index 0000000..ecbfa6b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+/**
+ * Shuffled Round Robin strategy for client side load balancing.
+ * It starts with some random position in a list of URLs, and then returns subsequent URL
+ * in a RoundRobin manner.
+ * It's implemented it as a singleton so that we can maintain state
+ * i.e. which URL was last used from the list of URLs specified.
+ */
+public class ShuffledRoundRobinLBStrategy implements LBStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(ShuffledRoundRobinLBStrategy.class);
+
+ public static final ShuffledRoundRobinLBStrategy INSTANCE = new ShuffledRoundRobinLBStrategy();
+ private ShuffledRoundRobinLBStrategy() { }
+ public static final String URL_SEPERATOR_CHAR = ",";
+
+ Map<String, Integer> configToIndexServedMap = new HashMap<>();
+ Map<String, String[]> configToUrlListMap = new HashMap<>();
+
+ @Override
+ public synchronized String getLbURL(ConnectionConfig config) {
+ String key = getKey(config);
+ String lbURLs = config.getLbURLs();
+ if (!configToIndexServedMap.containsKey(key)) {
+ configToIndexServedMap.put(key, 0);
+ initialiseUrlList(key, lbURLs);
+ }
+ String[] urls = configToUrlListMap.get(key);
+ int urlIndex = configToIndexServedMap.get(key);
+
+ String url = urls[urlIndex];
+ LOG.info("Selected URL:{}", url);
+ urlIndex = (urlIndex + 1) % urls.length;
+ configToIndexServedMap.put(key, urlIndex);
+ return url;
+ }
+
+ private void initialiseUrlList(String key, String lbURLs) {
+ String[] urls = lbURLs.split(URL_SEPERATOR_CHAR);
+ List<String> list = Arrays.asList(urls);
+ Collections.shuffle(list);
+ urls = list.toArray(urls);
+ configToUrlListMap.put(key, urls);
+ }
+
+ private static String getKey(ConnectionConfig config) {
+ return config.getLbURLs();
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/avatica/ha/package-info.java b/core/src/main/java/org/apache/calcite/avatica/ha/package-info.java
new file mode 100644
index 0000000..eddc6be
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/avatica/ha/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBC driver with client side load balancing feature.
+ */
+package org.apache.calcite.avatica.ha;
+
+// End package-info.java
diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
index 66b7ca8..1c5327f 100644
--- a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
+++ b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.avatica.remote;
+import org.apache.calcite.avatica.ConnectionConfig;
+
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.auth.AuthSchemeFactory;
@@ -27,6 +29,7 @@
import org.apache.hc.client5.http.auth.StandardAuthScheme;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.classic.methods.HttpPost;
+import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.auth.BasicAuthCache;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.auth.BasicSchemeFactory;
@@ -34,6 +37,7 @@
import org.apache.hc.client5.http.impl.auth.SPNegoSchemeFactory;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -57,6 +61,7 @@
import java.net.URL;
import java.security.Principal;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
/**
* A common class to invoke HTTP requests against the Avatica server agnostic of the data being
@@ -91,11 +96,18 @@
this.uri = toURI(Objects.requireNonNull(url));
}
- protected void initializeClient(PoolingHttpClientConnectionManager pool) {
+ protected void initializeClient(PoolingHttpClientConnectionManager pool,
+ ConnectionConfig config) {
this.authCache = new BasicAuthCache();
// A single thread-safe HttpClient, pooling connections via the
// ConnectionManager
- this.client = HttpClients.custom().setConnectionManager(pool).build();
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ RequestConfig requestConfig = requestConfigBuilder
+ .setConnectTimeout(config.getHttpConnectionTimeout(), TimeUnit.MILLISECONDS)
+ .build();
+ HttpClientBuilder httpClientBuilder = HttpClients.custom().setConnectionManager(pool)
+ .setDefaultRequestConfig(requestConfig);
+ this.client = httpClientBuilder.build();
}
@Override public byte[] send(byte[] request) {
@@ -217,8 +229,9 @@
}
}
- @Override public void setHttpClientPool(PoolingHttpClientConnectionManager pool) {
- initializeClient(pool);
+ @Override public void setHttpClientPool(PoolingHttpClientConnectionManager pool,
+ ConnectionConfig config) {
+ initializeClient(pool, config);
}
}
diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
index e0f3446..0b9d66c 100644
--- a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
+++ b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
@@ -64,7 +64,7 @@
if (client instanceof HttpClientPoolConfigurable) {
PoolingHttpClientConnectionManager pool = CommonsHttpClientPoolCache.getPool(config);
- ((HttpClientPoolConfigurable) client).setHttpClientPool(pool);
+ ((HttpClientPoolConfigurable) client).setHttpClientPool(pool, config);
} else {
// Kept for backwards compatibility, the current AvaticaCommonsHttpClientImpl
// does not implement these interfaces
diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
index 49dbd04..064a2a7 100644
--- a/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
+++ b/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
@@ -59,7 +59,7 @@
*/
public enum Serialization {
JSON,
- PROTOBUF;
+ PROTOBUF
}
@Override protected String getConnectStringPrefix() {
@@ -77,7 +77,7 @@
}
@Override protected Collection<ConnectionProperty> getConnectionProperties() {
- final List<ConnectionProperty> list = new ArrayList<ConnectionProperty>();
+ final List<ConnectionProperty> list = new ArrayList<>();
Collections.addAll(list, BuiltInConnectionProperty.values());
Collections.addAll(list, AvaticaRemoteConnectionProperty.values());
return list;
@@ -135,7 +135,7 @@
throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
}
} else {
- service = new MockJsonService(Collections.<String, String>emptyMap());
+ service = new MockJsonService(Collections.emptyMap());
}
return service;
}
@@ -149,8 +149,14 @@
*/
AvaticaHttpClient getHttpClient(AvaticaConnection connection, ConnectionConfig config) {
URL url;
+ String urlStr;
+ if (config.useClientSideLb()) {
+ urlStr = config.getLBStrategy().getLbURL(config);
+ } else {
+ urlStr = config.url();
+ }
try {
- url = new URL(config.url());
+ url = new URL(urlStr);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
@@ -159,25 +165,56 @@
return httpClientFactory.getClient(url, config, connection.getKerberosConnection());
}
-
@Override public Connection connect(String url, Properties info)
throws SQLException {
- AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);
- if (conn == null) {
- // It's not an url for our driver
- return null;
- }
+ int retries = 0;
+ int currentRetry = 0;
+ long failoverSleepTime = 0;
+ do {
+ long startTime = System.currentTimeMillis();
+ AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);
+ if (conn == null) {
+ // It's not an url for our driver
+ return null;
+ }
- Service service = conn.getService();
+ ConnectionConfig config = conn.config();
+ if (config.useClientSideLb()) {
+ retries = config.getLBConnectionFailoverRetries();
+ failoverSleepTime = config.getLBConnectionFailoverSleepTime();
+ }
- // super.connect(...) should be creating a service and setting it in the AvaticaConnection
- assert null != service;
+ Service service = conn.getService();
- service.apply(
- new Service.OpenConnectionRequest(conn.id,
- Service.OpenConnectionRequest.serializeProperties(info)));
-
- return conn;
+ // super.connect(...) should be creating a service and setting it in the AvaticaConnection
+ assert null != service;
+ try {
+ service.apply(
+ new Service.OpenConnectionRequest(conn.id,
+ Service.OpenConnectionRequest.serializeProperties(info)));
+ return conn;
+ } catch (Exception e) {
+ long endTime = System.currentTimeMillis();
+ long elapsedTime = endTime - startTime;
+ LOG.warn("Connection Failed: {}", e.getMessage());
+ LOG.debug("Failure detected in: {} milliseconds", elapsedTime);
+ if (currentRetry < retries) {
+ currentRetry++;
+ if (failoverSleepTime > 0) {
+ try {
+ LOG.info("Sleeping for {} milliseconds before load balancer failover",
+ failoverSleepTime);
+ Thread.sleep(failoverSleepTime);
+ } catch (InterruptedException ex) {
+ throw new SQLException(ex);
+ }
+ }
+ LOG.info("Load balancer failover retry: {}", currentRetry);
+ } else {
+ throw e;
+ }
+ }
+ } while (true);
}
Serialization getSerialization(ConnectionConfig config) {
diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/HttpClientPoolConfigurable.java b/core/src/main/java/org/apache/calcite/avatica/remote/HttpClientPoolConfigurable.java
index 7bdf24e..80b3a94 100644
--- a/core/src/main/java/org/apache/calcite/avatica/remote/HttpClientPoolConfigurable.java
+++ b/core/src/main/java/org/apache/calcite/avatica/remote/HttpClientPoolConfigurable.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.avatica.remote;
+import org.apache.calcite.avatica.ConnectionConfig;
+
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
/**
@@ -27,9 +29,10 @@
* Sets a PoolingHttpClientConnectionManager containing the collection of SSL/TLS server
* keys and truststores to use for HTTPS calls.
*
- * @param pool The http connection pool
+ * @param pool The http connection pool
+ * @param config The connection config
*/
- void setHttpClientPool(PoolingHttpClientConnectionManager pool);
+ void setHttpClientPool(PoolingHttpClientConnectionManager pool, ConnectionConfig config);
}
// End HttpClientPoolConfigurable.java
diff --git a/core/src/test/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategyTest.java b/core/src/test/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategyTest.java
new file mode 100644
index 0000000..b5df977
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/avatica/ha/RandomSelectLBStrategyTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RandomSelectLBStrategyTest {
+
+ ConnectionConfig mockedConnectionConfig = Mockito.mock(ConnectionConfig.class);
+ RandomSelectLBStrategy randomSelectLBStrategy = new RandomSelectLBStrategy();
+
+ int numberOfHost = 100;
+
+ @Test
+ public void getLbURL() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numberOfHost; i++) {
+ sb.append("http://host").append(i).append(",");
+ }
+ String inputString = sb.substring(0, sb.length() - 1);
+ Mockito.when(mockedConnectionConfig.getLbURLs()).thenReturn(inputString);
+ sb.delete(0, sb.length());
+ for (int i = 0; i < numberOfHost; i++) {
+ sb.append(randomSelectLBStrategy.getLbURL(mockedConnectionConfig)).append(",");
+ }
+ String actualString = sb.substring(0, sb.length() - 1);
+
+ Assert.assertNotEquals(inputString, actualString);
+ }
+}
diff --git a/core/src/test/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategyTest.java b/core/src/test/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategyTest.java
new file mode 100644
index 0000000..356c0d4
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/avatica/ha/RoundRobinLBStrategyTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RoundRobinLBStrategyTest {
+
+ ConnectionConfig mockedConnectionConfig = Mockito.mock(ConnectionConfig.class);
+ RoundRobinLBStrategy roundRobinLBStrategy = RoundRobinLBStrategy.INSTANCE;
+
+ @Test
+ public void getLbURL() {
+ String inputString = "http://host1.com,http://host2.com,http://host3.com";
+ Mockito.when(mockedConnectionConfig.getLbURLs()).thenReturn(inputString);
+ String[] urls = inputString.split(",");
+
+ Assert.assertEquals(urls[0], roundRobinLBStrategy.getLbURL(mockedConnectionConfig));
+ Assert.assertEquals(urls[1], roundRobinLBStrategy.getLbURL(mockedConnectionConfig));
+ Assert.assertEquals(urls[2], roundRobinLBStrategy.getLbURL(mockedConnectionConfig));
+ }
+
+}
diff --git a/core/src/test/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategyTest.java b/core/src/test/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategyTest.java
new file mode 100644
index 0000000..57737dc
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/avatica/ha/ShuffledRoundRobinLBStrategyTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Optional;
+
+public class ShuffledRoundRobinLBStrategyTest {
+
+ ConnectionConfig mockedConnectionConfig = Mockito.mock(ConnectionConfig.class);
+ ShuffledRoundRobinLBStrategy shuffledRoundRobinLBStrategy =
+ Mockito.spy(ShuffledRoundRobinLBStrategy.INSTANCE);
+
+ @Test
+ public void getLbURL() throws NoSuchFieldException, IllegalAccessException {
+ String inputString = "http://host1.com,http://host2.com,http://host3.com";
+ Mockito.when(mockedConnectionConfig.getLbURLs()).thenReturn(inputString);
+ String firstURL = shuffledRoundRobinLBStrategy.getLbURL(mockedConnectionConfig);
+
+ String[] expectedUrls = getShuffledURLsFromStateState(
+ shuffledRoundRobinLBStrategy);
+
+ Assert.assertEquals(expectedUrls[0], firstURL);
+
+ for (int i = 1; i < expectedUrls.length; i++) {
+ Assert.assertEquals(expectedUrls[i],
+ shuffledRoundRobinLBStrategy.getLbURL(mockedConnectionConfig));
+ }
+ }
+ private String[] getShuffledURLsFromStateState(
+ ShuffledRoundRobinLBStrategy shuffledRoundRobinLBStrategy)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field fConfigToUrlListMap = ShuffledRoundRobinLBStrategy
+ .class.getDeclaredField("configToUrlListMap");
+ fConfigToUrlListMap.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<String, String[]> configToUrlListMap = (Map<String, String[]>) fConfigToUrlListMap
+ .get(shuffledRoundRobinLBStrategy);
+ Optional<String[]> oCachedUrls = configToUrlListMap.values().stream().findFirst();
+ Assert.assertTrue(oCachedUrls.isPresent());
+ return oCachedUrls.get();
+ }
+}
diff --git a/server/src/test/java/org/apache/calcite/avatica/ha/ConnectionPropertiesHATest.java b/server/src/test/java/org/apache/calcite/avatica/ha/ConnectionPropertiesHATest.java
new file mode 100644
index 0000000..ff81f67
--- /dev/null
+++ b/server/src/test/java/org/apache/calcite/avatica/ha/ConnectionPropertiesHATest.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.ha;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
+import org.apache.calcite.avatica.remote.AvaticaCommonsHttpClientImpl;
+import org.apache.calcite.avatica.remote.AvaticaHttpClient;
+import org.apache.calcite.avatica.remote.AvaticaServersForTest;
+import org.apache.calcite.avatica.remote.Driver;
+import org.apache.calcite.avatica.remote.RemoteProtobufService;
+import org.apache.calcite.avatica.server.AvaticaProtobufHandler;
+import org.apache.calcite.avatica.server.HttpServer;
+import org.apache.calcite.avatica.server.Main;
+
+import org.apache.hc.client5.http.ConnectTimeoutException;
+import org.apache.hc.client5.http.HttpHostConnectException;
+import org.apache.hc.core5.util.Timeout;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConnectionPropertiesHATest {
+ private static final AvaticaServersForTest SERVERS = new AvaticaServersForTest();
+ private static final String[] SERVER_ARGS = {
+ AvaticaServersForTest.FullyRemoteJdbcMetaFactory.class.getName()
+ };
+ public static final int NO_OF_SERVERS = 5;
+ public static final String HTTP_LOCALHOST = "http://localhost:";
+ public static final String COMMA = ",";
+ public static final String OS_NAME_LOWERCASE =
+ System.getProperty("os.name").toLowerCase(Locale.ROOT);
+ public static final String WINDOWS_OS_PREFIX = "windows";
+ private static String lbURLs = "";
+ private static final int START_PORT = 10000;
+ private static String[] urls;
+
+ @BeforeClass
+ public static void beforeClass()
+ throws ClassNotFoundException,
+ InvocationTargetException,
+ InstantiationException,
+ IllegalAccessException,
+ NoSuchMethodException {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ int port = START_PORT + i;
+ Main.start(SERVER_ARGS, port, AvaticaProtobufHandler::new);
+ sb.append(HTTP_LOCALHOST).append(port).append(COMMA);
+ }
+ lbURLs = sb.substring(0, sb.length() - 1);
+ urls = lbURLs.split(COMMA);
+ }
+
+ @Test
+ public void connectionPropertiesNoHATest() throws Exception {
+ Properties properties = new Properties();
+ String url = SERVERS.getJdbcUrl(10000, Driver.Serialization.PROTOBUF);
+ AvaticaConnection conn1 = (AvaticaConnection) DriverManager.getConnection(url, properties);
+ Assert.assertNotNull(conn1);
+ }
+
+ @Test
+ public void connectionPropertiesHATestRandomSelectLB() {
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), lbURLs);
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RandomSelectLBStrategy.class.getName());
+
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ try {
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+ } catch (Exception e) {
+ Assert.fail(); // Verify that exception is not generated.
+ }
+ }
+ }
+
+ @Test
+ public void connectionPropertiesHATestRoundRobinLB() throws Exception {
+
+ resetRoundRobinLBStrategyState();
+
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), lbURLs);
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+
+ String uri1 =
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+ Assert.assertEquals(urls[0], uri1);
+
+ String uri2 =
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+ Assert.assertEquals(urls[1], uri2);
+
+ String uri3 =
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+ Assert.assertEquals(urls[2], uri3);
+ }
+
+ @Test
+ public void connectionPropertiesHATestShuffledRoundRobinLB() throws Exception {
+ resetShuffledRoundRobinLBStrategyState();
+
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), lbURLs);
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), ShuffledRoundRobinLBStrategy.class.getName());
+
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+
+ String firstConnectiondURL =
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+
+ Assert.assertNotNull(firstConnectiondURL);
+
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ try {
+ getConnectionURI((AvaticaConnection) DriverManager.getConnection(url, properties));
+ } catch (Exception e) {
+ Assert.fail(); // In System test verify that exception is not generated.
+ }
+ }
+ }
+
+ @Test
+ public void connectionPropertiesHATestInvalidLB() throws Exception {
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), lbURLs);
+ properties.put(BuiltInConnectionProperty.LB_STRATEGY.name(), "com.incorrect.badLb");
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ try {
+ DriverManager.getConnection(url, properties);
+ } catch (RuntimeException re) {
+ Assert.assertTrue(re.getCause() instanceof ClassNotFoundException);
+ }
+ }
+
+ @Test
+ public void testConnectionPropertiesHATestLongURlList() throws Exception {
+ resetRoundRobinLBStrategyState();
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("http://localhost:").append(START_PORT + i).append(",");
+ }
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), sb.substring(0, sb.length() - 1));
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+
+ AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url, properties);
+ String uri1 = getConnectionURI(conn);
+ Assert.assertEquals(urls[0], uri1);
+ }
+
+ @Test
+ public void testConnectionPropertiesHATestInvalidLBUrl() throws Exception {
+ resetRoundRobinLBStrategyState();
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), "http://invalid:" + START_PORT);
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ try {
+ DriverManager.getConnection(url, properties);
+ } catch (RuntimeException re) {
+ Assert.assertTrue(re.getCause() instanceof UnknownHostException);
+ }
+ }
+
+ @Test
+ public void testConnectionPropertiesHALBFailover() throws Exception {
+ resetRoundRobinLBStrategyState();
+
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_RETRIES.name(), "1");
+ properties.put(BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_SLEEP_TIME.name(), "100");
+ properties.put(BuiltInConnectionProperty.HTTP_CONNECTION_TIMEOUT.name(), "300");
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+
+ // Invalid URL at first position in lb_urls
+ StringBuilder sb = new StringBuilder("http://invalidurl:").append(START_PORT).append(",");
+
+ // Put Valid URL at second position in lb_urls. This should be returned during failover.
+ sb.append(urls[0]).append(",");
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), sb.substring(0, sb.length() - 1));
+
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url, properties);
+ String uri = getConnectionURI(connection);
+ Assert.assertEquals(urls[0], uri);
+ }
+
+ @Test
+ public void testConnectionPropertiesHAHttpConnectionTimeout5Sec() throws Exception {
+ // Skip the test for Windows.
+ Assume.assumeFalse(OS_NAME_LOWERCASE.startsWith(WINDOWS_OS_PREFIX));
+ Properties properties = new Properties();
+
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.HTTP_CONNECTION_TIMEOUT.name(), "5000");
+ properties.put(BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_RETRIES.name(), "0");
+ // 240.0.0.1 is special URL which should result in connection timeout.
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), "http://240.0.0.1:" + 9000);
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ long startTime = System.currentTimeMillis();
+ try {
+ DriverManager.getConnection(url, properties);
+ } catch (RuntimeException re) {
+ long endTime = System.currentTimeMillis();
+ long elapsedTime = endTime - startTime;
+ Assert.assertTrue(elapsedTime < Timeout.ofMinutes(3).toMilliseconds());
+ Assert.assertTrue(elapsedTime >= 5000);
+ Assert.assertTrue(re.getCause() instanceof ConnectTimeoutException);
+ }
+ }
+
+ @Test
+ public void testConnectionPropertiesCreateStatementAfterDisconnect() throws Exception {
+ resetRoundRobinLBStrategyState();
+ // Start a new server at port 100 port from the startport
+ int test_server_port = START_PORT + 100;
+ HttpServer avaticaServer =
+ Main.start(SERVER_ARGS, test_server_port, AvaticaProtobufHandler::new);
+
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_RETRIES.name(), "2");
+ properties.put(BuiltInConnectionProperty.LB_CONNECTION_FAILOVER_SLEEP_TIME.name(), "100");
+ properties.put(BuiltInConnectionProperty.HTTP_CONNECTION_TIMEOUT.name(), "300");
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+ StringBuilder sb = new StringBuilder();
+ // First URL will be server we started in this test
+ sb.append("http://localhost:").append(test_server_port).append(",");
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ sb.append("http://localhost:").append(START_PORT + i).append(",");
+ }
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), sb.substring(0, sb.length() - 1));
+
+ // Create a connection
+ String url = SERVERS.getJdbcUrl(test_server_port, Driver.Serialization.PROTOBUF);
+ AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url, properties);
+
+ // Create statement
+ Statement stmt = conn.createStatement();
+
+ String tableName = "TEST_TABLE";
+ // Execute some queries
+ assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
+ assertFalse(stmt.execute("CREATE TABLE " + tableName + " (pk integer, msg varchar(10))"));
+ assertEquals(1, stmt.executeUpdate("INSERT INTO " + tableName + " VALUES(1, 'abcd')"));
+
+ ResultSet results = stmt.executeQuery("SELECT count(1) FROM " + tableName);
+ assertNotNull(results);
+ assertTrue(results.next());
+ assertEquals(1, results.getInt(1));
+
+ // Stop a server
+ avaticaServer.stop();
+
+ // Execute query on statement - It fails with SQL exception.
+ try {
+ stmt.execute("SELECT count(1) FROM " + tableName);
+ } catch (Exception e) {
+ assertTrue(e instanceof SQLException);
+ assertTrue(
+ e.getMessage().toLowerCase(Locale.ROOT).contains("connection refused")
+ || e.getMessage().toLowerCase(Locale.ROOT).contains("connection abort"));
+ }
+
+ // Create statement with conn - Fails with HttpHostConnectException.
+ try {
+ Statement stmt2 = conn.createStatement();
+ stmt2.execute("SELECT count(1) FROM " + tableName);
+ fail("Should have thrown connection refused error.");
+ } catch (Exception e) {
+ assertTrue(e instanceof RuntimeException);
+ assertNotNull(e.getCause());
+ assertTrue(e.getCause() instanceof HttpHostConnectException);
+ assertTrue(e.getMessage().contains("Connection refused"));
+ }
+ }
+
+ @Test
+ public void testShuffledRoundRobinLBStrategyThreadSafe() throws Exception {
+ resetShuffledRoundRobinLBStrategyState();
+
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), ShuffledRoundRobinLBStrategy.class.getName());
+ StringBuilder sb = new StringBuilder();
+ // First URL will be server we started in this test
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ sb.append("http://localhost:").append(START_PORT + i).append(",");
+ }
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), sb.substring(0, sb.length() - 1));
+
+ // Create a connection
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ Callable<AvaticaConnection> callable =
+ () -> (AvaticaConnection) DriverManager.getConnection(url, properties);
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ Future<AvaticaConnection> result1 = executorService.submit(callable);
+ Future<AvaticaConnection> result2 = executorService.submit(callable);
+ executorService.shutdown();
+
+ AvaticaConnection connection1 = result1.get();
+ AvaticaConnection connection2 = result2.get();
+ assertNotNull(connection1);
+ assertNotNull(connection2);
+
+ // Verify that both threads got connections with different hosts
+ assertNotEquals(getConnectionURI(connection1), getConnectionURI(connection2));
+ }
+
+ @Test
+ public void testRoundRobinLBStrategyThreadSafe() throws Exception {
+ resetRoundRobinLBStrategyState();
+ Properties properties = new Properties();
+ properties.put(BuiltInConnectionProperty.USE_CLIENT_SIDE_LB.name(), "true");
+ properties.put(
+ BuiltInConnectionProperty.LB_STRATEGY.name(), RoundRobinLBStrategy.class.getName());
+ StringBuilder sb = new StringBuilder();
+ // First URL will be server we started in this test
+ for (int i = 0; i < NO_OF_SERVERS; i++) {
+ sb.append("http://localhost:").append(START_PORT + i).append(",");
+ }
+ properties.put(BuiltInConnectionProperty.LB_URLS.name(), sb.substring(0, sb.length() - 1));
+
+ // Create a connection
+ String url = SERVERS.getJdbcUrl(START_PORT, Driver.Serialization.PROTOBUF);
+ Callable<AvaticaConnection> callable =
+ () -> (AvaticaConnection) DriverManager.getConnection(url, properties);
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ Future<AvaticaConnection> result1 = executorService.submit(callable);
+ Future<AvaticaConnection> result2 = executorService.submit(callable);
+ executorService.shutdown();
+
+ AvaticaConnection connection1 = result1.get();
+ AvaticaConnection connection2 = result2.get();
+ assertNotNull(connection1);
+ assertNotNull(connection2);
+
+ // Verify URLs are not same when connections are created in different threads.
+ String url1 = getConnectionURI(connection1);
+ String url2 = getConnectionURI(connection2);
+
+ //Verify that both threads got connections with different hosts
+ assertNotEquals(url1, url2);
+
+ //Verify that T1 picked-up URL0 and T2 picked-up URL1 or vice versa
+ assertTrue(urls[0].equals(url1) && urls[1].equals(url2)
+ || urls[0].equals(url2) && urls[1].equals(url1));
+ }
+
+ private String getConnectionURI(AvaticaConnection conn)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field fService = AvaticaConnection.class.getDeclaredField("service");
+ fService.setAccessible(true);
+ RemoteProtobufService service = (RemoteProtobufService) fService.get(conn);
+
+ Field fClient = RemoteProtobufService.class.getDeclaredField("client");
+ fClient.setAccessible(true);
+ AvaticaHttpClient client = (AvaticaHttpClient) fClient.get(service);
+
+ Field fUri = AvaticaCommonsHttpClientImpl.class.getDeclaredField("uri");
+ fUri.setAccessible(true);
+ URI uri = (URI) fUri.get(client);
+
+ return uri.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void resetRoundRobinLBStrategyState()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field configToIndexServedMapField =
+ RoundRobinLBStrategy.class.getDeclaredField("configToIndexServedMap");
+ configToIndexServedMapField.setAccessible(true);
+ Map<String, Integer> configToIndexServedMap =
+ (Map<String, Integer>) configToIndexServedMapField.get(RoundRobinLBStrategy.INSTANCE);
+ configToIndexServedMap.clear();
+
+ Field configToUrlListMapField =
+ RoundRobinLBStrategy.class.getDeclaredField("configToIndexServedMap");
+ configToIndexServedMapField.setAccessible(true);
+ Map<String, Integer> configToUrlListMap =
+ (Map<String, Integer>) configToUrlListMapField.get(RoundRobinLBStrategy.INSTANCE);
+ configToUrlListMap.clear();
+
+ }
+ @SuppressWarnings("unchecked")
+ private void resetShuffledRoundRobinLBStrategyState()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field configToIndexServedMapField =
+ ShuffledRoundRobinLBStrategy.class.getDeclaredField("configToIndexServedMap");
+ configToIndexServedMapField.setAccessible(true);
+ Map<String, Integer> configToIndexServedMap =
+ (Map<String, Integer>) configToIndexServedMapField
+ .get(ShuffledRoundRobinLBStrategy.INSTANCE);
+ configToIndexServedMap.clear();
+
+ Field configToUrlListMapField =
+ ShuffledRoundRobinLBStrategy.class.getDeclaredField("configToIndexServedMap");
+ configToIndexServedMapField.setAccessible(true);
+ Map<String, Integer> configToUrlListMap =
+ (Map<String, Integer>) configToUrlListMapField
+ .get(ShuffledRoundRobinLBStrategy.INSTANCE);
+ configToUrlListMap.clear();
+ }
+}
diff --git a/server/src/test/java/org/apache/calcite/avatica/server/HttpServerSpnegoWithoutJaasTest.java b/server/src/test/java/org/apache/calcite/avatica/server/HttpServerSpnegoWithoutJaasTest.java
index 85d9f60..6fc9b9a 100644
--- a/server/src/test/java/org/apache/calcite/avatica/server/HttpServerSpnegoWithoutJaasTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/server/HttpServerSpnegoWithoutJaasTest.java
@@ -225,7 +225,7 @@
final AvaticaCommonsHttpClientImpl httpClient =
new AvaticaCommonsHttpClientImpl(httpServerUrl);
httpClient.setGSSCredential(credential);
- httpClient.setHttpClientPool(pool);
+ httpClient.setHttpClientPool(pool, config);
return httpClient.send(new byte[0]);
}
diff --git a/site/_docs/client_reference.md b/site/_docs/client_reference.md
index c3835c7..94e324d 100644
--- a/site/_docs/client_reference.md
+++ b/site/_docs/client_reference.md
@@ -193,3 +193,57 @@
: _Default_: `false`.
: _Required_: No.
+
+<strong><a name="use_client_side_lb" href="#use_client_side_lb">use_client_side_lb</a></strong>
+
+: _Description_: Enables the client side load-balancing.
+
+: _Default_: `false`.
+
+: _Required_: No.
+
+<strong><a name="lb_urls" href="#lb_urls">lb_urls</a></strong>
+
+: _Description_: List of URLs in a comma separated format, for example "URL1,URL2...URLn", to be used by the client side
+load balancer. Depending on the load balancing strategy, load balancer selects one of the URLs from the list.
+
+: _Default_: `null`.
+
+: _Required_: No.
+
+<strong><a name="lb_strategy" href="#lb_strategy">lb_strategy</a></strong>
+
+: _Description_: The load balancing strategy to be used by the client side load balancer. It must be a fully qualified
+Java class name which implements `org.apache.calcite.avatica.ha.LBStrategy`. Three implementations are provided
+`org.apache.calcite.avatica.ha.RandomSelectLBStrategy`, `org.apache.calcite.avatica.ha.RoundRobinLBStrategy` and
+`org.apache.calcite.avatica.ha.ShuffledRoundRobinLBStrategy`
+
+: _Default_: `org.apache.calcite.avatica.ha.ShuffledRoundRobinLBStrategy`.
+
+: _Required_: No.
+
+<strong><a name="lb_connection_failover_retries" href="#lb_connection_failover_retries">lb_connection_failover_retries</a></strong>
+
+: _Description_: Number of times that the load balancer tries to retry the connection with another URL (fail-over).
+When the connection fails, load balancer retries the connection with another URL, chosen by the load balancing strategy.
+
+: _Default_: `3`.
+
+: _Required_: No.
+
+<strong><a name="lb_connection_failover_sleep_time" href="#lb_connection_failover_sleep_time">lb_connection_failover_sleep_time</a></strong>
+
+: _Description_: The amount of time in milliseconds that the load balancer sleeps before attempting the next connection
+failover retry.
+
+: _Default_: `1000`.
+
+: _Required_: No.
+
+<strong><a name="http_connection_timeout" href="#http_connection_timeout">http_connection_timeout</a></strong>
+
+: _Description_: Timeout in milliseconds for the connection between the Avatica HTTP client and server.
+
+: _Default_: `180000` (3 minutes).
+
+: _Required_: No.