CASSANDRASC-65 Enrich RetriesExhaustedException to have more information for better visibility

patch by Yifan Cai; reviewed by Dinesh Joshi, Francisco Guerrero for CASSANDRASC-65
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 7e91c83..6611efb 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -62,6 +62,7 @@
   java8:
     docker:
      - image: circleci/openjdk:8-jdk-stretch
+    resource_class: large
     steps:
      - checkout
 
@@ -80,6 +81,7 @@
   java11:
     docker:
       - image: circleci/openjdk:11-jdk-stretch
+    resource_class: large
     steps:
       - checkout
 
diff --git a/CHANGES.txt b/CHANGES.txt
index 9cdb549..7974993 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Enrich RetriesExhaustedException to have more information for better visibility (CASSANDRASC-65)
  * Fix failing unit tests in Apache CI (CASSANDRASC-66)
  * Support credential rotation in JmxClient (CASSANDRASC-63)
  * File descriptor is not being closed on MD5 checksum (CASSANDRASC-64)
diff --git a/build.gradle b/build.gradle
index ef7b7bc..ef324ca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -322,7 +322,7 @@
     testClassesDirs = sourceSets.integrationTest.output.classesDirs
     classpath = sourceSets.integrationTest.runtimeClasspath
     shouldRunAfter test
-    forkEvery = 1 // DTest framework tends to have issues without forkEvery method
+    forkEvery = 1 // DTest framework tends to have issues without forkEvery test class
     maxHeapSize = "8g"
 }
 
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponse.java b/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponse.java
index e5fd774..3806857 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponse.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponse.java
@@ -62,4 +62,9 @@
      * @return the headers for the response
      */
     Map<String, List<String>> headers();
+
+    /**
+     * @return the sidecar server instance that returns the response
+     */
+    SidecarInstance sidecarInstance();
 }
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponseImpl.java b/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponseImpl.java
index 432a9dc..9d273f4 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponseImpl.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/HttpResponseImpl.java
@@ -32,6 +32,7 @@
     private final String statusMessage;
     private final byte[] raw;
     private final Map<String, List<String>> headers;
+    private final SidecarInstance sidecarInstance;
 
     /**
      * Constructs a response object with the provided values
@@ -39,10 +40,14 @@
      * @param statusCode    the status code of the response
      * @param statusMessage the status message of the response
      * @param headers       the headers from the response
+     * @param server        the server that returns the response
      */
-    public HttpResponseImpl(int statusCode, String statusMessage, Map<String, List<String>> headers)
+    public HttpResponseImpl(int statusCode,
+                            String statusMessage,
+                            Map<String, List<String>> headers,
+                            SidecarInstance server)
     {
-        this(statusCode, statusMessage, null, headers);
+        this(statusCode, statusMessage, null, headers, server);
     }
 
     /**
@@ -52,13 +57,19 @@
      * @param statusMessage the status message of the response
      * @param raw           the raw bytes received from the response
      * @param headers       the headers from the response
+     * @param server        the server that returns the response
      */
-    public HttpResponseImpl(int statusCode, String statusMessage, byte[] raw, Map<String, List<String>> headers)
+    public HttpResponseImpl(int statusCode,
+                            String statusMessage,
+                            byte[] raw,
+                            Map<String, List<String>> headers,
+                            SidecarInstance server)
     {
         this.statusCode = statusCode;
         this.statusMessage = statusMessage;
         this.raw = raw;
         this.headers = Collections.unmodifiableMap(headers);
+        this.sidecarInstance = server;
     }
 
     /**
@@ -121,6 +132,15 @@
      * {@inheritDoc}
      */
     @Override
