more timeout handling in JsonParserIterator (#10426)

diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index c12e64a..5fac43b 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -110,7 +110,14 @@
       return retVal;
     }
     catch (IOException e) {
-      throw new RuntimeException(e);
+      // check for timeout, a failure here might be related to a timeout, so lets just attribute it
+      if (checkTimeout()) {
+        TimeoutException timeoutException = timeoutQuery();
+        timeoutException.addSuppressed(e);
+        throw interruptQuery(timeoutException);
+      } else {
+        throw interruptQuery(e);
+      }
     }
   }
 
@@ -120,45 +127,6 @@
     throw new UnsupportedOperationException();
   }
 
-  private void init()
-  {
-    if (jp == null) {
-      try {
-        long timeLeftMillis = timeoutAt - System.currentTimeMillis();
-        if (hasTimeout && timeLeftMillis < 1) {
-          throw new TimeoutException(StringUtils.format("url[%s] timed out", url));
-        }
-        InputStream is = hasTimeout
-                         ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
-                         : future.get();
-        if (is != null) {
-          jp = objectMapper.getFactory().createParser(is);
-        } else {
-          interruptQuery(
-              new ResourceLimitExceededException(
-                  "url[%s] timed out or max bytes limit reached.",
-                  url
-              )
-          );
-        }
-        final JsonToken nextToken = jp.nextToken();
-        if (nextToken == JsonToken.START_ARRAY) {
-          jp.nextToken();
-          objectCodec = jp.getCodec();
-        } else if (nextToken == JsonToken.START_OBJECT) {
-          interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
-        } else {
-          interruptQuery(
-              new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
-          );
-        }
-      }
-      catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
-        interruptQuery(e);
-      }
-    }
-  }
-
   @Override
   public void close() throws IOException
   {
@@ -167,10 +135,66 @@
     }
   }
 
-  private void interruptQuery(Exception cause)
+  private boolean checkTimeout()
+  {
+    long timeLeftMillis = timeoutAt - System.currentTimeMillis();
+    return checkTimeout(timeLeftMillis);
+  }
+
+  private boolean checkTimeout(long timeLeftMillis)
+  {
+    if (hasTimeout && timeLeftMillis < 1) {
+      return true;
+    }
+    return false;
+  }
+
+  private void init()
+  {
+    if (jp == null) {
+      try {
+        long timeLeftMillis = timeoutAt - System.currentTimeMillis();
+        if (checkTimeout(timeLeftMillis)) {
+          throw interruptQuery(timeoutQuery());
+        }
+        InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get();
+
+        if (is != null) {
+          jp = objectMapper.getFactory().createParser(is);
+        } else if (checkTimeout()) {
+          throw interruptQuery(timeoutQuery());
+        } else {
+          // if we haven't timed out completing the future, then this is the likely cause
+          throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url));
+        }
+
+        final JsonToken nextToken = jp.nextToken();
+        if (nextToken == JsonToken.START_ARRAY) {
+          jp.nextToken();
+          objectCodec = jp.getCodec();
+        } else if (nextToken == JsonToken.START_OBJECT) {
+          throw interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
+        } else {
+          throw interruptQuery(
+              new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
+          );
+        }
+      }
+      catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
+        throw interruptQuery(e);
+      }
+    }
+  }
+
+  private TimeoutException timeoutQuery()
+  {
+    return new TimeoutException(StringUtils.format("url[%s] timed out", url));
+  }
+
+  private QueryInterruptedException interruptQuery(Exception cause)
   {
     LOG.warn(cause, "Query [%s] to host [%s] interrupted", queryId, host);
-    throw new QueryInterruptedException(cause, host);
+    return new QueryInterruptedException(cause, host);
   }
 }