Merge branch 'TINKERPOP-2479' into 3.4-dev
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 3d81a53..eed6738 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -57,12 +57,19 @@
* Deprecated driver `Channelizer` keep-alive related methods.
* Delegate handling of WebSocket handshake to Netty instead of custom code in Java Driver.
* Delegate detection of idle connection to Netty instead of custom keep alive logic for `WebSocketChannelizer`.
-* Added support for WebSocket frame compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ) for `WebSocketChannelizer` in Java driver.
+* Added support for WebSocket frame compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ) for `WebSocketChannelizer` in Java/Python driver.
* Added server support for WebSocket compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ).
* Fixed bug with Bytecode serialization when `Bytecode.toString()` is used in Javascript.
* Fixed "toString" for P and TextP to produce valid script representation from bytecode glv steps containing a string predicate in Javascript.
* Fixed a bug which could cause Java driver to hang when using `ResultSet.statusAttributes()`
+* Fixed bug with `ReservedKeysVerificationStrategy.getConfiguration()` which was omitting the reserved `keys` value.
+* Changed all configuration keys on `AbstractWarningVerificationStrategy` implementations to `public`.
* Deprecated `BytecodeUtil` and merged its functionality to the existing `BytecodeHelper`.
+* Added `Configuring` implementation to `HasStep` and `TraversalFilterStep`.
+* Removed static initialization for `GraphSONMessageSerializerV1d0` and `GraphSONMessageSerializerV1d0` in Java driver.
+* Connections to the server in a connection pool are created in parallel instead of serially in Java Driver.
+* Connection pools for multiple endpoints are created in parallel instead of serially in Java Driver.
+* Introduced new `NoHostAvailableException` to represent cases when no server with an active connection is available.
[[release-3-4-8]]
=== TinkerPop 3.4.8 (Release Date: August 3, 2020)
diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc
index fd769bb..8d9acc8 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -770,13 +770,21 @@
|=========================================================
Note that the `transport_factory` can allow for additional configuration of the `TornadoTransport`, which exposes
-options to manage `ioloop` timeouts:
+options to manage `ioloop` timeouts and compression settings:
-```python
+[source,python]
+----
g = traversal().withRemote(
DriverRemoteConnection('ws://localhost:8182/gremlin','g',
- transport_factory=lambda: TornadoTransport(read_timeout=10, write_timeout=10)))
-```
+ transport_factory=lambda: TornadoTransport(read_timeout=10,
+ write_timeout=10,
+ compression_options={'compression_level':5,'mem_level':5})))
+----
+
+Compression configuration options are described in the
+link:https://docs.python.org/3.6/library/zlib.html#zlib.compressobj[zlib documentation]. By default, compression
+settings are configured as shown in the above example.
+
[[gremlin-python-strategies]]
=== Traversal Strategies
diff --git a/docs/src/upgrade/release-3.4.x.asciidoc b/docs/src/upgrade/release-3.4.x.asciidoc
index 913138a..ba5acf1 100644
--- a/docs/src/upgrade/release-3.4.x.asciidoc
+++ b/docs/src/upgrade/release-3.4.x.asciidoc
@@ -28,7 +28,9 @@
Please see the link:https://github.com/apache/tinkerpop/blob/3.4.9/CHANGELOG.asciidoc#release-3-4-9[changelog] for a
complete list of all the modifications that are part of this release.
-=== Translator Implementations
+=== Upgrading for Users
+
+==== Translator Implementations
One of the silent features of Gremlin is the `ScriptTranslator`. More specifically, the implementation of this
interface which will convert a `Traversal` object (or Gremlin `Bytecode`) into a proper `String` representation that
@@ -59,7 +61,7 @@
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2461[TINKERPOP-2461]
-=== Bytecode Command Improvements
+==== Bytecode Command Improvements
The `:bytecode` command in the Gremlin console includes two new options: `reset` and `config`. Both options provide
ways to better control the `GraphSONMapper` used internally by the command. The `reset` option will replace the current
@@ -69,7 +71,7 @@
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2479[TINKERPOP-2479]
-=== withStrategies() Groovy Syntax
+==== withStrategies() Groovy Syntax
The `withStrategies()` configuration step accepts a variable number of `TraversalStrategy` instances. In Java, those
instances are typically constructed with `instance()` if it is a singleton or by way of a builder pattern which
@@ -101,7 +103,7 @@
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2466[TINKERPOP-2466]
-=== withEmbedded()
+==== withEmbedded()
The `AnonymousTraversalSource` was introduced in 3.3.5 and is most typically used for constructing remote
`TraversalSource` instances, but it also provides a way to construct a `TraversalSource` from an embedded `Graph`
@@ -143,6 +145,17 @@
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2054[TINKERPOP-2054]
+==== WebSocket Compression
+
+Gremlin Server now supports standard WebSocket compression (per link:https://tools.ietf.org/html/rfc7692[RFC 7692]).
+Both the Java and Python drivers support this functionality from the client's perspective. Compression is enabled by
+default and should be backward compatible, thus allowing older versions of the driver to connect to newer versions of
+the server and vice versa. Using the compression-enabled drivers with a server that also supports that functionality
+will greatly reduce network IO requirements.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-2441[TINKERPOP-2441],
+link:https://issues.apache.org/jira/browse/TINKERPOP-2453[TINKERPOP-2453]
+
==== Per Request Options
With Java it has been possible to pass per-request settings for both scripts and bytecode. While Javascript, Python,
@@ -175,7 +188,7 @@
link:https://issues.apache.org/jira/browse/TINKERPOP-2420[TINKERPOP-2420],
link:https://issues.apache.org/jira/browse/TINKERPOP-2421[TINKERPOP-2421]
-=== GraphManager Extension
+==== GraphManager Extension
The `org.apache.tinkerpop.gremlin.server.util.CheckedGraphManager` can be used instead of
`org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager` (in gremlin-server.yml) to ensure that Gremlin Server
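
Note on the WebSocket Compression upgrade entry above: RFC 7692 builds its per-message compression on DEFLATE. The sketch below is only a rough illustration of why repetitive payloads shrink. It uses the JDK's `java.util.zip` in raw (headerless) mode and is not the code path of the driver, the server, or Netty. The `DeflateSketch` class, the sample payload, and the level of 5 are assumptions made for the example, and it does not reproduce the RFC 7692 framing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustration only: raw DEFLATE via the JDK, not the WebSocket wire format.
final class DeflateSketch {

    // Compresses input as a raw DEFLATE stream (nowrap = true, no zlib header).
    static byte[] deflate(final byte[] input, final int level) {
        final Deflater deflater = new Deflater(level, true);
        try {
            deflater.setInput(input);
            deflater.finish();
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                out.write(buffer, 0, deflater.deflate(buffer));
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    // Reverses deflate(); wraps the checked exception to keep call sites simple.
    static byte[] inflate(final byte[] compressed) {
        final Inflater inflater = new Inflater(true);
        try {
            // The Inflater docs ask for one extra dummy byte when nowrap is used.
            inflater.setInput(Arrays.copyOf(compressed, compressed.length + 1));
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                final int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated or unsupported input; stop instead of spinning
                }
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not valid DEFLATE data", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(final String[] args) {
        final StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("{\"@type\":\"g:Vertex\",\"label\":\"person\"}");
        }
        final byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        final byte[] packed = deflate(raw, 5);
        System.out.println(raw.length + " bytes -> " + packed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(raw, inflate(packed)));
    }
}
```

Actual savings depend on the payload; small or already-compressed responses may gain little.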
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
index 93931c3..1b74508 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -65,8 +65,10 @@
public void set(final String key, final Object value) throws IllegalArgumentException, IllegalStateException;
/**
- * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
- * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
+ * Set the value of the provided key. This is typically called in setup() and/or terminate() of the
+ * {@link VertexProgram}. If this is called during execute(), there is no guarantee as to the ultimately stored
+ * value as call order is indeterminate. It is up to the implementation to determine the states in which this
+ * method can be called.
*
* @param key the key to set a value for
* @param value the value to set for the key
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
index 450f656..6048ba7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
@@ -20,8 +20,10 @@
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
@@ -36,8 +38,9 @@
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class HasStep<S extends Element> extends FilterStep<S> implements HasContainerHolder {
+public class HasStep<S extends Element> extends FilterStep<S> implements HasContainerHolder, Configuring {
+ private final Parameters parameters = new Parameters();
private List<HasContainer> hasContainers;
public HasStep(final Traversal.Admin traversal, final HasContainer... hasContainers) {
@@ -47,6 +50,16 @@
}
@Override
+ public Parameters getParameters() {
+ return this.parameters;
+ }
+
+ @Override
+ public void configure(final Object... keyValues) {
+ this.parameters.set(null, keyValues);
+ }
+
+ @Override
protected boolean filter(final Traverser.Admin<S> traverser) {
// the generic S is defined as Element but Property can also be used with HasStep so this seems to cause
// problems with some jdk versions.
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
index 7391733..ec37a3f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
@@ -20,7 +20,9 @@
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -32,7 +34,8 @@
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class TraversalFilterStep<S> extends FilterStep<S> implements TraversalParent {
+public final class TraversalFilterStep<S> extends FilterStep<S> implements TraversalParent, Configuring {
+ private final Parameters parameters = new Parameters();
private Traversal.Admin<S, ?> filterTraversal;
@@ -42,6 +45,16 @@
}
@Override
+ public Parameters getParameters() {
+ return this.parameters;
+ }
+
+ @Override
+ public void configure(final Object... keyValues) {
+ this.parameters.set(null, keyValues);
+ }
+
+ @Override
protected boolean filter(final Traverser.Admin<S> traverser) {
return TraversalUtil.test(traverser, this.filterTraversal);
}
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
index a7c5a3a..52c9410 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
@@ -42,8 +42,8 @@
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWarningVerificationStrategy.class);
- protected static final String THROW_EXCEPTION = "throwException";
- protected static final String LOG_WARNING = "logWarning";
+ public static final String THROW_EXCEPTION = "throwException";
+ public static final String LOG_WARNING = "logWarning";
protected final boolean throwException;
protected final boolean logWarning;
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
index 8fe6424..b106884 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.process.traversal.strategy.verification;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.Parameterizing;
@@ -31,7 +32,9 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -47,7 +50,7 @@
*/
public class ReservedKeysVerificationStrategy extends AbstractWarningVerificationStrategy {
- private static final String KEYS = "keys";
+ public static final String KEYS = "keys";
private static final Set<String> DEFAULT_RESERVED_KEYS = new HashSet<>(Arrays.asList("id", "label"));
private final Set<String> reservedKeys;
@@ -84,6 +87,13 @@
.logWarning(configuration.getBoolean(LOG_WARNING, false)).create();
}
+ @Override
+ public Configuration getConfiguration() {
+ final Configuration c = super.getConfiguration();
+ c.setProperty(KEYS, this.reservedKeys);
+ return c;
+ }
+
public static ReservedKeysVerificationStrategy.Builder build() {
return new ReservedKeysVerificationStrategy.Builder();
}
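
Note on the `getConfiguration()` override above: the point of adding `keys` is that a strategy rebuilt from its own configuration keeps its reserved keys. The following is a minimal sketch of that round trip using plain `Map`s in place of Commons Configuration. The classes `WarningStrategy` and `ReservedKeysStrategy` are hypothetical stand-ins, and the assumption that the parent configuration carries the `throwException` and `logWarning` flags is not shown in this diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins; a Map replaces org.apache.commons.configuration.Configuration.
final class StrategyConfigSketch {

    static class WarningStrategy {
        static final String THROW_EXCEPTION = "throwException";
        static final String LOG_WARNING = "logWarning";
        final boolean throwException;
        final boolean logWarning;

        WarningStrategy(final boolean throwException, final boolean logWarning) {
            this.throwException = throwException;
            this.logWarning = logWarning;
        }

        // Assumed parent behavior: only the two flags are exported.
        Map<String, Object> getConfiguration() {
            final Map<String, Object> c = new HashMap<>();
            c.put(THROW_EXCEPTION, throwException);
            c.put(LOG_WARNING, logWarning);
            return c;
        }
    }

    static class ReservedKeysStrategy extends WarningStrategy {
        static final String KEYS = "keys";
        static final Set<String> DEFAULT_RESERVED_KEYS = new LinkedHashSet<>(Arrays.asList("id", "label"));
        final Set<String> reservedKeys;

        ReservedKeysStrategy(final boolean throwException, final boolean logWarning, final Set<String> keys) {
            super(throwException, logWarning);
            this.reservedKeys = new LinkedHashSet<>(keys);
        }

        // Without this override the subclass state would be lost on export.
        @Override
        Map<String, Object> getConfiguration() {
            final Map<String, Object> c = super.getConfiguration();
            c.put(KEYS, reservedKeys);
            return c;
        }

        // Rebuilds a strategy from a configuration, falling back to the defaults.
        @SuppressWarnings("unchecked")
        static ReservedKeysStrategy create(final Map<String, Object> c) {
            final Set<String> keys = c.containsKey(KEYS) ? (Set<String>) c.get(KEYS) : DEFAULT_RESERVED_KEYS;
            return new ReservedKeysStrategy(
                    (Boolean) c.getOrDefault(THROW_EXCEPTION, false),
                    (Boolean) c.getOrDefault(LOG_WARNING, false),
                    keys);
        }
    }

    public static void main(final String[] args) {
        final Set<String> keys = new LinkedHashSet<>(Arrays.asList("id", "label", "not-allowed"));
        final ReservedKeysStrategy original = new ReservedKeysStrategy(true, false, keys);
        final ReservedKeysStrategy copy = ReservedKeysStrategy.create(original.getConfiguration());
        System.out.println("keys preserved: " + copy.reservedKeys.equals(keys));
    }
}
```

If the override is removed from the sketch, `create()` silently falls back to the default keys, which mirrors the omission described in the changelog entry.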
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
index f25f66d..22dcd11 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -81,6 +82,7 @@
{"__.addE('knows').property('label','xyz')", __.addE("knows").property("id", "xyz"), false},
{"__.addV().property('x','xyz', 'label', 'xxx')", __.addV().property("x", "xyz", "label", "xxx"), false},
{"__.addV().property('x','xyz', 'not-Label', 'xxx')", __.addV().property("x", "xyz", "not-label", "xxx"), true},
+ {"__.addV().property('x','xyz', 'not-allowed', 'xxx')", __.addV().property("x", "xyz", "not-allowed", "xxx"), false},
});
}
@@ -106,7 +108,10 @@
@Test
public void shouldOnlyThrow() {
final TraversalStrategies strategies = new DefaultTraversalStrategies();
- strategies.addStrategies(ReservedKeysVerificationStrategy.build().throwException().create());
+ final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().throwException();
+ if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+ builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+ strategies.addStrategies(builder.create());
final Traversal traversal = this.traversal.asAdmin().clone();
traversal.asAdmin().setStrategies(strategies);
if (allow) {
@@ -125,7 +130,10 @@
@Test
public void shouldOnlyLog() {
final TraversalStrategies strategies = new DefaultTraversalStrategies();
- strategies.addStrategies(ReservedKeysVerificationStrategy.build().logWarning().create());
+ final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().logWarning();
+ if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+ builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+ strategies.addStrategies(builder.create());
final Traversal traversal = this.traversal.asAdmin().clone();
traversal.asAdmin().setStrategies(strategies);
traversal.asAdmin().applyStrategies();
@@ -138,7 +146,11 @@
@Test
public void shouldThrowAndLog() {
final TraversalStrategies strategies = new DefaultTraversalStrategies();
- strategies.addStrategies(ReservedKeysVerificationStrategy.build().throwException().logWarning().create());
+ final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().
+ throwException().logWarning();
+ if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+ builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+ strategies.addStrategies(builder.create());
final Traversal traversal = this.traversal.asAdmin().clone();
traversal.asAdmin().setStrategies(strategies);
if (allow) {
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index bd997a6..7bffa41 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -94,6 +94,16 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 596910a..5460e75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -47,6 +47,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static java.lang.Math.toIntExact;
@@ -255,8 +256,16 @@
// forgot to enable it or perhaps the server is not configured for websockets.
handler.handshakeFuture().sync();
} catch (Exception ex) {
- throw new RuntimeException(new ConnectionException(connection.getUri(),
- "Could not complete websocket handshake - ensure that client protocol matches server", ex));
+ String errMsg = "";
+ if (ex instanceof TimeoutException) {
+ errMsg = "Timed out while waiting to complete the connection setup. Consider increasing the " +
+ "WebSocket handshake timeout duration.";
+ } else {
+ errMsg = "Could not complete connection setup to the server. Ensure that SSL is correctly " +
+ "configured at both the client and the server. Ensure that client WebSocket handshake " +
+ "protocol matches the server. Ensure that the server is still reachable.";
+ }
+ throw new ConnectionException(connection.getUri(), errMsg, ex);
}
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 9cbc337..af4ab31 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -26,8 +29,6 @@
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -37,11 +38,13 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@@ -105,7 +108,7 @@
* one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of
* the created {@code Client}.
*/
- public Client alias(final Map<String,String> aliases) {
+ public Client alias(final Map<String, String> aliases) {
return new AliasClusteredClient(this, aliases, settings);
}
@@ -154,8 +157,7 @@
* A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
- * @param options for the request
- *
+ * @param options for the request
* @see #submit(Bytecode)
*/
public ResultSet submit(final Bytecode bytecode, final RequestOptions options) {
@@ -181,8 +183,7 @@
* A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
- * @param options for the request
- *
+ * @param options for the request
* @see #submitAsync(Bytecode)
*/
public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
@@ -201,8 +202,14 @@
logger.debug("Initializing client on cluster [{}]", cluster);
cluster.init();
+
initializeImplementation();
+ // throw an error if no host is available even after initialization is complete.
+ if (cluster.availableHosts().isEmpty()) {
+ throw new NoHostAvailableException();
+ }
+
initialized = true;
return this;
}
@@ -223,7 +230,7 @@
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
@@ -263,7 +270,7 @@
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
@@ -279,15 +286,15 @@
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
- * @param parameters a map of parameters that will be bound to the script on execution
+ * @param gremlin the gremlin script to execute
+ * @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource,
final Map<String, Object> parameters) {
- Map<String,String> aliases = null;
+ Map<String, String> aliases = null;
if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) {
aliases = makeDefaultAliasMap(graphOrTraversalSource);
}
@@ -299,15 +306,15 @@
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
- * @param gremlin the gremlin script to execute
+ * @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
- * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
- * script where the key is the alias name and the value represents the global variable on the
- * server
+ * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
+ * script where the key is the alias name and the value represents the global variable on the
+ * server
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
- public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases,
+ public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, String> aliases,
final Map<String, Object> parameters) {
final RequestOptions.Builder options = RequestOptions.build();
if (aliases != null && !aliases.isEmpty()) {
@@ -402,8 +409,8 @@
return cluster;
}
- protected Map<String,String> makeDefaultAliasMap(final String graphOrTraversalSource) {
- final Map<String,String> aliases = new HashMap<>();
+ protected Map<String, String> makeDefaultAliasMap(final String graphOrTraversalSource) {
+ final Map<String, String> aliases = new HashMap<>();
aliases.put("g", graphOrTraversalSource);
return aliases;
}
@@ -443,8 +450,8 @@
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
- * @param gremlin the gremlin script to execute
- * @param parameters a map of parameters that will be bound to the script on execution
+ * @param gremlin the gremlin script to execute
+ * @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
*/
public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) {
@@ -460,7 +467,7 @@
*/
@Override
public Client alias(final String graphOrTraversalSource) {
- final Map<String,String> aliases = new HashMap<>();
+ final Map<String, String> aliases = new HashMap<>();
aliases.put("g", graphOrTraversalSource);
return alias(aliases);
}
@@ -469,7 +476,7 @@
* {@inheritDoc}
*/
@Override
- public Client alias(final Map<String,String> aliases) {
+ public Client alias(final Map<String, String> aliases) {
return new AliasClusteredClient(this, aliases, settings);
}
@@ -492,7 +499,7 @@
// you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
// or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
if (!possibleHosts.hasNext())
- throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists");
+ throw new NoHostAvailableException();
final Host bestHost = possibleHosts.next();
final ConnectionPool pool = hostConnectionPools.get(bestHost);
@@ -504,18 +511,20 @@
*/
@Override
protected void initializeImplementation() {
- cluster.allHosts().forEach(host -> {
- try {
- // hosts that don't initialize connection pools will come up as a dead host
- hostConnectionPools.put(host, new ConnectionPool(host, this));
-
- // added a new host to the cluster so let the load-balancer know
- this.cluster.loadBalancingStrategy().onNew(host);
- } catch (Exception ex) {
- // catch connection errors and prevent them from failing the creation
- logger.warn("Could not initialize connection pool for {} - will try later", host);
+ try {
+ CompletableFuture.allOf(cluster.allHosts().stream()
+ .map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), cluster.executor()))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ } catch (CompletionException ex) {
+ Throwable cause = null;
+ Throwable result = ex;
+ if (null != (cause = ex.getCause())) {
+ result = cause;
}
- });
+
+ logger.error("Could not initialize connection pools for all hosts", result);
+ }
}
/**
@@ -526,11 +535,27 @@
if (closing.get() != null)
return closing.get();
- final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
- hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
- closing.set(CompletableFuture.allOf(poolCloseFutures));
+ final CompletableFuture<Void> allPoolsClosedFuture =
+ CompletableFuture.allOf(hostConnectionPools.values().stream()
+ .map(ConnectionPool::closeAsync)
+ .toArray(CompletableFuture[]::new));
+
+ closing.set(allPoolsClosedFuture);
return closing.get();
}
+
+ private Consumer<Host> initializeConnectionSetupForHost = host -> {
+ try {
+ // hosts that don't initialize connection pools will come up as a dead host
+ hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this));
+
+ // added a new host to the cluster so let the load-balancer know
+ ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
+ } catch (RuntimeException ex) {
+ final String errMsg = "Could not initialize client for " + host;
+ throw new RuntimeException(errMsg, ex);
+ }
+ };
}
/**
@@ -539,10 +564,10 @@
*/
public static class AliasClusteredClient extends Client {
private final Client client;
- private final Map<String,String> aliases = new HashMap<>();
+ private final Map<String, String> aliases = new HashMap<>();
final CompletableFuture<Void> close = new CompletableFuture<>();
- AliasClusteredClient(final Client client, final Map<String,String> aliases, final Client.Settings settings) {
+ AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) {
super(client.cluster, settings);
this.client = client;
this.aliases.putAll(aliases);
@@ -559,8 +584,8 @@
// need to call buildMessage() right away to get client specific configurations, that way request specific
// ones can override as needed
final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
- .processor("traversal")
- .addArg(Tokens.ARGS_GREMLIN, bytecode));
+ .processor("traversal")
+ .addArg(Tokens.ARGS_GREMLIN, bytecode));
// apply settings if they were made available
options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
@@ -582,7 +607,7 @@
// overrides which should be mucked with
if (!aliases.isEmpty()) {
final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
- aliases.forEach((k,v) -> {
+ aliases.forEach((k, v) -> {
if (!original.containsKey(k))
builder.addArg(Tokens.ARGS_ALIASES, aliases);
});
@@ -708,7 +733,12 @@
if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
Collections.shuffle(hosts);
final Host host = hosts.get(0);
- connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+
+ try {
+ connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+ } catch (RuntimeException ex) {
+ logger.error("Could not initialize client for {}", host, ex);
+ }
}
@Override
@@ -758,7 +788,8 @@
public static class Builder {
private Optional<SessionSettings> session = Optional.empty();
- private Builder() {}
+ private Builder() {
+ }
/**
* Enables a session. By default this will create a random session name and configure transactions to be
@@ -842,7 +873,8 @@
private String sessionId = UUID.randomUUID().toString();
private boolean forceClosed = false;
- private Builder() {}
+ private Builder() {
+ }
/**
* If enabled, transactions will be "managed" such that each request will represent a complete transaction.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9636df9..26699bc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -101,7 +101,7 @@
this.maxInProcess = maxInProcess;
this.keepAliveInterval = pool.settings().keepAliveInterval;
- connectionLabel = String.format("Connection{host=%s}", pool.host);
+ connectionLabel = "Connection{host=" + pool.host + "}";
if (cluster.isClosing())
throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
@@ -128,9 +128,8 @@
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if(logger.isDebugEnabled()) {
- logger.debug("OnChannelClose callback called for channel {}", channel.id().asShortText());
- }
+ logger.debug("OnChannelClose callback called for channel {}", channel);
+
// Replace the channel if it was not intentionally closed using CloseAsync method.
if (thisConnection.closeFuture.get() == null) {
// delegate the task to worker thread and free up the event loop
@@ -143,11 +142,11 @@
// Default WebSocketChannelizer uses Netty's IdleStateHandler
if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+ logger.debug("Using custom keep alive handler.");
scheduleKeepAlive();
}
- } catch (Exception ie) {
- logger.debug("Error opening connection on {}", uri);
- throw new ConnectionException(uri, "Could not open connection", ie);
+ } catch (Exception ex) {
+ throw new ConnectionException(uri, "Could not open " + this.toString(), ex);
}
}
@@ -273,6 +272,7 @@
// Default WebSocketChannelizer uses Netty's IdleStateHandler
if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+ logger.debug("Using custom keep alive handler.");
scheduleKeepAlive();
}
@@ -409,10 +409,10 @@
/**
* Returns the short ID for the underlying channel for this connection.
* <p>
- * Currently only used for testing.
+ * Visible for testing.
*/
String getChannelId() {
- return (channel != null) ? channel.id().asShortText() : "";
+ return (channel != null) ? channel.id().asShortText() : "null";
}
@Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f7b35e9..295c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -18,25 +18,23 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
@@ -85,7 +83,7 @@
this.host = host;
this.client = client;
this.cluster = client.cluster;
- poolLabel = String.format("Connection Pool {host=%s}", host);
+ poolLabel = "Connection Pool {host=" + host + "}";
final Settings.ConnectionPoolSettings settings = settings();
this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
@@ -97,15 +95,38 @@
this.connections = new CopyOnWriteArrayList<>();
try {
+ final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
for (int i = 0; i < minPoolSize; i++) {
- this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+ connCreationFutures.add(CompletableFuture.runAsync(() -> {
+ try {
+ this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+ } catch (ConnectionException e) {
+ throw new CompletionException(e);
+ }
+ }, cluster.executor()));
}
- } catch (ConnectionException ce) {
- // ok if we don't get it initialized here - when a request is attempted in a connection from the
- // pool it will try to create new connections as needed.
- logger.info("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size(), ce);
- considerHostUnavailable();
+ CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
+ } catch (CancellationException ce) {
+ logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+ throw ce;
+ } catch (CompletionException ce) {
+ // Some connections might have been initialized. Close the connection pool gracefully to close them.
+ this.closeAsync();
+
+ final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
+ " Successful connections=" + this.connections.size() +
+ ". Closing the connection pool.";
+
+
+ // Unwrap the CompletionException so that the underlying failure is reported as the cause.
+ final Throwable cause = ce.getCause();
+ Throwable result = ce;
+ if (cause != null) {
+ result = cause;
+ }
+
+ throw new CompletionException(errMsg, result);
}
this.open = new AtomicInteger(connections.size());
@@ -316,7 +337,7 @@
try {
connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
} catch (ConnectionException ce) {
- logger.debug("Connections were under max, but there was an error creating the connection.", ce);
+ logger.error("Connections were under max, but there was an error creating the connection.", ce);
open.decrementAndGet();
considerHostUnavailable();
return false;
@@ -350,7 +371,7 @@
// only close the connection for good once it is done being borrowed or when it is dead
if (connection.isDead() || connection.borrowed.get() == 0) {
- if(bin.remove(connection)) {
+ if (bin.remove(connection)) {
connection.closeAsync();
// TODO: Log the following message on completion of the future returned by closeAsync.
logger.debug("{} destroyed", connection.getConnectionInfo());
@@ -396,7 +417,7 @@
logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
} while (remaining > 0);
- logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
+ logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);
// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..551c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -23,9 +23,11 @@
import java.util.Optional;
/**
+ * This exception signifies a network connection failure.
+ *
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class ConnectionException extends Exception {
+public class ConnectionException extends RuntimeException {
private URI uri;
private InetSocketAddress address;
@@ -35,6 +37,12 @@
this.uri = uri;
}
+ public ConnectionException(final URI uri, final Throwable cause) {
+ super(cause);
+ this.uri = uri;
+ this.address = null;
+ }
+
public ConnectionException(final URI uri, final String message, final Throwable cause) {
super(message, cause);
this.uri = uri;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
new file mode 100644
index 0000000..e8b3b87
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.exception;
+
+public class NoHostAvailableException extends RuntimeException {
+
+ public NoHostAvailableException() {
+ super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+ }
+
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 7e55b92..3d2df78 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -70,8 +70,10 @@
if (!handshakeFuture.isDone()) {
// channel was closed before the handshake could be completed.
handshakeFuture.setFailure(
- new RuntimeException(String.format("Channel=[%s] closed before the handshake could complete",
- ctx.channel().toString())));
+ new RuntimeException(String.format("WebSocket channel=[%s] closed before the handshake could complete." +
+ " Server logs could contain the reason for abrupt connection disconnect or the " +
+ "server might not be reachable from the client anymore.",
+ ctx.channel().id().asShortText())));
}
super.channelInactive(ctx);
@@ -82,7 +84,7 @@
if (event instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) event;
if (e.state() == IdleState.READER_IDLE) {
- logger.warn("WebSocket connection " + ctx.channel() + " has been idle for too long.");
+ logger.warn("WebSocket connection {} has been idle for too long.", ctx.channel());
} else if (e.state() == IdleState.WRITER_IDLE) {
logger.debug("Sending ping frame to the server");
ctx.writeAndFlush(new PingWebSocketFrame());
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
index 47af647..04f6b7c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
@@ -56,24 +56,6 @@
private String value;
- /**
- * Default serializer for results returned from Gremlin Server. This implementation must be of type
- * {@link MessageTextSerializer} so that it can be compatible with text-based websocket messages.
- *
- * @deprecated As of release 3.3.5, not replaced, simply specify the exact version of the serializer to use.
- */
- @Deprecated
- public static final MessageSerializer DEFAULT_RESULT_SERIALIZER = new GraphSONMessageSerializerV1d0();
-
- /**
- * Default serializer for requests received by Gremlin Server. This implementation must be of type
- * {@link MessageTextSerializer} so that it can be compatible with text-based websocket messages.
- *
- * @deprecated As of release 3.3.5, not replaced, simply specify the exact version of the serializer to use.
- */
- @Deprecated
- public static final MessageSerializer DEFAULT_REQUEST_SERIALIZER = new GraphSONMessageSerializerV1d0();
-
Serializers(final String mimeType) {
this.value = mimeType;
}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000..20f38ba
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Client.ClusteredClient.class, Client.SessionedClient.class, Host.class, Cluster.class})
+public class ClientTest {
+ @Mock
+ private Cluster cluster;
+
+ @Mock
+ private Host mockAvailableHost;
+
+ @Mock
+ private Client.Settings settings;
+
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setup() {
+ executor = Executors.newScheduledThreadPool(1);
+ when(mockAvailableHost.isAvailable()).thenReturn(true);
+ when(cluster.allHosts()).thenReturn(Collections.singletonList(mockAvailableHost));
+ when(cluster.executor()).thenReturn(executor);
+ }
+
+ @After
+ public void cleanup() {
+ executor.shutdown();
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void shouldThrowErrorWhenConnPoolInitFailsForClusteredClient() throws Exception {
+ Client.ClusteredClient client = new Client.ClusteredClient(cluster, settings);
+ whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+ client.init();
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void shouldThrowErrorWhenConnPoolInitFailsForSessionClient() throws Exception {
+ final Client.SessionSettings sessionSettings = Client.SessionSettings.build().sessionId("my-session-id").create();
+ when(settings.getSession()).thenReturn(Optional.of(sessionSettings));
+ Client.SessionedClient client = new Client.SessionedClient(cluster, settings);
+ whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+ client.init();
+ }
+
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
index 3d6bef3..1008653 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
@@ -108,6 +108,7 @@
final ResponseMessage deserialized = serializer.deserializeResponse(serialized);
final SamplePerson actual = (SamplePerson) deserialized.getResult().getData();
+
assertThat(actual, new ReflectionEquals(person));
}
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
index 089a51a..7bf4fe0 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
@@ -26,16 +26,19 @@
class TornadoTransport(AbstractBaseTransport):
- def __init__(self, read_timeout=30, write_timeout=30):
+ def __init__(self, read_timeout=30, write_timeout=30,
+ compression_options={'compression_level': 5, 'mem_level': 5}):
self._loop = ioloop.IOLoop(make_current=False)
+ self._ws = None
self._read_timeout = read_timeout
self._write_timeout = write_timeout
+ self._compression_options = compression_options
def connect(self, url, headers=None):
if headers:
url = httpclient.HTTPRequest(url, headers=headers)
self._ws = self._loop.run_sync(
- lambda: websocket.websocket_connect(url))
+ lambda: websocket.websocket_connect(url, compression_options=self._compression_options))
def write(self, message):
self._loop.run_sync(
diff --git a/gremlin-python/src/main/jython/setup.py b/gremlin-python/src/main/jython/setup.py
index 3e630ec..ae4609d 100644
--- a/gremlin-python/src/main/jython/setup.py
+++ b/gremlin-python/src/main/jython/setup.py
@@ -72,7 +72,8 @@
test_suite="tests",
data_files=[("", ["LICENSE", "NOTICE"])],
setup_requires=[
- 'pytest-runner',
+ 'pytest-runner==5.2',
+ 'importlib-metadata<3.0.0'
],
tests_require=[
'pytest>=4.6.4,<5.0.0',
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 5a2e1f8..4259d15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -102,10 +102,10 @@
// periodically ping the server, but coming from this direction allows the server to kill channels that
// have dead clients on the other end
if (e.state() == IdleState.READER_IDLE) {
- logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel());
+ logger.info("Closing channel - client is disconnected after idle period of {} {}", settings.idleConnectionTimeout, ctx.channel().id().asShortText());
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
- logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel());
+ logger.info("Checking channel - sending ping to client after idle period of {} {}", settings.keepAliveInterval, ctx.channel().id().asShortText());
ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
}
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
index d59e1ed..3981867 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
@@ -20,7 +20,6 @@
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0;
-import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
@@ -31,8 +30,7 @@
/**
* Default serializer used by the server when the serializer requested does not match what is on the server.
- * Using GraphSON 1.0 on 3.3.5 because that's what it has long been set to in previous versions on
- * {@link Serializers#DEFAULT_RESULT_SERIALIZER} which is now deprecated.
+ * Using GraphSON 1.0 on 3.3.5 because that's what it has long been set to in previous versions.
*/
static final MessageSerializer DEFAULT_SERIALIZER = new GraphSONMessageSerializerV1d0();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index db88ef9..623107f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -28,6 +28,7 @@
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
@@ -64,6 +65,7 @@
import java.awt.Color;
import java.io.File;
+import java.net.ConnectException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -411,7 +413,7 @@
fail("Should not have gone through because the server is not running");
} catch (Exception i) {
final Throwable root = ExceptionUtils.getRootCause(i);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
}
startServer();
@@ -447,7 +449,7 @@
fail("Should not have gone through because the server is not running");
} catch (Exception i) {
final Throwable root = ExceptionUtils.getRootCause(i);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
}
startServer();
@@ -476,6 +478,7 @@
try {
final Client client = cluster.connect();
+ client.init();
// the first host is dead on init. request should succeed on localhost
assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
@@ -861,12 +864,29 @@
final Cluster cluster = TestClientFactory.open();
try {
assertEquals(0, cluster.availableHosts().size());
- cluster.connect().init();
+ final Client client1 = cluster.connect().init();
assertEquals(1, cluster.availableHosts().size());
stopServer();
+ // We create a new client here which will fail to initialize, but the original client still has the
+ // host marked as connected. Since the second client failed during initialization, it has no way to
+ // test whether the host is indeed unreachable because it has no established connections. It will not add
+ // the host to the load balancer, but it will also not remove it if it already exists there. Leave that
+ // responsibility to the client that added it. In this case, let the second client perform its own mechanism
+ // to mark the host as unavailable. The first client will discover that the host has failed either with the next
+ // keepAlive message or the next request, whichever comes first. In this case, we simulate the second
+ // scenario by sending a new request on the first client. The request will fail (since the server is down) and
+ // the client should mark the host unavailable.
cluster.connect().init();
+
+ try {
+ client1.submit("1+1").all().join();
+ fail("Expecting an exception because the server is shut down.");
+ } catch (Exception ex) {
+ // ignore the exception
+ }
+
assertEquals(0, cluster.availableHosts().size());
} finally {
cluster.close();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 807e9a7..f1740c4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.ietf.jgss.GSSException;
@@ -88,8 +89,7 @@
fail("This should not succeed as the client did not enable SSL");
} catch(Exception ex) {
final Throwable root = ExceptionUtils.getRootCause(ex);
- assertEquals(TimeoutException.class, root.getClass());
- assertThat(root.getMessage(), startsWith("Timed out while waiting for an available host"));
+ assertEquals(NoHostAvailableException.class, root.getClass());
} finally {
cluster.close();
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ab75b8e..8e1e8fb 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.junit.Test;
import java.util.Arrays;
@@ -203,7 +204,7 @@
fail("Should throw exception because ssl is enabled on the server but not on client");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -233,7 +234,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -251,7 +252,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
} catch(Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -294,7 +295,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -311,7 +312,7 @@
fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -328,7 +329,7 @@
fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -345,7 +346,7 @@
fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -391,7 +392,7 @@
fail("Should throw exception because incorrect keyStoreType is specified");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
@@ -410,7 +411,7 @@
fail("Should throw exception because incorrect trustStoreType is specified");
} catch (Exception x) {
final Throwable root = ExceptionUtils.getRootCause(x);
- assertThat(root, instanceOf(TimeoutException.class));
+ assertThat(root, instanceOf(NoHostAvailableException.class));
} finally {
cluster.close();
}
diff --git a/pom.xml b/pom.xml
index 3a5451b..8fd981c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,7 @@
<slf4j.version>1.7.25</slf4j.version>
<snakeyaml.version>1.27</snakeyaml.version>
<spark.version>2.4.0</spark.version>
+ <powermock.version>1.6.4</powermock.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -721,6 +722,18 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>