TINKERPOP-2445 Parallelize connection creation (#1360)

Changes include:
- Added a new NoHostAvailableException
- Changed ConnectionException to extend RuntimeException
- Add powermock dependency 
- Do not try to reconnect to host if the connection pool initialization to that host has failed
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4343093..4eb11fc 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -64,6 +64,9 @@
 * Deprecated `BytecodeUtil` and merged its functionality to the existing `BytecodeHelper`.
 * Added configuring implementation in HasStep
 * Remove static initialization for `GraphSONMessageSerializerV1d0` and `GraphSONMessageSerializerV1d0` in Java driver.
+* Connections to the server in a connection pool are created in parallel instead of serially in Java Driver.
+* Connection pools for multiple endpoints are created in parallel instead of serially in Java Driver.
+* Introduced new HostNotAvailable exception to represent cases when no server with active connection is available. 
 
 [[release-3-4-8]]
 === TinkerPop 3.4.8 (Release Date: August 3, 2020)
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index bd997a6..7bffa41 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -94,6 +94,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <scope>test</scope>
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 596910a..5460e75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -47,6 +47,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.lang.Math.toIntExact;
 
@@ -255,8 +256,16 @@
                 // forgot to enable it or perhaps the server is not configured for websockets.
                 handler.handshakeFuture().sync();
             } catch (Exception ex) {
-                throw new RuntimeException(new ConnectionException(connection.getUri(),
-                        "Could not complete websocket handshake - ensure that client protocol matches server", ex));
+                String errMsg = "";
+                if (ex instanceof TimeoutException) {
+                    errMsg = "Timed out while waiting to complete the connection setup. Consider increasing the " +
+                            "WebSocket handshake timeout duration.";
+                } else {
+                    errMsg = "Could not complete connection setup to the server. Ensure that SSL is correctly " +
+                            "configured at both the client and the server. Ensure that client WebSocket handshake " +
+                            "protocol matches the server. Ensure that the server is still reachable.";
+                }
+                throw new ConnectionException(connection.getUri(), errMsg, ex);
             }
         }
     }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 9cbc337..af4ab31 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -26,8 +29,6 @@
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,11 +38,13 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -105,7 +108,7 @@
      * one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of
      * the created {@code Client}.
      */
