more timeout handling in JsonParserIterator (#10426)
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index c12e64a..5fac43b 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -110,7 +110,14 @@
return retVal;
}
catch (IOException e) {
- throw new RuntimeException(e);
+ // check for timeout, a failure here might be related to a timeout, so lets just attribute it
+ if (checkTimeout()) {
+ TimeoutException timeoutException = timeoutQuery();
+ timeoutException.addSuppressed(e);
+ throw interruptQuery(timeoutException);
+ } else {
+ throw interruptQuery(e);
+ }
}
}
@@ -120,45 +127,6 @@
throw new UnsupportedOperationException();
}
- private void init()
- {
- if (jp == null) {
- try {
- long timeLeftMillis = timeoutAt - System.currentTimeMillis();
- if (hasTimeout && timeLeftMillis < 1) {
- throw new TimeoutException(StringUtils.format("url[%s] timed out", url));
- }
- InputStream is = hasTimeout
- ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
- : future.get();
- if (is != null) {
- jp = objectMapper.getFactory().createParser(is);
- } else {
- interruptQuery(
- new ResourceLimitExceededException(
- "url[%s] timed out or max bytes limit reached.",
- url
- )
- );
- }
- final JsonToken nextToken = jp.nextToken();
- if (nextToken == JsonToken.START_ARRAY) {
- jp.nextToken();
- objectCodec = jp.getCodec();
- } else if (nextToken == JsonToken.START_OBJECT) {
- interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
- } else {
- interruptQuery(
- new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
- );
- }
- }
- catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
- interruptQuery(e);
- }
- }
- }
-
@Override
public void close() throws IOException
{
@@ -167,10 +135,66 @@
}
}
- private void interruptQuery(Exception cause)
+ private boolean checkTimeout()
+ {
+ long timeLeftMillis = timeoutAt - System.currentTimeMillis();
+ return checkTimeout(timeLeftMillis);
+ }
+
+ private boolean checkTimeout(long timeLeftMillis)
+ {
+ if (hasTimeout && timeLeftMillis < 1) {
+ return true;
+ }
+ return false;
+ }
+
+ private void init()
+ {
+ if (jp == null) {
+ try {
+ long timeLeftMillis = timeoutAt - System.currentTimeMillis();
+ if (checkTimeout(timeLeftMillis)) {
+ throw interruptQuery(timeoutQuery());
+ }
+ InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get();
+
+ if (is != null) {
+ jp = objectMapper.getFactory().createParser(is);
+ } else if (checkTimeout()) {
+ throw interruptQuery(timeoutQuery());
+ } else {
+ // if we haven't timed out completing the future, then this is the likely cause
+ throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url));
+ }
+
+ final JsonToken nextToken = jp.nextToken();
+ if (nextToken == JsonToken.START_ARRAY) {
+ jp.nextToken();
+ objectCodec = jp.getCodec();
+ } else if (nextToken == JsonToken.START_OBJECT) {
+ throw interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
+ } else {
+ throw interruptQuery(
+ new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
+ );
+ }
+ }
+ catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
+ throw interruptQuery(e);
+ }
+ }
+ }
+
+ private TimeoutException timeoutQuery()
+ {
+ return new TimeoutException(StringUtils.format("url[%s] timed out", url));
+ }
+
+ private QueryInterruptedException interruptQuery(Exception cause)
{
LOG.warn(cause, "Query [%s] to host [%s] interrupted", queryId, host);
- throw new QueryInterruptedException(cause, host);
+ return new QueryInterruptedException(cause, host);
}
}