TINKERPOP-2445 Parallelize connection creation (#1360)
Changes include:
- Added a new NoHostAvailableException
- Changed ConnectionException to extend RuntimeException
- Added powermock test dependencies
- Stopped trying to reconnect to a host if the connection pool initialization for that host has failed
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4343093..4eb11fc 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -64,6 +64,9 @@
* Deprecated `BytecodeUtil` and merged its functionality to the existing `BytecodeHelper`.
* Added configuring implementation in HasStep
* Remove static initialization for `GraphSONMessageSerializerV1d0` and `GraphSONMessageSerializerV1d0` in Java driver.
+* Connections to the server in a connection pool are created in parallel instead of serially in Java Driver.
+* Connection pools for multiple endpoints are created in parallel instead of serially in Java Driver.
+* Introduced new `NoHostAvailableException` to represent cases when no server with an active connection is available.
[[release-3-4-8]]
=== TinkerPop 3.4.8 (Release Date: August 3, 2020)
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index bd997a6..7bffa41 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -94,6 +94,16 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 596910a..5460e75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -47,6 +47,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static java.lang.Math.toIntExact;
@@ -255,8 +256,16 @@
// forgot to enable it or perhaps the server is not configured for websockets.
handler.handshakeFuture().sync();
} catch (Exception ex) {
- throw new RuntimeException(new ConnectionException(connection.getUri(),
- "Could not complete websocket handshake - ensure that client protocol matches server", ex));
+ String errMsg = "";
+ if (ex instanceof TimeoutException) {
+ errMsg = "Timed out while waiting to complete the connection setup. Consider increasing the " +
+ "WebSocket handshake timeout duration.";
+ } else {
+ errMsg = "Could not complete connection setup to the server. Ensure that SSL is correctly " +
+ "configured at both the client and the server. Ensure that client WebSocket handshake " +
+ "protocol matches the server. Ensure that the server is still reachable.";
+ }
+ throw new ConnectionException(connection.getUri(), errMsg, ex);
}
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 9cbc337..af4ab31 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -26,8 +29,6 @@
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -37,11 +38,13 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@@ -105,7 +108,7 @@
* one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of
* the created {@code Client}.
*/
- public Client alias(final Map<String,String> aliases) {
+ public Client alias(final Map<String, String> aliases) {
return new AliasClusteredClient(this, aliases, settings);
}
@@ -154,8 +157,7 @@
* A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
- * @param options for the request
- *
+ * @param options for the request
* @see #submit(Bytecode)
*/
public ResultSet submit(final Bytecode bytecode, final RequestOptions options) {
@@ -181,8 +183,7 @@
* A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
- * @param options for the request
- *
+ * @param options for the request
* @see #submitAsync(Bytecode)
*/
public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
@@ -201,8 +202,14 @@
logger.debug("Initializing client on cluster [{}]", cluster);
cluster.init();
+
initializeImplementation();
+ // throw an error if no host is available even after initialization is complete.
+ if (cluster.availableHosts().isEmpty()) {
+ throw new NoHostAvailableException();
+ }
+
initialized = true;
return this;
}
@@ -223,7 +230,7 @@
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
@@ -263,7 +270,7 @@
* The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
@@ -279,15 +286,15 @@
* The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
- * @param parameters a map of parameters that will be bound to the script on execution
+ * @param gremlin the gremlin script to execute
+ * @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource,
final Map<String, Object> parameters) {
- Map<String,String> aliases = null;
+ Map<String, String> aliases = null;
if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) {
aliases = makeDefaultAliasMap(graphOrTraversalSource);
}
@@ -299,15 +306,15 @@
* The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
- * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
- * script where the key is the alias name and the value represents the global variable on the
- * server
+ * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
+ * script where the key is the alias name and the value represents the global variable on the
+ * server
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
- public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases,
+ public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, String> aliases,
final Map<String, Object> parameters) {
final RequestOptions.Builder options = RequestOptions.build();
if (aliases != null && !aliases.isEmpty()) {
@@ -402,8 +409,8 @@
return cluster;
}
- protected Map<String,String> makeDefaultAliasMap(final String graphOrTraversalSource) {
- final Map<String,String> aliases = new HashMap<>();
+ protected Map<String, String> makeDefaultAliasMap(final String graphOrTraversalSource) {
+ final Map<String, String> aliases = new HashMap<>();
aliases.put("g", graphOrTraversalSource);
return aliases;
}
@@ -443,8 +450,8 @@
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
- * @param gremlin the gremlin script to execute
- * @param parameters a map of parameters that will be bound to the script on execution
+ * @param gremlin the gremlin script to execute
+ * @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
*/
public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) {
@@ -460,7 +467,7 @@
*/
@Override
public Client alias(final String graphOrTraversalSource) {
- final Map<String,String> aliases = new HashMap<>();
+ final Map<String, String> aliases = new HashMap<>();
aliases.put("g", graphOrTraversalSource);
return alias(aliases);
}
@@ -469,7 +476,7 @@
* {@inheritDoc}
*/
@Override
- public Client alias(final Map<String,String> aliases) {
+ public Client alias(final Map<String, String> aliases) {
return new AliasClusteredClient(this, aliases, settings);
}
@@ -492,7 +499,7 @@
// you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
// or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
if (!possibleHosts.hasNext())
- throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists");
+ throw new NoHostAvailableException();
final Host bestHost = possibleHosts.next();
final ConnectionPool pool = hostConnectionPools.get(bestHost);
@@ -504,18 +511,20 @@
*/
@Override
protected void initializeImplementation() {
- cluster.allHosts().forEach(host -> {
- try {
- // hosts that don't initialize connection pools will come up as a dead host
- hostConnectionPools.put(host, new ConnectionPool(host, this));
-
- // added a new host to the cluster so let the load-balancer know
- this.cluster.loadBalancingStrategy().onNew(host);
- } catch (Exception ex) {
- // catch connection errors and prevent them from failing the creation
- logger.warn("Could not initialize connection pool for {} - will try later", host);
+ try {
+ CompletableFuture.allOf(cluster.allHosts().stream()
+ .map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), cluster.executor()))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ } catch (CompletionException ex) {
+ Throwable cause = null;
+ Throwable result = ex;
+ if (null != (cause = ex.getCause())) {
+ result = cause;
}
- });
+
+ logger.error("", result);
+ }
}
/**
@@ -526,11 +535,27 @@
if (closing.get() != null)
return closing.get();
- final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
- hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
- closing.set(CompletableFuture.allOf(poolCloseFutures));
+ final CompletableFuture<Void> allPoolsClosedFuture =
+ CompletableFuture.allOf(hostConnectionPools.values().stream()
+ .map(ConnectionPool::closeAsync)
+ .toArray(CompletableFuture[]::new));
+
+ closing.set(allPoolsClosedFuture);
return closing.get();
}
+
+ private Consumer<Host> initializeConnectionSetupForHost = host -> {
+ try {
+ // hosts that don't initialize connection pools will come up as a dead host
+ hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this));
+
+ // added a new host to the cluster so let the load-balancer know
+ ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
+ } catch (RuntimeException ex) {
+ final String errMsg = "Could not initialize client for " + host;
+ throw new RuntimeException(errMsg, ex);
+ }
+ };
}
/**
@@ -539,10 +564,10 @@
*/
public static class AliasClusteredClient extends Client {
private final Client client;
- private final Map<String,String> aliases = new HashMap<>();
+ private final Map<String, String> aliases = new HashMap<>();
final CompletableFuture<Void> close = new CompletableFuture<>();
- AliasClusteredClient(final Client client, final Map<String,String> aliases, final Client.Settings settings) {
+ AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) {
super(client.cluster, settings);
this.client = client;
this.aliases.putAll(aliases);
@@ -559,8 +584,8 @@
// need to call buildMessage() right away to get client specific configurations, that way request specific
// ones can override as needed
final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
- .processor("traversal")
- .addArg(Tokens.ARGS_GREMLIN, bytecode));
+ .processor("traversal")
+ .addArg(Tokens.ARGS_GREMLIN, bytecode));
// apply settings if they were made available
options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
@@ -582,7 +607,7 @@
// overrides which should be mucked with
if (!aliases.isEmpty()) {
final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
- aliases.forEach((k,v) -> {
+ aliases.forEach((k, v) -> {
if (!original.containsKey(k))
builder.addArg(Tokens.ARGS_ALIASES, aliases);
});
@@ -708,7 +733,12 @@
if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
Collections.shuffle(hosts);
final Host host = hosts.get(0);
- connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+
+ try {
+ connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+ } catch (RuntimeException ex) {
+ logger.error("Could not initialize client for {}", host, ex);
+ }
}
@Override
@@ -758,7 +788,8 @@
public static class Builder {
private Optional<SessionSettings> session = Optional.empty();
- private Builder() {}
+ private Builder() {
+ }
/**
* Enables a session. By default this will create a random session name and configure transactions to be
@@ -842,7 +873,8 @@
private String sessionId = UUID.randomUUID().toString();
private boolean forceClosed = false;
- private Builder() {}
+ private Builder() {
+ }
/**
* If enabled, transactions will be "managed" such that each request will represent a complete transaction.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9636df9..26699bc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -101,7 +101,7 @@
this.maxInProcess = maxInProcess;
this.keepAliveInterval = pool.settings().keepAliveInterval;
- connectionLabel = String.format("Connection{host=%s}", pool.host);
+ connectionLabel = "Connection{host=" + pool.host + "}";
if (cluster.isClosing())
throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
@@ -128,9 +128,8 @@
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if(logger.isDebugEnabled()) {
- logger.debug("OnChannelClose callback called for channel {}", channel.id().asShortText());
- }
+ logger.debug("OnChannelClose callback called for channel {}", channel);
+
// Replace the channel if it was not intentionally closed using CloseAsync method.
if (thisConnection.closeFuture.get() == null) {
// delegate the task to worker thread and free up the event loop
@@ -143,11 +142,11 @@
// Default WebSocketChannelizer uses Netty's IdleStateHandler
if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+ logger.debug("Using custom keep alive handler.");
scheduleKeepAlive();
}
- } catch (Exception ie) {
- logger.debug("Error opening connection on {}", uri);
- throw new ConnectionException(uri, "Could not open connection", ie);
+ } catch (Exception ex) {
+ throw new ConnectionException(uri, "Could not open " + this.toString(), ex);
}
}
@@ -273,6 +272,7 @@
// Default WebSocketChannelizer uses Netty's IdleStateHandler
if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+ logger.debug("Using custom keep alive handler.");
scheduleKeepAlive();
}
@@ -409,10 +409,10 @@
/**
* Returns the short ID for the underlying channel for this connection.
* <p>
- * Currently only used for testing.
+ * Visible for testing.
*/
String getChannelId() {
- return (channel != null) ? channel.id().asShortText() : "";
+ return (channel != null) ? channel.id().asShortText() : "null";
}
@Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f7b35e9..295c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -18,25 +18,23 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
@@ -85,7 +83,7 @@
this.host = host;
this.client = client;
this.cluster = client.cluster;
- poolLabel = String.format("Connection Pool {host=%s}", host);
+ poolLabel = "Connection Pool {host=" + host + "}";
final Settings.ConnectionPoolSettings settings = settings();
this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
@@ -97,15 +95,38 @@
this.connections = new CopyOnWriteArrayList<>();
try {
+ final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
for (int i = 0; i < minPoolSize; i++) {
- this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+ connCreationFutures.add(CompletableFuture.runAsync(() -> {
+ try {
+ this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+ } catch (ConnectionException e) {
+ throw new CompletionException(e);
+ }
+ }, cluster.executor()));
}
- } catch (ConnectionException ce) {
- // ok if we don't get it initialized here - when a request is attempted in a connection from the
- // pool it will try to create new connections as needed.
- logger.info("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size(), ce);
- considerHostUnavailable();
+ CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
+ } catch (CancellationException ce) {
+ logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+ throw ce;
+ } catch (CompletionException ce) {
+ // Some connections might have been initialized. Close the connection pool gracefully to close them.
+ this.closeAsync();
+
+ final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
+ " Successful connections=" + this.connections.size() +
+ ". Closing the connection pool.";
+
+
+ Throwable cause = null;
+ Throwable result = ce;
+
+ if (null != (cause = result.getCause())) {
+ result = cause;
+ }
+
+ throw new CompletionException(errMsg, result);
}
this.open = new AtomicInteger(connections.size());
@@ -316,7 +337,7 @@
try {
connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
} catch (ConnectionException ce) {
- logger.debug("Connections were under max, but there was an error creating the connection.", ce);
+ logger.error("Connections were under max, but there was an error creating the connection.", ce);
open.decrementAndGet();
considerHostUnavailable();
return false;
@@ -350,7 +371,7 @@
// only close the connection for good once it is done being borrowed or when it is dead
if (connection.isDead() || connection.borrowed.get() == 0) {
- if(bin.remove(connection)) {
+ if (bin.remove(connection)) {
connection.closeAsync();
// TODO: Log the following message on completion of the future returned by closeAsync.
logger.debug("{} destroyed", connection.getConnectionInfo());
@@ -396,7 +417,7 @@
logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
} while (remaining > 0);
- logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
+ logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);
// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..551c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -23,9 +23,11 @@
import java.util.Optional;
/**
+ * This exception signifies network connection failure.
+ *
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class ConnectionException extends Exception {
+public class ConnectionException extends RuntimeException {
private URI uri;
private InetSocketAddress address;
@@ -35,6 +37,12 @@
this.uri = uri;
}
+ public ConnectionException(final URI uri, final Throwable cause) {
+ super(cause);
+ this.uri = uri;
+ this.address = null;
+ }
+
public ConnectionException(final URI uri, final String message, final Throwable cause) {
super(message, cause);
this.uri = uri;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
new file mode 100644
index 0000000..e8b3b87
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.exception;
+
+public class NoHostAvailableException extends RuntimeException {
+
+ public NoHostAvailableException() {
+ super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+ }
+
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 7e55b92..3d2df78 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -70,8 +70,10 @@
if (!handshakeFuture.isDone()) {
// channel was closed before the handshake could be completed.
handshakeFuture.setFailure(
- new RuntimeException(String.format("Channel=[%s] closed before the handshake could complete",
- ctx.channel().toString())));
+ new RuntimeException(String.format("WebSocket channel=[%s] closed before the handshake could complete." +
+ " Server logs could contain the reason for abrupt connection disconnect or the " +
+ "server might not be reachable from the client anymore.",
+ ctx.channel().id().asShortText())));
}
super.channelInactive(ctx);
@@ -82,7 +84,7 @@
if (event instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) event;
if (e.state() == IdleState.READER_IDLE) {
- logger.warn("WebSocket connection " + ctx.channel() + " has been idle for too long.");
+ logger.warn("WebSocket connection {} has been idle for too long.", ctx.channel());
} else if (e.state() == IdleState.WRITER_IDLE) {
logger.debug("Sending ping frame to the server");
ctx.writeAndFlush(new PingWebSocketFrame());
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000..20f38ba
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Client.ClusteredClient.class, Client.SessionedClient.class, Host.class, Cluster.class})
+public class ClientTest {
+ @Mock
+ private Cluster cluster;
+
+ @Mock
+ private Host mockAvailableHost;
+
+ @Mock
+ private Client.Settings settings;
+
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setup() {
+ executor = Executors.newScheduledThreadPool(1);
+ when(mockAvailableHost.isAvailable()).thenReturn(true);
+ when(cluster.allHosts()).thenReturn(Collections.singletonList(mockAvailableHost));
+ when(cluster.executor()).thenReturn(executor);
+ }
+
+ @After
+ public void cleanup() {
+ executor.shutdown();
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void shouldThrowErrorWhenConnPoolInitFailsForClusteredClient() throws Exception {
+ Client.ClusteredClient client = new Client.ClusteredClient(cluster, settings);
+ whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+ client.init();
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void shouldThrowErrorWhenConnPoolInitFailsForSessionClient() throws Exception {
+ final Client.SessionSettings sessionSettings = Client.SessionSettings.build().sessionId("my-session-id").create();
+ when(settings.getSession()).thenReturn(Optional.of(sessionSettings));
+ Client.SessionedClient client = new Client.SessionedClient(cluster, settings);
+ whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+ client.init();
+ }
+
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
index 3d6bef3..1008653 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
@@ -108,6 +108,7 @@
final ResponseMessage deserialized = serializer.deserializeResponse(serialized);
final SamplePerson actual = (SamplePerson) deserialized.getResult().getData();
+
assertThat(actual, new ReflectionEquals(person));
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 5a2e1f8..4259d15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -102,10 +102,10 @@
// periodically ping the server, but coming from this direction allows the server to kill channels that
// have dead clients on the other end
if (e.state() == IdleState.READER_IDLE) {
- logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel());
+ logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
- logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel());
+ logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
}
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index db88ef9..623107f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -28,6 +28,7 @@
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
@@ -64,6 +65,7 @@
import java.awt.Color;
import java.io.File;
+import java.net.ConnectException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -411,7 +413,7 @@
fail("Should not have gone through because the server is not running");
} catch (Exception i) {
final Throwable root = ExceptionUtils.getRootCause(i);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
}
startServer();
@@ -447,7 +449,7 @@
fail("Should not have gone through because the server is not running");
} catch (Exception i) {
final Throwable root = ExceptionUtils.getRootCause(i);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
}
startServer();
@@ -476,6 +478,7 @@
try {
final Client client = cluster.connect();
+ client.init();
// the first host is dead on init. request should succeed on localhost
assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
@@ -861,12 +864,29 @@
final Cluster cluster = TestClientFactory.open();
try {
assertEquals(0, cluster.availableHosts().size());
- cluster.connect().init();
+ final Client client1 = cluster.connect().init();
assertEquals(1, cluster.availableHosts().size());
stopServer();
+ // We create a new client here which will fail to initialize but the original client still has
+ // host marked as connected. Since the second client failed during initialization, it has no way to
+ // test if a host is indeed unreachable because it doesn't have any established connections. It will not add
+ // the host to load balancer but it will also not remove it if it already exists there. Leave that
+ // responsibility to a client that added it. In this case, let the second client perform its own mechanism
+ // to mark host as unavailable. The first client will discover that the host has failed either with next
+ // keepAlive message or the next request, whichever is earlier. In this case, we will simulate the second
+ // scenario by sending a new request on first client. The request would fail (since server is down) and
+ // client should mark the host unavailable.
cluster.connect().init();
+
+ try {
+ client1.submit("1+1").all().join();
+ fail("Expecting an exception because the server is shut down.");
+ } catch (Exception ex) {
+ // expected - the server is shut down so the request fails; the exception itself is not inspected
+ }
+
assertEquals(0, cluster.availableHosts().size());
} finally {
cluster.close();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 807e9a7..f1740c4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.ietf.jgss.GSSException;
@@ -88,8 +89,7 @@
fail("This should not succeed as the client did not enable SSL");
} catch(Exception ex) {
final Throwable root = ExceptionUtils.getRootCause(ex);
- assertEquals(TimeoutException.class, root.getClass());
- assertThat(root.getMessage(), startsWith("Timed out while waiting for an available host"));
+ assertEquals(NoHostAvailableException.class, root.getClass());
} finally {
cluster.close();
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ab75b8e..8e1e8fb 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.junit.Test;
import java.util.Arrays;
@@ -203,7 +204,7 @@
fail("Should throw exception because ssl is enabled on the server but not on client");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -233,7 +234,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -251,7 +252,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -294,7 +295,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -311,7 +312,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -328,7 +329,7 @@
fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -345,7 +346,7 @@
fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -391,7 +392,7 @@
fail("Should throw exception because incorrect keyStoreType is specified");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -410,7 +411,7 @@
fail("Should throw exception because incorrect trustStoreType is specified");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
diff --git a/pom.xml b/pom.xml
index 3a5451b..8fd981c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,7 @@
<slf4j.version>1.7.25</slf4j.version>
<snakeyaml.version>1.27</snakeyaml.version>
<spark.version>2.4.0</spark.version>
+ <powermock.version>1.6.4</powermock.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -721,6 +722,18 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>