Merge branch 'TINKERPOP-2479' into 3.4-dev
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 3d81a53..eed6738 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -57,12 +57,19 @@
 * Deprecated driver `Channelizer` keep-alive related methods.
 * Delegate handling of WebSocket handshake to Netty instead of custom code in Java Driver.
 * Delegate detection of idle connection to Netty instead of custom keep alive logic for `WebSocketChannelizer`.
-* Added support for WebSocket frame compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ) for `WebSocketChannelizer` in Java driver.
+* Added support for WebSocket frame compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ) for `WebSocketChannelizer` in Java/Python driver.
 * Added server support for WebSocket compression extension ( [RFC7692](https://tools.ietf.org/html/rfc7692) ).
 * Fixed bug with Bytecode serialization when `Bytecode.toString()` is used in Javascript.
 * Fixed "toString" for P and TextP to produce valid script representation from bytecode glv steps containing a string predicate in Javascript.
 * Fixed a bug which could cause Java driver to hang when using `ResultSet.statusAttributes()`
+* Fixed bug with `ReservedVerificationStrategy.getConfiguration()` which was omitting the reserved `keys` value.
+* Changed all configuration keys on `AbstractWarningVerificationStrategy` implementations to `public`.
 * Deprecated `BytecodeUtil` and merged its functionality to the existing `BytecodeHelper`.
+* Added configuring implementation in HasStep
+* Remove static initialization for `GraphSONMessageSerializerV1d0` and `GraphSONMessageSerializerV1d0` in Java driver.
+* Connections to the server in a connection pool are created in parallel instead of serially in Java Driver.
+* Connection pools for multiple endpoints are created in parallel instead of serially in Java Driver.
+* Introduced new HostNotAvailable exception to represent cases when no server with active connection is available. 
 
 [[release-3-4-8]]
 === TinkerPop 3.4.8 (Release Date: August 3, 2020)
diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc
index fd769bb..8d9acc8 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -770,13 +770,21 @@
 |=========================================================
 
 Note that the `transport_factory` can allow for additional configuration of the `TornadoTransport`, which exposes
-options to manage `ioloop` timeouts:
+options to manage `ioloop` timeouts and compression settings:
 
-```python
+[source,python]
+----
 g = traversal().withRemote(
   DriverRemoteConnection('ws://localhost:8182/gremlin','g',
-                         transport_factory=lambda: TornadoTransport(read_timeout=10, write_timeout=10)))
-```
+                         transport_factory=lambda: TornadoTransport(read_timeout=10,
+                                                                    write_timeout=10,
+                                                                    compression_options={'compression_level':5,'mem_level':5})))
+----
+
+Compression configuration options are described in the
+link:https://docs.python.org/3.6/library/zlib.html#zlib.compressobj[zlib documentation]. By default, compression
+settings are configured as shown in the above example.
+
 [[gremlin-python-strategies]]
 === Traversal Strategies
 
diff --git a/docs/src/upgrade/release-3.4.x.asciidoc b/docs/src/upgrade/release-3.4.x.asciidoc
index 913138a..ba5acf1 100644
--- a/docs/src/upgrade/release-3.4.x.asciidoc
+++ b/docs/src/upgrade/release-3.4.x.asciidoc
@@ -28,7 +28,9 @@
 Please see the link:https://github.com/apache/tinkerpop/blob/3.4.9/CHANGELOG.asciidoc#release-3-4-9[changelog] for a
 complete list of all the modifications that are part of this release.
 
-=== Translator Implementations
+=== Upgrading for Users
+
+==== Translator Implementations
 
 One of the silent features of Gremlin is the `ScriptTranslator`. More specifically, the implementation of this
 interface which will convert a `Traversal` object (or Gremlin `Bytecode`) into a proper `String` representation that
@@ -59,7 +61,7 @@
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2461[TINKERPOP-2461]
 
-=== Bytecode Command Improvements
+==== Bytecode Command Improvements
 
 The `:bytecode` command in the Gremlin console includes two new options: `reset` and `config`. Both options provide
 ways to better control the `GraphSONMapper` used internally by the command. The `reset` option will replace the current
@@ -69,7 +71,7 @@
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2479[TINKERPOP-2479]
 
-=== withStrategies() Groovy Syntax
+==== withStrategies() Groovy Syntax
 
 The `withStrategies()` configuration step accepts a variable number of `TraversalStrategy` instances. In Java, those
 instances are typically constructed with `instance()` if it is a singleton or by way of a builder pattern which
@@ -101,7 +103,7 @@
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2466[TINKERPOP-2466]
 