-    public Client alias(final Map<String,String> aliases) {
+    public Client alias(final Map<String, String> aliases) {
         return new AliasClusteredClient(this, aliases, settings);
     }
 
@@ -154,8 +157,7 @@
      * A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
      *
      * @param bytecode request in the form of gremlin {@link Bytecode}
-     * @param options for the request
-     *
+     * @param options  for the request
      * @see #submit(Bytecode)
      */
     public ResultSet submit(final Bytecode bytecode, final RequestOptions options) {
@@ -181,8 +183,7 @@
      * A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
      *
      * @param bytecode request in the form of gremlin {@link Bytecode}
-     * @param options for the request
-     *
+     * @param options  for the request
      * @see #submitAsync(Bytecode)
      */
     public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
@@ -201,8 +202,14 @@
         logger.debug("Initializing client on cluster [{}]", cluster);
 
         cluster.init();
+
         initializeImplementation();
 
+        // throw an error if no host is available even after initialization is complete.
+        if (cluster.availableHosts().isEmpty()) {
+            throw new NoHostAvailableException();
+        }
+
         initialized = true;
         return this;
     }
@@ -223,7 +230,7 @@
      * this method to concatenating a Gremlin script from dynamically produced strings and sending it to
      * {@link #submit(String)}.  Parameterized scripts will perform better.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
      */
     public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
@@ -263,7 +270,7 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
      */
     public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
@@ -279,15 +286,15 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
-     * @param parameters a map of parameters that will be bound to the script on execution
+     * @param gremlin                the gremlin script to execute
+     * @param parameters             a map of parameters that will be bound to the script on execution
      * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
      * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
      */
     @Deprecated
     public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource,
                                                     final Map<String, Object> parameters) {
-        Map<String,String> aliases = null;
+        Map<String, String> aliases = null;
         if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) {
             aliases = makeDefaultAliasMap(graphOrTraversalSource);
         }
@@ -299,15 +306,15 @@
      * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the
      * write of the request completes.
      *
-     * @param gremlin the gremlin script to execute
+     * @param gremlin    the gremlin script to execute
      * @param parameters a map of parameters that will be bound to the script on execution
-     * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the
-     *                script where the key is the alias name and the value represents the global variable on the
-     *                server
+     * @param aliases    aliases the specified global Gremlin Server variable some other name that then be used in the
+     *                   script where the key is the alias name and the value represents the global variable on the
+     *                   server
      * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
      */
     @Deprecated
-    public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases,
+    public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, String> aliases,
                                                     final Map<String, Object> parameters) {
         final RequestOptions.Builder options = RequestOptions.build();
         if (aliases != null && !aliases.isEmpty()) {
@@ -402,8 +409,8 @@
         return cluster;
     }
 
-    protected Map<String,String> makeDefaultAliasMap(final String graphOrTraversalSource) {
-        final Map<String,String> aliases = new HashMap<>();
+    protected Map<String, String> makeDefaultAliasMap(final String graphOrTraversalSource) {
+        final Map<String, String> aliases = new HashMap<>();
         aliases.put("g", graphOrTraversalSource);
         return aliases;
     }
@@ -443,8 +450,8 @@
          * this method to concatenating a Gremlin script from dynamically produced strings and sending it to
          * {@link #submit(String)}.  Parameterized scripts will perform better.
          *
-         * @param gremlin the gremlin script to execute
-         * @param parameters a map of parameters that will be bound to the script on execution
+         * @param gremlin                the gremlin script to execute
+         * @param parameters             a map of parameters that will be bound to the script on execution
          * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
          */
         public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) {
@@ -460,7 +467,7 @@
          */
         @Override
         public Client alias(final String graphOrTraversalSource) {
-            final Map<String,String> aliases = new HashMap<>();
+            final Map<String, String> aliases = new HashMap<>();
             aliases.put("g", graphOrTraversalSource);
             return alias(aliases);
         }
@@ -469,7 +476,7 @@
          * {@inheritDoc}
          */
         @Override
-        public Client alias(final Map<String,String> aliases) {
+        public Client alias(final Map<String, String> aliases) {
             return new AliasClusteredClient(this, aliases, settings);
         }
 
@@ -492,7 +499,7 @@
             // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
             // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
             if (!possibleHosts.hasNext())
-                throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists");
+                throw new NoHostAvailableException();
 
             final Host bestHost = possibleHosts.next();
             final ConnectionPool pool = hostConnectionPools.get(bestHost);
@@ -504,18 +511,20 @@
          */
         @Override
         protected void initializeImplementation() {
-            cluster.allHosts().forEach(host -> {
-                try {
-                    // hosts that don't initialize connection pools will come up as a dead host
-                    hostConnectionPools.put(host, new ConnectionPool(host, this));
-
-                    // added a new host to the cluster so let the load-balancer know
-                    this.cluster.loadBalancingStrategy().onNew(host);
-                } catch (Exception ex) {
-                    // catch connection errors and prevent them from failing the creation
-                    logger.warn("Could not initialize connection pool for {} - will try later", host);
+            try {
+                CompletableFuture.allOf(cluster.allHosts().stream()
+                        .map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), cluster.executor()))
+                        .toArray(CompletableFuture[]::new))
+                        .join();
+            } catch (CompletionException ex) {
+                Throwable cause = null;
+                Throwable result = ex;
+                if (null != (cause = ex.getCause())) {
+                    result = cause;
                 }
-            });
+
+                logger.error("", result);
+            }
         }
 
         /**
@@ -526,11 +535,27 @@
             if (closing.get() != null)
                 return closing.get();
 
-            final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
-            hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
-            closing.set(CompletableFuture.allOf(poolCloseFutures));
+            final CompletableFuture<Void> allPoolsClosedFuture =
+                    CompletableFuture.allOf(hostConnectionPools.values().stream()
+                            .map(ConnectionPool::closeAsync)
+                            .toArray(CompletableFuture[]::new));
+
+            closing.set(allPoolsClosedFuture);
             return closing.get();
         }
+
+        private Consumer<Host> initializeConnectionSetupForHost = host -> {
+            try {
+                // hosts that don't initialize connection pools will come up as a dead host
+                hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this));
+
+                // added a new host to the cluster so let the load-balancer know
+                ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
+            } catch (RuntimeException ex) {
+                final String errMsg = "Could not initialize client for " + host;
+                throw new RuntimeException(errMsg, ex);
+            }
+        };
     }
 
     /**
@@ -539,10 +564,10 @@
      */
     public static class AliasClusteredClient extends Client {
         private final Client client;
-        private final Map<String,String> aliases = new HashMap<>();
+        private final Map<String, String> aliases = new HashMap<>();
         final CompletableFuture<Void> close = new CompletableFuture<>();
 
-        AliasClusteredClient(final Client client, final Map<String,String> aliases, final Client.Settings settings) {
+        AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) {
             super(client.cluster, settings);
             this.client = client;
             this.aliases.putAll(aliases);
@@ -559,8 +584,8 @@
                 // need to call buildMessage() right away to get client specific configurations, that way request specific
                 // ones can override as needed
                 final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
-                                                                                  .processor("traversal")
-                                                                                  .addArg(Tokens.ARGS_GREMLIN, bytecode));
+                        .processor("traversal")
+                        .addArg(Tokens.ARGS_GREMLIN, bytecode));
 
                 // apply settings if they were made available
                 options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
@@ -582,7 +607,7 @@
             // overrides which should be mucked with
             if (!aliases.isEmpty()) {
                 final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
-                aliases.forEach((k,v) -> {
+                aliases.forEach((k, v) -> {
                     if (!original.containsKey(k))
                         builder.addArg(Tokens.ARGS_ALIASES, aliases);
                 });
@@ -708,7 +733,12 @@
             if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
             Collections.shuffle(hosts);
             final Host host = hosts.get(0);
-            connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+
+            try {
+                connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+            } catch (RuntimeException ex) {
+                logger.error("Could not initialize client for {}", host, ex);
+            }
         }
 
         @Override
@@ -758,7 +788,8 @@
         public static class Builder {
             private Optional<SessionSettings> session = Optional.empty();
 
-            private Builder() {}
+            private Builder() {
+            }
 
             /**
              * Enables a session. By default this will create a random session name and configure transactions to be
@@ -842,7 +873,8 @@
             private String sessionId = UUID.randomUUID().toString();
             private boolean forceClosed = false;
 
-            private Builder() {}
+            private Builder() {
+            }
 
             /**
              * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9636df9..26699bc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -101,7 +101,7 @@
         this.maxInProcess = maxInProcess;
         this.keepAliveInterval = pool.settings().keepAliveInterval;
 
-        connectionLabel = String.format("Connection{host=%s}", pool.host);
+        connectionLabel = "Connection{host=" + pool.host + "}";
 
         if (cluster.isClosing())
             throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
@@ -128,9 +128,8 @@
             channel.closeFuture().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
-                    if(logger.isDebugEnabled()) {
-                        logger.debug("OnChannelClose callback called for channel {}", channel.id().asShortText());
-                    }
+                    logger.debug("OnChannelClose callback called for channel {}", channel);
+
                     // Replace the channel if it was not intentionally closed using CloseAsync method.
                     if (thisConnection.closeFuture.get() == null) {
                         // delegate the task to worker thread and free up the event loop
@@ -143,11 +142,11 @@
 
             // Default WebSocketChannelizer uses Netty's IdleStateHandler
             if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+                logger.debug("Using custom keep alive handler.");
                 scheduleKeepAlive();
             }
-        } catch (Exception ie) {
-            logger.debug("Error opening connection on {}", uri);
-            throw new ConnectionException(uri, "Could not open connection", ie);
+        } catch (Exception ex) {
+            throw new ConnectionException(uri, "Could not open " + this.toString(), ex);
         }
     }
 
@@ -273,6 +272,7 @@
 
         // Default WebSocketChannelizer uses Netty's IdleStateHandler
         if (!(channelizer instanceof Channelizer.WebSocketChannelizer)) {
+            logger.debug("Using custom keep alive handler.");
             scheduleKeepAlive();
         }
 
@@ -409,10 +409,10 @@
     /**
      * Returns the short ID for the underlying channel for this connection.
      * <p>
-     * Currently only used for testing.
+     * Visible for testing.
      */
     String getChannelId() {
-        return (channel != null) ? channel.id().asShortText() : "";
+        return (channel != null) ? channel.id().asShortText() : "null";
     }
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f7b35e9..295c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -18,25 +18,23 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -85,7 +83,7 @@
         this.host = host;
         this.client = client;
         this.cluster = client.cluster;
-        poolLabel = String.format("Connection Pool {host=%s}", host);
+        poolLabel = "Connection Pool {host=" + host + "}";
 
         final Settings.ConnectionPoolSettings settings = settings();
         this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
@@ -97,15 +95,38 @@
         this.connections = new CopyOnWriteArrayList<>();
 
         try {
+            final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
             for (int i = 0; i < minPoolSize; i++) {
-                this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                connCreationFutures.add(CompletableFuture.runAsync(() -> {
+                    try {
+                        this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                    } catch (ConnectionException e) {
+                        throw new CompletionException(e);
+                    }
+                }, cluster.executor()));
             }
 
-        } catch (ConnectionException ce) {
-            // ok if we don't get it initialized here - when a request is attempted in a connection from the
-            // pool it will try to create new connections as needed.
-            logger.info("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size(), ce);
-            considerHostUnavailable();
+            CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
+        } catch (CancellationException ce) {
+            logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+            throw ce;
+        } catch (CompletionException ce) {
+            // Some connections might have been initialized. Close the connection pool gracefully to close them.
+            this.closeAsync();
+
+            final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
+                    " Successful connections=" + this.connections.size() +
+                    ". Closing the connection pool.";
+
+
+            Throwable cause = null;
+            Throwable result = ce;
+
+            if (null != (cause = result.getCause())) {
+                result = cause;
+            }
+
+            throw new CompletionException(errMsg, result);
         }
 
         this.open = new AtomicInteger(connections.size());
@@ -316,7 +337,7 @@
         try {
             connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
         } catch (ConnectionException ce) {
-            logger.debug("Connections were under max, but there was an error creating the connection.", ce);
+            logger.error("Connections were under max, but there was an error creating the connection.", ce);
             open.decrementAndGet();
             considerHostUnavailable();
             return false;
@@ -350,7 +371,7 @@
 
         // only close the connection for good once it is done being borrowed or when it is dead
         if (connection.isDead() || connection.borrowed.get() == 0) {
-            if(bin.remove(connection)) {
+            if (bin.remove(connection)) {
                 connection.closeAsync();
                 // TODO: Log the following message on completion of the future returned by closeAsync.
                 logger.debug("{} destroyed", connection.getConnectionInfo());
@@ -396,7 +417,7 @@
             logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
         } while (remaining > 0);
 
-        logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
+        logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);
 
         // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
         // either way supply a function to reconnect
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..551c9d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -23,9 +23,11 @@
 import java.util.Optional;
 
 /**
+ * This exception signifies network connection failure.
+ *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class ConnectionException extends Exception {
+public class ConnectionException extends RuntimeException {
     private URI uri;
     private InetSocketAddress address;
 
@@ -35,6 +37,12 @@
         this.uri = uri;
     }
 
+    public ConnectionException(final URI uri, final Throwable cause) {
+        super(cause);
+        this.uri = uri;
+        this.address = null;
+    }
+
     public ConnectionException(final URI uri, final String message, final Throwable cause) {
         super(message, cause);
         this.uri = uri;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
new file mode 100644
index 0000000..e8b3b87
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.exception;
+
+public class NoHostAvailableException extends RuntimeException {
+
+    public NoHostAvailableException() {
+        super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+    }
+
+    @Override
+    public synchronized Throwable fillInStackTrace() {
+        return this;
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 7e55b92..3d2df78 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -70,8 +70,10 @@
         if (!handshakeFuture.isDone()) {
             // channel was closed before the handshake could be completed.
             handshakeFuture.setFailure(
-                    new RuntimeException(String.format("Channel=[%s] closed before the handshake could complete",
-                            ctx.channel().toString())));
+                    new RuntimeException(String.format("WebSocket channel=[%s] closed before the handshake could complete." +
+                                    " Server logs could contain the reason for abrupt connection disconnect or the " +
+                                    "server might not be reachable from the client anymore.",
+                            ctx.channel().id().asShortText())));
         }
 
         super.channelInactive(ctx);
@@ -82,7 +84,7 @@
         if (event instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) event;
             if (e.state() == IdleState.READER_IDLE) {
-                logger.warn("WebSocket connection " + ctx.channel() + " has been idle for too long.");
+                logger.warn("WebSocket connection {} has been idle for too long.", ctx.channel());
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 logger.debug("Sending ping frame to the server");
                 ctx.writeAndFlush(new PingWebSocketFrame());
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000..20f38ba
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Client.ClusteredClient.class, Client.SessionedClient.class, Host.class, Cluster.class})
+public class ClientTest {
+    @Mock
+    private Cluster cluster;
+
+    @Mock
+    private Host mockAvailableHost;
+
+    @Mock
+    private Client.Settings settings;
+
+    private ScheduledExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newScheduledThreadPool(1);
+        when(mockAvailableHost.isAvailable()).thenReturn(true);
+        when(cluster.allHosts()).thenReturn(Collections.singletonList(mockAvailableHost));
+        when(cluster.executor()).thenReturn(executor);
+    }
+
+    @After
+    public void cleanup() {
+        executor.shutdown();
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowErrorWhenConnPoolInitFailsForClusteredClient() throws Exception {
+        Client.ClusteredClient client = new Client.ClusteredClient(cluster, settings);
+        whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+        client.init();
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowErrorWhenConnPoolInitFailsForSessionClient() throws Exception {
+        final Client.SessionSettings sessionSettings = Client.SessionSettings.build().sessionId("my-session-id").create();
+        when(settings.getSession()).thenReturn(Optional.of(sessionSettings));
+        Client.SessionedClient client = new Client.SessionedClient(cluster, settings);
+        whenNew(ConnectionPool.class).withAnyArguments().thenThrow(new RuntimeException("cannot initialize client"));
+        client.init();
+    }
+
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
index 3d6bef3..1008653 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/types/sample/SamplePersonSerializerTest.java
@@ -108,6 +108,7 @@
         final ResponseMessage deserialized = serializer.deserializeResponse(serialized);
 
         final SamplePerson actual = (SamplePerson) deserialized.getResult().getData();
+
         assertThat(actual, new ReflectionEquals(person));
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 5a2e1f8..4259d15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -102,10 +102,10 @@
             // periodically ping the server, but coming from this direction allows the server to kill channels that
             // have dead clients on the other end
             if (e.state() == IdleState.READER_IDLE) {
-                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel());
+                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
-                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel());
+                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
                 ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
             }
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index db88ef9..623107f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -28,6 +28,7 @@
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
@@ -64,6 +65,7 @@
 
 import java.awt.Color;
 import java.io.File;
+import java.net.ConnectException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -411,7 +413,7 @@
                 fail("Should not have gone through because the server is not running");
             } catch (Exception i) {
                 final Throwable root = ExceptionUtils.getRootCause(i);
-                assertThat(root, instanceOf(TimeoutException.class));
+                assertThat(root, instanceOf(NoHostAvailableException.class));
             }
 
             startServer();
@@ -447,7 +449,7 @@
             fail("Should not have gone through because the server is not running");
         } catch (Exception i) {
             final Throwable root = ExceptionUtils.getRootCause(i);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         }
 
         startServer();
@@ -476,6 +478,7 @@
 
         try {
             final Client client = cluster.connect();
+            client.init();
 
             // the first host is dead on init.  request should succeed on localhost
             assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
@@ -861,12 +864,29 @@
         final Cluster cluster = TestClientFactory.open();
         try {
             assertEquals(0, cluster.availableHosts().size());
-            cluster.connect().init();
+            final Client client1 = cluster.connect().init();
             assertEquals(1, cluster.availableHosts().size());
 
             stopServer();
 
+            // We create a new client here which will fail to initialize but the original client still has
+            // host marked as connected. Since the second client failed during initialization, it has no way to
+            // test if a host is indeed unreachable because it doesn't have any established connections. It will not add
+            // the host to load balancer but it will also not remove it if it already exists there. Leave that
+            // responsibility to a client that added it. In this case, let the second client perform it's own mechanism
+            // to mark host as unavailable. The first client will discover that the host has failed either with next
+            // keepAlive message or the next request, whichever is earlier. In this case, we will simulate the second
+            // scenario by sending a new request on first client. The request would fail (since server is down) and
+            // client should mark the host unavailable.
             cluster.connect().init();
+
+            try {
+                client1.submit("1+1").all().join();
+                fail("Expecting an exception because the server is shut down.");
+            } catch (Exception ex) {
+                // ignore the exception
+            }
+
             assertEquals(0, cluster.availableHosts().size());
         } finally {
             cluster.close();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 807e9a7..f1740c4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.ietf.jgss.GSSException;
@@ -88,8 +89,7 @@
             fail("This should not succeed as the client did not enable SSL");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(TimeoutException.class, root.getClass());
-            assertThat(root.getMessage(), startsWith("Timed out while waiting for an available host"));
+            assertEquals(NoHostAvailableException.class, root.getClass());
         } finally {
             cluster.close();
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ab75b8e..8e1e8fb 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -27,6 +27,7 @@
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -203,7 +204,7 @@
             fail("Should throw exception because ssl is enabled on the server but not on client");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -233,7 +234,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -251,7 +252,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -294,7 +295,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -311,7 +312,7 @@
             fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -328,7 +329,7 @@
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -345,7 +346,7 @@
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -391,7 +392,7 @@
             fail("Should throw exception because incorrect keyStoreType is specified");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
@@ -410,7 +411,7 @@
             fail("Should throw exception because incorrect trustStoreType is specified");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(NoHostAvailableException.class));
         } finally {
             cluster.close();
         }
diff --git a/pom.xml b/pom.xml
index 3a5451b..8fd981c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,7 @@
         <slf4j.version>1.7.25</slf4j.version>
         <snakeyaml.version>1.27</snakeyaml.version>
         <spark.version>2.4.0</spark.version>
+        <powermock.version>1.6.4</powermock.version>
 
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -721,6 +722,18 @@
                 </exclusions>
             </dependency>
             <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-module-junit4</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-api-mockito</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.hamcrest</groupId>
                 <artifactId>hamcrest-all</artifactId>
                 <version>1.3</version>