+    public SidecarInstance sidecarInstance()
+    {
+        return sidecarInstance;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public String toString()
     {
         return "HttpResponseImpl{" +
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
index 64b5700..0dd9e52 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
@@ -32,7 +32,6 @@
 import org.slf4j.LoggerFactory;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
 import org.apache.cassandra.sidecar.client.request.DecodableRequest;
 import org.apache.cassandra.sidecar.client.request.Request;
 
@@ -101,10 +100,20 @@
      */
     public <T> CompletableFuture<T> executeRequestAsync(RequestContext context)
     {
-        CompletableFuture<T> resultFuture = new CompletableFuture<>();
-        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
         Iterator<SidecarInstance> iterator = context.instanceSelectionPolicy().iterator();
-        executeWithRetries(responseFuture, iterator, context, 1, null);
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        if (!iterator.hasNext())
+        {
+            resultFuture.completeExceptionally(new IllegalStateException("InstanceSelectionPolicy " +
+                                                                         context.instanceSelectionPolicy()
+                                                                                .getClass()
+                                                                                .getSimpleName() +
+                                                                         " selects 0 instances"));
+            return resultFuture;
+        }
+        SidecarInstance instance = iterator.next();
+        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
+        executeWithRetries(responseFuture, iterator, instance, context, 1);
 
         responseFuture.whenComplete((response, retryThrowable) ->
                                     processResponse(resultFuture, context.request(), response, retryThrowable));
@@ -122,9 +131,18 @@
     {
         Objects.requireNonNull(streamConsumer, "streamConsumer must be non-null");
         Iterator<SidecarInstance> iterator = context.instanceSelectionPolicy().iterator();
-
+        if (!iterator.hasNext())
+        {
+            streamConsumer.onError(new IllegalStateException("InstanceSelectionPolicy " +
+                                                             context.instanceSelectionPolicy()
+                                                                    .getClass()
+                                                                    .getSimpleName() +
+                                                             " selects 0 instances"));
+            return;
+        }
+        SidecarInstance instance = iterator.next();
         CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
-        streamWithRetries(responseFuture, streamConsumer, iterator, context, 1, null);
+        streamWithRetries(responseFuture, streamConsumer, iterator, instance, context, 1);
 
         responseFuture.whenComplete(((response, throwable) -> {
             if (throwable != null)
@@ -143,62 +161,6 @@
     }
 
     /**
-     * Executes the request from the {@code context}, it iterates over the {@link SidecarInstance}s until the response
-     * satisfies the {@code retryPolicy}.
-     *
-     * @param future    a future for the {@link HttpResponse}
-     * @param iterator  the iterator of instances
-     * @param context   the request context
-     * @param attempt   the number of attempts for this request
-     * @param throwable the last {@link Throwable}, or {@code null} if there are no previous errors
-     */
-    protected void executeWithRetries(CompletableFuture<HttpResponse> future,
-                                      Iterator<SidecarInstance> iterator,
-                                      RequestContext context,
-                                      int attempt,
-                                      Throwable throwable)
-    {
-        if (iterator.hasNext())
-        {
-            executeWithRetries(future, iterator, iterator.next(), context, attempt);
-        }
-        else
-        {
-            // exhausted retries on all available hosts
-            future.completeExceptionally(new RetriesExhaustedException(attempt, context.request(), throwable));
-        }
-    }
-
-    /**
-     * Streams the request from the {@code context} to the {@code streamConsumer}. It iterates over the
-     * {@link SidecarInstance}s until the response satisfies the {@code retryPolicy}.
-     *
-     * @param future         a future for the {@link HttpResponse}
-     * @param streamConsumer the object that consumes the stream
-     * @param iterator       the iterator of Sidecar instances
-     * @param context        the request context
-     * @param attempt        the number of attempts for this request
-     * @param throwable      the last {@link Throwable}, or {@code null} if there are no previous errors
-     */
-    private void streamWithRetries(CompletableFuture<HttpResponse> future,
-                                   StreamConsumer streamConsumer,
-                                   Iterator<SidecarInstance> iterator,
-                                   RequestContext context,
-                                   int attempt,
-                                   Throwable throwable)
-    {
-        if (iterator.hasNext())
-        {
-            streamWithRetries(future, streamConsumer, iterator, iterator.next(), context, attempt);
-        }
-        else
-        {
-            // exhausted retries on all available hosts
-            streamConsumer.onError(new RetriesExhaustedException(attempt, context.request(), throwable));
-        }
-    }
-
-    /**
      * Executes the {@code request} from the {@code context} on the provided {@code sidecarInstance}, and applies the
      * retry policy after complete.
      *
@@ -305,26 +267,15 @@
         context.retryPolicy()
                .onResponse(future, request, response, throwable, attempt, retryOnNewHost, (nextAttempt, delay) -> {
             String statusCode = response != null ? String.valueOf(response.statusCode()) : "<Not Available>";
-            if (iterator.hasNext())
+            SidecarInstance nextInstance = iterator.hasNext() ? iterator.next() : sidecarInstance;
+            if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
             {
-                if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
-                {
-                    logger.warn("Retrying request on next instance after {}ms. Failed on instance={}, " +
-                                "attempt={}, statusCode={}, request={}", delay, sidecarInstance, attempt, statusCode,
-                                request, throwable);
-                }
-                schedule(delay, () -> executeWithRetries(future, iterator, context, nextAttempt, throwable));
+                logger.warn("Retrying request on {} instance after {}ms. " +
+                            "Failed on instance={}, attempt={}, statusCode={}",
+                            nextInstance == sidecarInstance ? "same" : "next", delay,
+                            sidecarInstance, attempt, statusCode, throwable);
             }
-            else
-            {
-                if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
-                {
-                    logger.warn("Retrying request on same instance after {}ms. Failed on instance={}, " +
-                                "attempt={}, statusCode={}, request={}", delay, sidecarInstance, attempt, statusCode,
-                                request, throwable);
-                }
-                schedule(delay, () -> executeWithRetries(future, iterator, sidecarInstance, context, nextAttempt));
-            }
+            schedule(delay, () -> executeWithRetries(future, iterator, nextInstance, context, nextAttempt));
         });
     }
 
@@ -355,27 +306,15 @@
         context.retryPolicy()
                .onResponse(future, request, response, throwable, attempt, retryOnNewHost, (nextAttempt, delay) -> {
             String statusCode = response != null ? String.valueOf(response.statusCode()) : "<Not Available>";
-            if (iterator.hasNext())
+            SidecarInstance nextInstance = iterator.hasNext() ? iterator.next() : sidecarInstance;
+            if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
             {
-                if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
-                {
-                    logger.warn("Retrying stream on next instance after {}ms. Failed on instance={}, " +
-                                "attempt={}, statusCode={}", delay, sidecarInstance, attempt, statusCode, throwable);
-                }
-
-                schedule(delay, () -> streamWithRetries(future, consumer, iterator, context, nextAttempt, throwable));
+                logger.warn("Retrying stream on {} instance after {}ms. " +
+                            "Failed on instance={}, attempt={}, statusCode={}",
+                            nextInstance == sidecarInstance ? "same" : "next", delay,
+                            sidecarInstance, attempt, statusCode, throwable);
             }
-            else
-            {
-                if (response == null || response.statusCode() != HttpResponseStatus.ACCEPTED.code())
-                {
-                    logger.warn("Retrying stream on same instance after {}ms. Failed on instance={}, " +
-                                "attempt={}, statusCode={}", delay, sidecarInstance, attempt, statusCode, throwable);
-                }
-
-                schedule(delay, () ->
-                                streamWithRetries(future, consumer, iterator, sidecarInstance, context, nextAttempt));
-            }
+            schedule(delay, () -> streamWithRetries(future, consumer, iterator, nextInstance, context, nextAttempt));
         });
     }
 
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/exception/RetriesExhaustedException.java b/client/src/main/java/org/apache/cassandra/sidecar/client/exception/RetriesExhaustedException.java
index 890dd00..58ee398 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/exception/RetriesExhaustedException.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/exception/RetriesExhaustedException.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.client.exception;
 
+import org.apache.cassandra.sidecar.client.HttpResponse;
 import org.apache.cassandra.sidecar.client.request.Request;
 
 /**
@@ -28,24 +29,52 @@
     /**
      * Constructs an exception with the number of {@code attempts} performed for the request.
      *
-     * @param attempts the number of attempts performed for the request
-     * @param request  the HTTP request
+     * @param attempts      the number of attempts performed for the request
+     * @param request       the HTTP request
+     * @param lastResponse  the last failed HTTP response
      */
-    public RetriesExhaustedException(int attempts, Request request)
+    public static RetriesExhaustedException of(int attempts,
+                                               Request request,
+                                               HttpResponse lastResponse)
     {
-        this(attempts, request, null);
+        return of(attempts, request, lastResponse, null);
     }
 
     /**
      * Constructs an exception with the number of {@code attempts} performed for the request.
      *
-     * @param attempts  the number of attempts performed for the request
-     * @param request   the HTTP request
-     * @param throwable the underlying exception
+     * @param attempts      the number of attempts performed for the request
+     * @param request       the HTTP request
+     * @param lastResponse  the last failed HTTP response
+     * @param throwable     the underlying exception
      */
-    public RetriesExhaustedException(int attempts, Request request, Throwable throwable)
+    public static RetriesExhaustedException of(int attempts,
+                                               Request request,
+                                               HttpResponse lastResponse,
+                                               Throwable throwable)
     {
-        super(String.format("Unable to complete request '%s' after %d attempt%s",
-                            request.requestURI(), attempts, attempts == 1 ? "" : "s"), throwable);
+        return new RetriesExhaustedException(attempts, request, lastResponse, throwable);
+    }
+
+    /**
+     * Constructs an exception with the number of {@code attempts} performed for the request.
+     *
+     * @param attempts      the number of attempts performed for the request
+     * @param request       the HTTP request
+     * @param lastResponse  the last failed HTTP response
+     * @param throwable     the underlying exception
+     */
+    protected RetriesExhaustedException(int attempts,
+                                        Request request,
+                                        HttpResponse lastResponse,
+                                        Throwable throwable)
+    {
+        super(String.format("Unable to complete request '%s' after %d attempt%s; last response '%s' from server '%s'",
+                            request.requestURI(),
+                            attempts,
+                            attempts == 1 ? "" : "s",
+                            lastResponse,
+                            lastResponse != null ? lastResponse.sidecarInstance() : null),
+              throwable);
     }
 }
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicy.java
index c06a629..fc4c067 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicy.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicy.java
@@ -26,6 +26,7 @@
 import io.netty.handler.codec.http.HttpStatusClass;
 import org.apache.cassandra.sidecar.client.HttpResponse;
 import org.apache.cassandra.sidecar.client.exception.ResourceNotFoundException;
+import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
 import org.apache.cassandra.sidecar.client.request.Request;
 import org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus;
 
@@ -89,11 +90,11 @@
         {
             if (canRetryOnADifferentHost)
             {
-                retryImmediately(responseFuture, request, retryAction, attempts, throwable);
+                retryImmediately(responseFuture, request, response, retryAction, attempts, throwable);
             }
             else
             {
-                retry(responseFuture, request, retryAction, attempts, throwable);
+                retry(responseFuture, request, response, retryAction, attempts, throwable);
             }
             return;
         }
@@ -109,7 +110,7 @@
         {
             if (canRetryOnADifferentHost)
             {
-                retryImmediately(responseFuture, request, retryAction, attempts);
+                retryImmediately(responseFuture, request, response, retryAction, attempts);
             }
             else
             {
@@ -130,11 +131,11 @@
         {
             if (canRetryOnADifferentHost)
             {
-                retryImmediately(responseFuture, request, retryAction, attempts);
+                retryImmediately(responseFuture, request, response, retryAction, attempts);
             }
             else
             {
-                retry(responseFuture, request, retryAction, attempts,
+                retry(responseFuture, request, response, retryAction, attempts,
                       maybeParseRetryAfterOrDefault(response, attempts), null);
             }
             return;
@@ -152,11 +153,11 @@
             // checksum is encountered
             if (canRetryOnADifferentHost)
             {
-                retryImmediately(responseFuture, request, retryAction, attempts);
+                retryImmediately(responseFuture, request, response, retryAction, attempts);
             }
             else
             {
-                retry(responseFuture, request, retryAction, attempts, null);
+                retry(responseFuture, request, response, retryAction, attempts, null);
             }
             return;
         }
@@ -167,12 +168,12 @@
         {
             if (canRetryOnADifferentHost)
             {
-                retryImmediately(responseFuture, request, retryAction, attempts);
+                retryImmediately(responseFuture, request, response, retryAction, attempts);
             }
             else
             {
                 logger.error("Request exhausted. response={}, attempts={}", response, attempts);
-                responseFuture.completeExceptionally(retriesExhausted(attempts, request));
+                responseFuture.completeExceptionally(RetriesExhaustedException.of(attempts, request, response));
             }
             return;
         }
@@ -203,53 +204,59 @@
     /**
      * Retries the request with no delay
      *
-     * @param future      a future for the {@link HttpResponse}
-     * @param request     the HTTP request
-     * @param retryAction the action that is called on retry
-     * @param attempts    the number of attempts for the request
+     * @param future       a future for the {@link HttpResponse}
+     * @param request      the HTTP request
+     * @param lastResponse the last received HTTP response
+     * @param retryAction  the action that is called on retry
+     * @param attempts     the number of attempts for the request
      */
     protected void retryImmediately(CompletableFuture<HttpResponse> future,
                                     Request request,
+                                    HttpResponse lastResponse,
                                     RetryAction retryAction,
                                     int attempts)
     {
-        retry(future, request, retryAction, attempts, 0L, null);
+        retry(future, request, lastResponse, retryAction, attempts, 0L, null);
     }
 
     /**
      * Retries the request with no delay
      *
-     * @param future      a future for the {@link HttpResponse}
-     * @param request     the HTTP request
-     * @param retryAction the action that is called on retry
-     * @param attempts    the number of attempts for the request
-     * @param throwable   the underlying exception
+     * @param future       a future for the {@link HttpResponse}
+     * @param request      the HTTP request
+     * @param lastResponse the last received HTTP response
+     * @param retryAction  the action that is called on retry
+     * @param attempts     the number of attempts for the request
+     * @param throwable    the underlying exception
      */
     protected void retryImmediately(CompletableFuture<HttpResponse> future,
                                     Request request,
+                                    HttpResponse lastResponse,
                                     RetryAction retryAction,
                                     int attempts,
                                     Throwable throwable)
     {
-        retry(future, request, retryAction, attempts, 0L, throwable);
+        retry(future, request, lastResponse, retryAction, attempts, 0L, throwable);
     }
 
     /**
      * Retries the request after waiting for the configured retryDelayMillis
      *
-     * @param future      a future for the {@link HttpResponse}
-     * @param request     the HTTP request
-     * @param retryAction the action that is called on retry
-     * @param attempts    the number of attempts for the request
-     * @param throwable   the underlying exception
+     * @param future       a future for the {@link HttpResponse}
+     * @param request      the HTTP request
+     * @param lastResponse the last received HTTP response
+     * @param retryAction  the action that is called on retry
+     * @param attempts     the number of attempts for the request
+     * @param throwable    the underlying exception
      */
     protected void retry(CompletableFuture<HttpResponse> future,
                          Request request,
+                         HttpResponse lastResponse,
                          RetryAction retryAction,
                          int attempts,
                          Throwable throwable)
     {
-        retry(future, request, retryAction, attempts, retryDelayMillis(attempts), throwable);
+        retry(future, request, lastResponse, retryAction, attempts, retryDelayMillis(attempts), throwable);
     }
 
     /**
@@ -259,6 +266,7 @@
      *
      * @param future          a future for the {@link HttpResponse}
      * @param request         the HTTP request
+     * @param lastResponse    the last received HTTP response
      * @param retryAction     the action that is called on retry
      * @param attempts        the number of attempts for the request
      * @param sleepTimeMillis the amount of time to wait in milliseconds before attempting the request again
@@ -266,6 +274,7 @@
      */
     protected void retry(CompletableFuture<HttpResponse> future,
                          Request request,
+                         HttpResponse lastResponse,
                          RetryAction retryAction,
                          int attempts,
                          long sleepTimeMillis,
@@ -274,7 +283,7 @@
         int configuredMaxRetries = maxRetries();
         if (configuredMaxRetries > RETRY_INDEFINITELY && attempts >= configuredMaxRetries)
         {
-            future.completeExceptionally(retriesExhausted(attempts, request, throwable));
+            future.completeExceptionally(RetriesExhaustedException.of(attempts, request, lastResponse, throwable));
         }
         else
         {
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/NoRetryPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/NoRetryPolicy.java
index c5cde09..280d586 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/NoRetryPolicy.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/NoRetryPolicy.java
@@ -22,6 +22,7 @@
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.cassandra.sidecar.client.HttpResponse;
+import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
 import org.apache.cassandra.sidecar.client.request.Request;
 
 /**
@@ -44,7 +45,7 @@
     {
         if (throwable != null)
         {
-            responseFuture.completeExceptionally(retriesExhausted(attempts, request, throwable));
+            responseFuture.completeExceptionally(RetriesExhaustedException.of(attempts, request, response));
         }
         else if (response.statusCode() == HttpResponseStatus.OK.code())
         {
@@ -52,7 +53,7 @@
         }
         else
         {
-            responseFuture.completeExceptionally(retriesExhausted(attempts, request));
+            responseFuture.completeExceptionally(RetriesExhaustedException.of(attempts, request, response));
         }
     }
 }
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/RetryPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/RetryPolicy.java
index 8d9ac91..d2826c2 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/retry/RetryPolicy.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/retry/RetryPolicy.java
@@ -24,7 +24,6 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.sidecar.client.HttpResponse;
-import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
 import org.apache.cassandra.sidecar.client.exception.UnexpectedStatusCodeException;
 import org.apache.cassandra.sidecar.client.request.Request;
 
@@ -56,31 +55,6 @@
                                     RetryAction retryAction);
 
     /**
-     * Returns a {@link RetriesExhaustedException} with the number of {@code attempts} performed before the retries.
-     *
-     * @param attempts the number of attempts for the request
-     * @param request  the HTTP request
-     * @return a {@link RetriesExhaustedException} with the number of {@code attempts} performed before the retries
-     */
-    RetriesExhaustedException retriesExhausted(int attempts, Request request)
-    {
-        return new RetriesExhaustedException(attempts, request);
-    }
-
-    /**
-     * Returns a {@link RetriesExhaustedException} with the number of {@code attempts} performed before the retries.
-     *
-     * @param attempts  the number of attempts for the request
-     * @param request   the HTTP request
-     * @param throwable the underlying exception
-     * @return a {@link RetriesExhaustedException} with the number of {@code attempts} performed before the retries
-     */
-    RetriesExhaustedException retriesExhausted(int attempts, Request request, Throwable throwable)
-    {
-        return new RetriesExhaustedException(attempts, request, throwable);
-    }
-
-    /**
      * Returns an {@link UnsupportedOperationException} with the provided {@code response}.
      *
      * @param response the response to use
diff --git a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
index 1079827..3fa0f7d 100644
--- a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
+++ b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
@@ -119,7 +119,7 @@
             UploadableRequest uploadableRequest = (UploadableRequest) context.request();
             LOGGER.debug("Uploading file={}, for request={}, instance={}",
                          uploadableRequest.filename(), context.request(), sidecarInstance);
-            return executeUploadFileInternal(vertxRequest, uploadableRequest.filename());
+            return executeUploadFileInternal(sidecarInstance, vertxRequest, uploadableRequest.filename());
         }
         else
         {
@@ -141,13 +141,15 @@
                         promise.complete(new HttpResponseImpl(response.statusCode(),
                                                               response.statusMessage(),
                                                               raw,
-                                                              mapHeaders(response.headers())
+                                                              mapHeaders(response.headers()),
+                                                              sidecarInstance
                         ));
                     });
         return promise.future().toCompletionStage().toCompletableFuture();
     }
 
-    protected CompletableFuture<HttpResponse> executeUploadFileInternal(HttpRequest<Buffer> vertxRequest,
+    protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInstance sidecarInstance,
+                                                                        HttpRequest<Buffer> vertxRequest,
                                                                         String filename)
     {
         Promise<HttpResponse> promise = Promise.promise();
@@ -164,7 +166,8 @@
             promise.complete(new HttpResponseImpl(response.statusCode(),
                                                   response.statusMessage(),
                                                   raw,
-                                                  mapHeaders(response.headers())
+                                                  mapHeaders(response.headers()),
+                                                  sidecarInstance
             ));
         });
 
@@ -192,7 +195,8 @@
                         // fulfill the promise with the response
                         promise.complete(new HttpResponseImpl(response.statusCode(),
                                                               response.statusMessage(),
-                                                              mapHeaders(response.headers())));
+                                                              mapHeaders(response.headers()),
+                                                              sidecarInstance));
 
                         if (response.statusCode() == HttpResponseStatus.OK.code() ||
                             response.statusCode() == HttpResponseStatus.PARTIAL_CONTENT.code())