-=== withEmbedded()
+==== withEmbedded()
 
 The `AnonymousTraversalSource` was introduced in 3.3.5 and is most typically used for constructing remote
 `TraversalSource` instances, but it also provides a way to construct a `TraversalSource` from an embedded `Graph`
@@ -143,6 +145,17 @@
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2054[TINKERPOP-2054]
 
+==== WebSocket Compression
+
+Gremlin Server now supports standard WebSocket compression (per link:https://tools.ietf.org/html/rfc7692[RFC 7692]).
+Both the Java and Python drivers support this functionality from the client's perspective. Compression is enabled by
+default and should be backward compatible, thus allowing older versions of the driver to connect to newer versions of
+the server and vice versa. Using the compression-enabled drivers with a server that also supports that functionality
+will greatly reduce network IO requirements.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-2441[TINKERPOP-2441],
+link:https://issues.apache.org/jira/browse/TINKERPOP-2453[TINKERPOP-2453]
+
 ==== Per Request Options
 
 With Java it has been possible to pass per-request settings for both scripts and bytecode. While Javascript, Python,
@@ -175,7 +188,7 @@
 link:https://issues.apache.org/jira/browse/TINKERPOP-2420[TINKERPOP-2420],
 link:https://issues.apache.org/jira/browse/TINKERPOP-2421[TINKERPOP-2421]
 
-=== GraphManager Extension
+==== GraphManager Extension
 
 The `org.apache.tinkerpop.gremlin.server.util.CheckedGraphManager` can be used instead of
 `org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager` (in gremlin-server.yml  to ensures that Gremlin Server
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
index 93931c3..1b74508 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -65,8 +65,10 @@
     public void set(final String key, final Object value) throws IllegalArgumentException, IllegalStateException;
 
     /**
-     * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
-     * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
+     * Set the value of the provided key. This is typically called in setup() and/or terminate() of the
+     * {@link VertexProgram}. If this is called during execute(), there is no guarantee as to the ultimately stored
+     * value as call order is indeterminate. It is up to the implementation to determine the states in which this
+     * method can be called.
      *
      * @param key   they key to set a value for
      * @param value the value to set for the key
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
index 450f656..6048ba7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/HasStep.java
@@ -20,8 +20,10 @@
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
 import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
@@ -36,8 +38,9 @@
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class HasStep<S extends Element> extends FilterStep<S> implements HasContainerHolder {
+public class HasStep<S extends Element> extends FilterStep<S> implements HasContainerHolder, Configuring {
 
+    private final Parameters parameters = new Parameters();
     private List<HasContainer> hasContainers;
 
     public HasStep(final Traversal.Admin traversal, final HasContainer... hasContainers) {
@@ -47,6 +50,16 @@
     }
 
     @Override
+    public Parameters getParameters() {
+        return this.parameters;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        this.parameters.set(null, keyValues);
+    }
+
+    @Override
     protected boolean filter(final Traverser.Admin<S> traverser) {
         // the generic S is defined as Element but Property can also be used with HasStep so this seems to cause
         // problems with some jdk versions.
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
index 7391733..ec37a3f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TraversalFilterStep.java
@@ -20,7 +20,9 @@
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -32,7 +34,8 @@
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TraversalFilterStep<S> extends FilterStep<S> implements TraversalParent {
+public final class TraversalFilterStep<S> extends FilterStep<S> implements TraversalParent, Configuring {
+    private final Parameters parameters = new Parameters();
 
     private Traversal.Admin<S, ?> filterTraversal;
 
@@ -42,6 +45,16 @@
     }
 
     @Override
+    public Parameters getParameters() {
+        return this.parameters;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        this.parameters.set(null, keyValues);
+    }
+
+    @Override
     protected boolean filter(final Traverser.Admin<S> traverser) {
         return TraversalUtil.test(traverser, this.filterTraversal);
     }
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
index a7c5a3a..52c9410 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/AbstractWarningVerificationStrategy.java
@@ -42,8 +42,8 @@
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWarningVerificationStrategy.class);
 
-    protected static final String THROW_EXCEPTION = "throwException";
-    protected static final String LOG_WARNING = "logWarning";
+    public static final String THROW_EXCEPTION = "throwException";
+    public static final String LOG_WARNING = "logWarning";
 
     protected final boolean throwException;
     protected final boolean logWarning;
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
index 8fe6424..b106884 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.process.traversal.strategy.verification;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Parameterizing;
@@ -31,7 +32,9 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -47,7 +50,7 @@
  */
 public class ReservedKeysVerificationStrategy extends AbstractWarningVerificationStrategy {
 
-    private static final String KEYS = "keys";
+    public static final String KEYS = "keys";
     private static final Set<String> DEFAULT_RESERVED_KEYS = new HashSet<>(Arrays.asList("id", "label"));
     private final Set<String> reservedKeys;
 
@@ -84,6 +87,13 @@
                 .logWarning(configuration.getBoolean(LOG_WARNING, false)).create();
     }
 
+    @Override
+    public Configuration getConfiguration() {
+        final Configuration c = super.getConfiguration();
+        c.setProperty(KEYS, this.reservedKeys);
+        return c;
+    }
+
     public static ReservedKeysVerificationStrategy.Builder build() {
         return new ReservedKeysVerificationStrategy.Builder();
     }
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
index f25f66d..22dcd11 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ReservedKeysVerificationStrategyTest.java
@@ -34,6 +34,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
@@ -81,6 +82,7 @@
                 {"__.addE('knows').property('label','xyz')", __.addE("knows").property("id", "xyz"), false},
                 {"__.addV().property('x','xyz', 'label', 'xxx')", __.addV().property("x", "xyz", "label", "xxx"), false},
                 {"__.addV().property('x','xyz', 'not-Label', 'xxx')", __.addV().property("x", "xyz", "not-label", "xxx"), true},
+                {"__.addV().property('x','xyz', 'not-allowed', 'xxx')", __.addV().property("x", "xyz", "not-allowed", "xxx"), false},
         });
     }
 
@@ -106,7 +108,10 @@
     @Test
     public void shouldOnlyThrow() {
         final TraversalStrategies strategies = new DefaultTraversalStrategies();
-        strategies.addStrategies(ReservedKeysVerificationStrategy.build().throwException().create());
+        final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().throwException();
+        if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+            builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+        strategies.addStrategies(builder.create());
         final Traversal traversal = this.traversal.asAdmin().clone();
         traversal.asAdmin().setStrategies(strategies);
         if (allow) {
@@ -125,7 +130,10 @@
     @Test
     public void shouldOnlyLog() {
         final TraversalStrategies strategies = new DefaultTraversalStrategies();
-        strategies.addStrategies(ReservedKeysVerificationStrategy.build().logWarning().create());
+        final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().logWarning();
+        if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+            builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+        strategies.addStrategies(builder.create());
         final Traversal traversal = this.traversal.asAdmin().clone();
         traversal.asAdmin().setStrategies(strategies);
         traversal.asAdmin().applyStrategies();
@@ -138,7 +146,11 @@
     @Test
     public void shouldThrowAndLog() {
         final TraversalStrategies strategies = new DefaultTraversalStrategies();
-        strategies.addStrategies(ReservedKeysVerificationStrategy.build().throwException().logWarning().create());
+        final ReservedKeysVerificationStrategy.Builder builder = ReservedKeysVerificationStrategy.build().
+                throwException().logWarning();
+        if (name.equals("__.addV().property('x','xyz', 'not-allowed', 'xxx')"))
+            builder.reservedKeys(new HashSet<>(Arrays.asList("id", "label", "not-allowed")));
+        strategies.addStrategies(builder.create());
         final Traversal traversal = this.traversal.asAdmin().clone();
         traversal.asAdmin().setStrategies(strategies);
         if (allow) {
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index bd997a6..7bffa41 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -94,6 +94,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <scope>test</scope>
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 596910a..5460e75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -47,6 +47,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.lang.Math.toIntExact;
 
@@ -255,8 +256,16 @@
                 // forgot to enable it or perhaps the server is not configured for websockets.
                 handler.handshakeFuture().sync();
             } catch (Exception ex) {
-                throw new RuntimeException(new ConnectionException(connection.getUri(),
-                        "Could not complete websocket handshake - ensure that client protocol matches server", ex));
+                String errMsg = "";
+                if (ex instanceof TimeoutException) {
+                    errMsg = "Timed out while waiting to complete the connection setup. Consider increasing the " +
+                            "WebSocket handshake timeout duration.";
+                } else {
+                    errMsg = "Could not complete connection setup to the server. Ensure that SSL is correctly " +
+                            "configured at both the client and the server. Ensure that client WebSocket handshake " +
+                            "protocol matches the server. Ensure that the server is still reachable.";
+                }
+                throw new ConnectionException(connection.getUri(), errMsg, ex);
             }
         }
     }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 9cbc337..af4ab31 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -26,8 +29,6 @@
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,11 +38,13 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -105,7 +108,7 @@
      * one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of
      * the created {@code Client}.
      */
-    public Client alias(final Map<String,String> aliases) {
+    public Client alias(final Map<String, String> aliases) {
         return new AliasClusteredClient(this, aliases, settings);
     }
 
@@ -154,8 +157,7 @@
      * A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
      *
      * @param bytecode request in the form of gremlin {@link Bytecode}
-     * @param options for the request
-     *
+     * @param options  for the request
      * @see #submit(Bytecode)
      */
     public ResultSet submit(final Bytecode bytecode, final RequestOptions options) {
@@ -181,8 +183,7 @@
      * A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
      *
      * @param bytecode request in the form of gremlin {@link Bytecode}
-     * @param options for the request
-     *
+     * @param options  for the request
      * @see #submitAsync(Bytecode)
      */
     public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
@@ -201,8 +202,14 @@
         logger.debug("Initializing client on cluster [{}]", cluster);
 
         cluster.init();
+
         initializeImplementation();
 
+        // throw an error if no host is available even after initialization is complete.
+        if (cluster.availableHosts().isEmpty()) {
+            throw new NoHostAvailableException();
+        }
+
         initialized = true;
         return this;
     }
@@ -223,7 +230,7 @@
      * this method to concatenating a Gremlin script from dynamically produced strings and sending it to
      * {@link #submit(String)}.  Parameterized scripts will perform better.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
      */
     public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
@@ -263,7 +270,7 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
      */
     public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
@@ -279,15 +286,15 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
-     * @param parameters a map of parameters that will be bound to the script on execution
+     * @param gremlin                the gremlin script to execute
+     * @param parameters             a map of parameters that will be bound to the script on execution
      * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
      * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
      */
     @Deprecated
     public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource,
                                                     final Map<String, Object> parameters) {
-        Map<String,String> aliases = null;
+        Map<String, String> aliases = null;
         if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) {
             aliases = makeDefaultAliasMap(graphOrTraversalSource);
         }
@@ -299,15 +306,15 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
-     * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
-     *                script where the key is the alias name and the value represents the global variable on the
-     *                server
+     * @param aliases    aliases the specified global Gremlin Server variable some other name that then be used in the
+     *                   script where the key is the alias name and the value represents the global variable on the
+     *                   server
      * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
      */
     @Deprecated
-    public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases,
+    public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, String> aliases,
                                                     final Map<String, Object> parameters) {
         final RequestOptions.Builder options = RequestOptions.build();
         if (aliases != null && !aliases.isEmpty()) {
@@ -402,8 +409,8 @@
         return cluster;
     }
 
-    protected Map<String,String> makeDefaultAliasMap(final String graphOrTraversalSource) {
-        final Map<String,String> aliases = new HashMap<>();
+    protected Map<String, String> makeDefaultAliasMap(final String graphOrTraversalSource) {
+        final Map<String, String> aliases = new HashMap<>();
         aliases.put("g", graphOrTraversalSource);
         return aliases;
     }
@@ -443,8 +450,8 @@
          * this method to concatenating a Gremlin script from dynamically produced strings and sending it to
          * {@link #submit(String)}.  Parameterized scripts will perform better.
          *
-         * @param gremlin the gremlin script to execute
-         * @param parameters a map of parameters that will be bound to the script on execution
+         * @param gremlin                the gremlin script to execute
+         * @param parameters             a map of parameters that will be bound to the script on execution
          * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
          */
         public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) {
@@ -460,7 +467,7 @@
          */
         @Override
         public Client alias(final String graphOrTraversalSource) {
-            final Map<String,String> aliases = new HashMap<>();
+            final Map<String, String> aliases = new HashMap<>();
             aliases.put("g", graphOrTraversalSource);
             return alias(aliases);
         }
@@ -469,7 +476,7 @@
          * {@inheritDoc}
          */
         @Override
-        public Client alias(final Map<String,String> aliases) {
+        public Client alias(final Map<String, String> aliases) {
             return new AliasClusteredClient(this, aliases, settings);
         }
 
@@ -492,7 +499,7 @@
             // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
             // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
             if (!possibleHosts.hasNext())
-                throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists");
+                throw new NoHostAvailableException();
 
             final Host bestHost = possibleHosts.next();
             final ConnectionPool pool = hostConnectionPools.get(bestHost);
@@ -504,18 +511,20 @@
          */
         @Override
         protected void initializeImplementation() {
-            cluster.allHosts().forEach(host -> {
-                try {
-                    // hosts that don't initialize connection pools will come up as a dead host
-                    hostConnectionPools.put(host, new ConnectionPool(host, this));
-
-                    // added a new host to the cluster so let the load-balancer know
-                    this.cluster.loadBalancingStrategy().onNew(host);
-                } catch (Exception ex) {
-                    // catch connection errors and prevent them from failing the creation
-                    logger.warn("Could not initialize connection pool for {} - will try later", host);
+            try {
+                CompletableFuture.allOf(cluster.allHosts().stream()
+                        .map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), cluster.executor()))
+                        .toArray(CompletableFuture[]::new))
+                        .join();
+            } catch (CompletionException ex) {
+                Throwable cause = null;
+                Throwable result = ex;
+                if (null != (cause = ex.getCause())) {
+                    result = cause;
                 }
-            });
+
+                logger.error("", result);
+            }
         }
 
         /**
@@ -526,11 +535,27 @@
             if (closing.get() != null)
                 return closing.get();
 
-            final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
-            hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
-            closing.set(CompletableFuture.allOf(poolCloseFutures));
+            final CompletableFuture<Void> allPoolsClosedFuture =
+                    CompletableFuture.allOf(hostConnectionPools.values().stream()
+                            .map(ConnectionPool::closeAsync)
+                            .toArray(CompletableFuture[]::new));
+
+            closing.set(allPoolsClosedFuture);
             return closing.get();
         }
+
+        private Consumer<Host> initializeConnectionSetupForHost = host -> {
+            try {
+                // hosts that don't initialize connection pools will come up as a dead host
+                hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this));
+
+                // added a new host to the cluster so let the load-balancer know
+                ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
+            } catch (RuntimeException ex) {
+                final String errMsg = "Could not initialize client for " + host;
+                throw new RuntimeException(errMsg, ex);
+            }
+        };
     }
 
     /**
@@ -539,10 +564,10 @@
      */
     public static class AliasClusteredClient extends Client {
         private final Client client;
-        private final Map<String,String> aliases = new HashMap<>();
+        private final Map<String, String> aliases = new HashMap<>();
         final CompletableFuture<Void> close = new CompletableFuture<>();
 
-        AliasClusteredClient(final Client client, final Map<String,String> aliases, final Client.Settings settings) {
+        AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) {
             super(client.cluster, settings);
             this.client = client;
             this.aliases.putAll(aliases);
@@ -559,8 +584,8 @@
                 // need to call buildMessage() right away to get client specific configurations, that way request specific
                 // ones can override as needed
                 final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
-                                                                                  .processor("traversal")
-                                                                                  .addArg(Tokens.ARGS_GREMLIN, bytecode));
+                        .processor("traversal")
+                        .addArg(Tokens.ARGS_GREMLIN, bytecode));
 
                 // apply settings if they were made available
                 options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
@@ -582,7 +607,7 @@
             // overrides which should be mucked with
             if (!aliases.isEmpty()) {
                 final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
-                aliases.forEach((k,v) -> {
+                aliases.forEach((k, v) -> {
                     if (!original.containsKey(k))
                         builder.addArg(Tokens.ARGS_ALIASES, aliases);
                 });
@@ -708,7 +733,12 @@
             if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
             Collections.shuffle(hosts);
             final Host host = hosts.get(0);
-            connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+
+            try {
+                connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+            } catch (RuntimeException ex) {
+                logger.error("Could not initialize client for {}", host, ex);
+            }
         }
 
         @Override
@@ -758,7 +788,8 @@
         public static class Builder {
             private Optional<SessionSettings> session = Optional.empty();
 
-            private Builder() {}
+            private Builder() {
+            }
 
             /**
              * Enables a session. By default this will create a random session name and configure transactions to be
@@ -842,7 +873,8 @@
             private String sessionId = UUID.randomUUID().toString();
             private boolean forceClosed = false;
 
-            private Builder() {}
+            private Builder() {
+            }
 
             /**
              * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9636df9..26699bc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -101,7 +101,7 @@
         this.maxInProcess = maxInProcess;
         this.keepAliveInterval = pool.settings().keepAliveInterval;
 
-        connectionLabel = String.format("Connection{host=%s}", pool.host);
+        connectionLabel = "Connection{host=" + pool.host + "}";
 
         if (cluster.isClosing())
             throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
@@ -128,9 +128,8 @@
             channel.closeFuture().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
-                    if(logger.isDebugEnabled()) {
-                        logger.debug("OnChannelClose callback called for channel {}", channel.id().asShortText());
-                    }
+                    logger.debug("OnChannelClose callback called for channel {}", channel);
+
                     // Replace the channel if it was not intentionally closed using CloseAsync method.
                     if (thisConnection.closeFuture.get() == null) {
                         // delegate the task to worker thread and free up the event loop
@@ -143,11 +142,11 @@
 
             // Default WebSocketChannelizer uses Netty's IdleStateHandler
             if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+                logger.debug("Using custom keep alive handler.");
                 scheduleKeepAlive();
             }
-        } catch (Exception ie) {
-            logger.debug("Error opening connection on {}", uri);
-            throw new ConnectionException(uri, "Could not open connection", ie);
+        } catch (Exception ex) {
+            throw new ConnectionException(uri, "Could not open " + this.toString(), ex);
         }
     }
 
@@ -273,6 +272,7 @@
 
         // Default WebSocketChannelizer uses Netty's IdleStateHandler
         if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+            logger.debug("Using custom keep alive handler.");
             scheduleKeepAlive();
         }
 
@@ -409,10 +409,10 @@
     /**
      * Returns the short ID for the underlying channel for this connection.
      * <p>
-     * Currently only used for testing.
+     * Visible for testing.
      */
     String getChannelId() {
-        return (channel != null) ? channel.id().asShortText() : "";
+        return (channel != null) ? channel.id().asShortText() : "null";
     }
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f7b35e9..295c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -18,25 +18,23 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -85,7 +83,7 @@
         this.host = host;
         this.client = client;
         this.cluster = client.cluster;
-        poolLabel = String.format("Connection Pool {host=%s}", host);
+        poolLabel = "Connection Pool {host=" + host + "}";
 
         final Settings.ConnectionPoolSettings settings = settings();
         this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
@@ -97,15 +95,38 @@
         this.connections = new CopyOnWriteArrayList<>();
 
         try {
+            final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
             for (int i = 0; i < minPoolSize; i++) {
-                this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                connCreationFutures.add(CompletableFuture.runAsync(() -> {
+                    try {
+                        this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                    } catch (ConnectionException e) {
+                        throw new CompletionException(e);
+                    }
+                }, cluster.executor()));
             }
 
-        } catch (ConnectionException ce) {
-            // ok if we don't get it initialized here - when a request is attempted in a connection from the
-            // pool it will try to create new connections as needed.
-            logger.info("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size(), ce);
-            considerHostUnavailable();
+            CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
+        } catch (CancellationException ce) {
+            logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+            throw ce;
+        } catch (CompletionException ce) {
+            // Some connections might have been initialized. Close the connection pool gracefully to close them.
+            this.closeAsync();
+
+            final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
+                    " Successful connections=" + this.connections.size() +
+                    ". Closing the connection pool.";
+
+
+            Throwable cause = null;
+            Throwable result = ce;
+
+            if (null != (cause = result.getCause())) {
+                result = cause;
+            }
+
+            throw new CompletionException(errMsg, result);
         }
 
         this.open = new AtomicInteger(connections.size());
@@ -316,7 +337,7 @@
         try {
             connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
         } catch (ConnectionException ce) {
-            logger.debug("Connections were under max, but there was an error creating the connection.", ce);
+            logger.error("Connections were under max, but there was an error creating the connection.", ce);
             open.decrementAndGet();
             considerHostUnavailable();
             return false;
@@ -350,7 +371,7 @@
 
         // only close the connection for good once it is done being borrowed or when it is dead
         if (connection.isDead() || connection.borrowed.get() == 0) {
-            if(bin.remove(connection)) {
+            if (bin.remove(connection)) {
                 connection.closeAsync();
                 // TODO: Log the following message on completion of the future returned by closeAsync.
                 logger.debug("{} destroyed", connection.getConnectionInfo());
@@ -396,7 +417,7 @@
             logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
         } while (remaining > 0);
 
-        logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
+        logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);
 
         // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
         // either way supply a function to reconnect
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..551c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -23,9 +23,11 @@
 import java.util.Optional;
 
 /**
+ * This exception signifies network connection failure.
+ *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class ConnectionException extends Exception {
+public class ConnectionException extends RuntimeException {
     private URI uri;
     private InetSocketAddress address;
 
@@ -35,6 +37,12 @@
         this.uri = uri;
     }
 
+    public ConnectionException(final URI uri, final Throwable cause) {
+        super(cause);
+        this.uri = uri;
+        this.address = null;
+    }
+
     public ConnectionException(final URI uri, final String message, final Throwable cause) {
         super(message, cause);
         this.uri = uri;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
new file mode 100644
index 0000000..e8b3b87
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.exception;
+
+public class NoHostAvailableException extends RuntimeException {
+
+    public NoHostAvailableException() {
+        super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+    }
+
+    @Override
+    public synchronized Throwable fillInStackTrace() {
+        return this;
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 7e55b92..3d2df78 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -70,8 +70,10 @@
         if (!handshakeFuture.isDone()) {
             // channel was closed before the handshake could be completed.
             handshakeFuture.setFailure(
-                    new RuntimeException(String.format("Channel=[%s] closed before the handshake could complete",
-                            ctx.channel().toString())));
+                    new RuntimeException(String.format("WebSocket channel=[%s] closed before the handshake could complete." +
+                                    " Server logs could contain the reason for abrupt connection disconnect or the " +
+                                    "server might not be reachable from the client anymore.",
+                            ctx.channel().id().asShortText())));
         }
 
         super.channelInactive(ctx);
@@ -82,7 +84,7 @@
         if (event instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) event;
             if (e.state() == IdleState.READER_IDLE) {
-                logger.warn("WebSocket connection " + ctx.channel() + " has been idle for too long.");
+                logger.warn("WebSocket connection {} has been idle for too long.", ctx.channel());
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 logger.debug("Sending ping frame to the server");
                 ctx.writeAndFlush(new PingWebSocketFrame());
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
index 47af647..04f6b7c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/Serializers.java
@@ -56,24 +56,6 @@
 
     private String value;
 
-    /**
-     * Default serializer for results returned from Gremlin Server. This implementation must be of type
-     * {@link MessageTextSerializer} so that it can be compatible with text-based websocket messages.
-     *
-     * @deprecated As of release 3.3.5, not replaced, simply specify the exact version of the serializer to use.
-     */
-    @Deprecated
-    public static final MessageSerializer DEFAULT_RESULT_SERIALIZER = new GraphSONMessageSerializerV1d0();
-
-    /**
-     * Default serializer for requests received by Gremlin Server. This implementation must be of type
-     * {@link MessageTextSerializer} so that it can be compatible with text-based websocket messages.
-     *
-     * @deprecated As of release 3.3.5, not replaced, simply specify the exact version of the serializer to use.
-     */
-    @Deprecated
-    public static final MessageSerializer DEFAULT_REQUEST_SERIALIZER = new GraphSONMessageSerializerV1d0();
-
     Serializers(final String mimeType) {
         this.value = mimeType;
     }
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000..20f38ba
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Client.ClusteredClient.class, Client.SessionedClient.class, Host.class, Cluster.class})
+public class ClientTest {
+    @Mock
+    private Cluster cluster;
+
+    @Mock
+    private Host mockAvailableHost;
+
+    @Mock
+    private Client.Settings settings;
+
+    private ScheduledExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newScheduledThreadPool(1);
+        when(mockAvailableHost.isAvailable()).thenReturn(true);
+        when(cluster.allHosts()).thenReturn(Collections.singletonList(mockAvailableHost));
+        when(cluster.executor()).thenReturn(executor);
+    }
+
+    @After
+    public void cleanup() {
+        executor.shutdown();
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowErrorWhenConnPoolInitFailsForClusteredClient() throws Exception {
+        Client.ClusteredClient client = new Client.ClusteredClient(cluster, settings);
+        whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+        client.init();
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowErrorWhenConnPoolInitFailsForSessionClient() throws Exception {
+        final Client.SessionSettings sessionSettings = Client.SessionSettings.build().sessionId("my-session-id").create();
+        when(settings.getSession()).thenReturn(Optional.of(sessionSettings));
+        Client.SessionedClient client = new Client.SessionedClient(cluster, settings);
+        whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+        client.init();
+    }
+
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
index 3d6bef3..1008653 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
@@ -108,6 +108,7 @@
         final ResponseMessage deserialized = serializer.deserializeResponse(serialized);
 
         final SamplePerson actual = (SamplePerson) deserialized.getResult().getData();
+
         assertThat(actual, new ReflectionEquals(person));
     }
 
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
index 089a51a..7bf4fe0 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
@@ -26,16 +26,19 @@
 
 class TornadoTransport(AbstractBaseTransport):
 
-    def __init__(self, read_timeout=30, write_timeout=30):
+    def __init__(self, read_timeout=30, write_timeout=30,
+                 compression_options={'compression_level': 5, 'mem_level': 5}):
         self._loop = ioloop.IOLoop(make_current=False)
+        self._ws = None
         self._read_timeout = read_timeout
         self._write_timeout = write_timeout
+        self._compression_options = compression_options
 
     def connect(self, url, headers=None):
         if headers:
             url = httpclient.HTTPRequest(url, headers=headers)
         self._ws = self._loop.run_sync(
-            lambda: websocket.websocket_connect(url))
+            lambda: websocket.websocket_connect(url, compression_options=self._compression_options))
 
     def write(self, message):
         self._loop.run_sync(
diff --git a/gremlin-python/src/main/jython/setup.py b/gremlin-python/src/main/jython/setup.py
index 3e630ec..ae4609d 100644
--- a/gremlin-python/src/main/jython/setup.py
+++ b/gremlin-python/src/main/jython/setup.py
@@ -72,7 +72,8 @@
     test_suite="tests",
     data_files=[("", ["LICENSE", "NOTICE"])],
     setup_requires=[
-        'pytest-runner',
+        'pytest-runner==5.2',
+        'importlib-metadata<3.0.0'
     ],
     tests_require=[
         'pytest>=4.6.4,<5.0.0',
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 5a2e1f8..4259d15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -102,10 +102,10 @@
             // periodically ping the server, but coming from this direction allows the server to kill channels that
             // have dead clients on the other end
             if (e.state() == IdleState.READER_IDLE) {
-                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel());
+                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
-                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel());
+                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
                 ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
             }
         }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
index d59e1ed..3981867 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/ServerSerializers.java
@@ -20,7 +20,6 @@
 
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0;
-import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -31,8 +30,7 @@
 
     /**
      * Default serializer used by the server when the serializer requested does not match what is on the server.
-     * Using GraphSON 1.0 on 3.3.5 because that's what it has long been set to in previous versions on
-     * {@link Serializers#DEFAULT_RESULT_SERIALIZER} which is now deprecated.
+     * Using GraphSON 1.0 on 3.3.5 because that's what it has long been set to in previous versions.
      */
     static final MessageSerializer DEFAULT_SERIALIZER = new GraphSONMessageSerializerV1d0();
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index db88ef9..623107f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -28,6 +28,7 @@
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
@@ -64,6 +65,7 @@
 
 import java.awt.Color;
 import java.io.File;
+import java.net.ConnectException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -411,7 +413,7 @@
                 fail("Should not have gone through because the server is not running");
             } catch (Exception i) {
                 final Throwable root = ExceptionUtils.getRootCause(i);
-                assertThat(root, instanceOf(TimeoutException.class));
+                assertThat(root, instanceOf(NoHostAvailableException.class));
             }
 
             startServer();
@@ -447,7 +449,7 @@
             fail("Should not have gone through because the server is not running");
         } catch (Exception i) {
             final Throwable root = ExceptionUtils.getRootCause(i);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         }
 
         startServer();
@@ -476,6 +478,7 @@
 
         try {
             final Client client = cluster.connect();
+            client.init();
 
             // the first host is dead on init.  request should succeed on localhost
             assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
@@ -861,12 +864,29 @@
         final Cluster cluster = TestClientFactory.open();
         try {
             assertEquals(0, cluster.availableHosts().size());
-            cluster.connect().init();
+            final Client client1 = cluster.connect().init();
             assertEquals(1, cluster.availableHosts().size());
 
             stopServer();
 
+            // We create a new client here which will fail to initialize but the original client still has
+            // host marked as connected. Since the second client failed during initialization, it has no way to
+            // test if a host is indeed unreachable because it doesn't have any established connections. It will not add
+            // the host to load balancer but it will also not remove it if it already exists there. Leave that
+            // responsibility to a client that added it. In this case, let the second client perform it's own mechanism
+            // to mark host as unavailable. The first client will discover that the host has failed either with next
+            // keepAlive message or the next request, whichever is earlier. In this case, we will simulate the second
+            // scenario by sending a new request on first client. The request would fail (since server is down) and
+            // client should mark the host unavailable.
             cluster.connect().init();
+
+            try {
+                client1.submit("1+1").all().join();
+                fail("Expecting an exception because the server is shut down.");
+            } catch (Exception ex) {
+                // ignore the exception
+            }
+
             assertEquals(0, cluster.availableHosts().size());
         } finally {
             cluster.close();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 807e9a7..f1740c4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.ietf.jgss.GSSException;
@@ -88,8 +89,7 @@
             fail("This should not succeed as the client did not enable SSL");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(TimeoutException.class, root.getClass());
-            assertThat(root.getMessage(), startsWith("Timed out while waiting for an available host"));
+            assertEquals(NoHostAvailableException.class, root.getClass());
         } finally {
             cluster.close();
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ab75b8e..8e1e8fb 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -27,6 +27,7 @@
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -203,7 +204,7 @@
             fail("Should throw exception because ssl is enabled on the server but not on client");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -233,7 +234,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -251,7 +252,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -294,7 +295,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -311,7 +312,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -328,7 +329,7 @@
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -345,7 +346,7 @@
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -391,7 +392,7 @@
             fail("Should throw exception because incorrect keyStoreType is specified");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -410,7 +411,7 @@
             fail("Should throw exception because incorrect trustStoreType is specified");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
diff --git a/pom.xml b/pom.xml
index 3a5451b..8fd981c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,7 @@
         <slf4j.version>1.7.25</slf4j.version>
         <snakeyaml.version>1.27</snakeyaml.version>
         <spark.version>2.4.0</spark.version>
+        <powermock.version>1.6.4</powermock.version>
 
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -721,6 +722,18 @@
                 </exclusions>
             </dependency>
             <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-module-junit4</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-api-mockito</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.hamcrest</groupId>
                 <artifactId>hamcrest-all</artifactId>
                 <version>1.3</version>