DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 9915948..e2bfee9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -127,6 +127,7 @@
       buildImplicitColumns();
     }
 
+    // inStream is expected to be closed by the JsonLoader.
     InputStream inStream = http.getInputStream();
     populateImplicitFieldMap(http);
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index da4f78c..bb5d217 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -143,7 +143,7 @@
 
   @Override
   public void close() {
-    AutoCloseables.closeSilently(inStream);
     AutoCloseables.closeSilently(xmlReader);
+    AutoCloseables.closeSilently(inStream);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index a95bc9f..84a9262 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -78,7 +78,7 @@
         return;
       }
       String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
-      // Make the API call
+      // Make the API call, we expect that results will be closed by the JsonLoader
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
       // If the result string is null or empty, return an empty map
       if (results == null) {
@@ -93,6 +93,8 @@
         rowWriter.start();
         if (jsonLoader.parser().next()) {
           rowWriter.save();
+        } else {
+          jsonLoader.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
@@ -173,6 +175,7 @@
       if (args == null) {
         return;
       }
+      // we expect that results will be closed by the JsonLoader
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
         .getInputStream();
       // If the result string is null or empty, return an empty map
@@ -189,6 +192,8 @@
         rowWriter.start();
         if (jsonLoader.parser().next()) {
           rowWriter.save();
+        } else {
+          jsonLoader.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index ee5b285..05380d1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -35,6 +35,7 @@
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -321,7 +322,8 @@
   /**
    * Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong
    * the method throws a UserException.
-   * @return An Inputstream of the data from the URL call.
+   * @return An Inputstream of the data from the URL call. The caller is responsible for calling
+   *         close() on the InputStream.
    */
   public InputStream getInputStream() {
 
@@ -369,15 +371,14 @@
 
     // Build the request object
     Request request = requestBuilder.build();
+    Response response = null;
 
     try {
       logger.debug("Executing request: {}", request);
       logger.debug("Headers: {}", request.headers());
 
       // Execute the request
-      Response response = client
-        .newCall(request)
-        .execute();
+      response = client.newCall(request).execute();
 
       // Preserve the response
       responseMessage = response.message();
@@ -392,8 +393,9 @@
         paginator.notifyPartialPage();
       }
 
-      // If the request is unsuccessful, throw a UserException
+      // If the request is unsuccessful clean up and throw a UserException
       if (!isSuccessful(responseCode)) {
+        AutoCloseables.closeSilently(response);
         throw UserException
           .dataReadError()
           .message("HTTP request failed")
@@ -405,9 +407,12 @@
       logger.debug("HTTP Request for {} successful.", url());
       logger.debug("Response Headers: {} ", response.headers());
 
-      // Return the InputStream of the response
+      // Return the InputStream of the response. Note that it is necessary and
+      // and sufficient that the caller invokes close() on the returned stream.
       return Objects.requireNonNull(response.body()).byteStream();
+
     } catch (IOException e) {
+      // response can only be null at this location so we do not attempt to close it.
       throw UserException
         .dataReadError(e)
         .message("Failed to read the HTTP response body")
@@ -419,10 +424,14 @@
 
   public String getResultsFromApiCall() {
     InputStream inputStream = getInputStream();
-    return new BufferedReader(
-      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
-      .lines()
-      .collect(Collectors.joining("\n"));
+    try {
+      return new BufferedReader(
+        new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+    } finally {
+      AutoCloseables.closeSilently(inputStream);
+    }
   }
 
   public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
@@ -913,16 +922,25 @@
   }
 
   public static String getRequestAndStringResponse(String url) {
+    ResponseBody respBody = null;
     try {
-      return makeSimpleGetRequest(url).string();
+      respBody = makeSimpleGetRequest(url);
+      return respBody.string();
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
         .message("HTTP request failed")
         .build(logger);
+    } finally {
+      AutoCloseables.closeSilently(respBody);
     }
   }
 
+  /**
+   *
+   * @param url
+   * @return an input stream which the caller is responsible for closing.
+   */
   public static InputStream getRequestAndStreamResponse(String url) {
     try {
       return makeSimpleGetRequest(url).byteStream();
@@ -934,6 +952,12 @@
     }
   }
 
+  /**
+   *
+   * @param url
+   * @return response body which the caller is responsible for closing.
+   * @throws IOException
+   */
   public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
     OkHttpClient client = getSimpleHttpClient();
     Request.Builder requestBuilder = new Request.Builder()
@@ -943,8 +967,8 @@
     Request request = requestBuilder.build();
 
     // Execute the request
-      Response response = client.newCall(request).execute();
-      return response.body();
+    Response response = client.newCall(request).execute();
+    return response.body();
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
index 573df78..8b80499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
@@ -38,13 +38,13 @@
   private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class);
 
   /**
-   * Craft a GET request to obtain an access token.
+   * Crafts a POST request to obtain an access token.
    * @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode
    * @param authorizationCode The authorization code from the OAuth2.0 enabled API
    * @param callbackURL  The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place.
    * @return A Request Body to obtain an access token
    */
-  public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
+  public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
     return new FormBody.Builder()
       .add("grant_type", "authorization_code")
       .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -55,12 +55,12 @@
   }
 
   /**
-   * Crafts a POST response for refreshing an access token when a refresh token is present.
+   * Crafts a POST request for refreshing an access token when a refresh token is present.
    * @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken
    * @param refreshToken The refresh token
    * @return A Request Body with the correct parameters for obtaining an access token
    */
-  public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
+  public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
     return new FormBody.Builder()
       .add("grant_type", "refresh_token")
       .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -90,7 +90,7 @@
       .url(buildAccessTokenURL(credentialsProvider))
       .header("Content-Type", "application/json")
       .addHeader("Accept", "application/json")
-      .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL))
+      .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL))
       .build();
   }
 
@@ -110,7 +110,7 @@
       .url(tokenURI)
       .header("Content-Type", "application/json")
       .addHeader("Accept", "application/json")
-      .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken))
+      .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken))
       .build();
   }
 
@@ -127,9 +127,10 @@
     String accessToken;
     String refreshToken;
     Map<String, String> tokens = new HashMap<>();
+    Response response = null;
 
     try {
-      Response response = client.newCall(request).execute();
+      response = client.newCall(request).execute();
       String responseBody = response.body().string();
 
       if (!response.isSuccessful()) {
@@ -164,13 +165,14 @@
         refreshToken = (String) parsedJson.get("refresh_token");
         tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken);
       }
-      response.close();
       return tokens;
 
     } catch (NullPointerException | IOException e) {
       throw UserException.connectionError()
         .message("Error refreshing access OAuth2 access token. " + e.getMessage())
         .build(logger);
+    } finally {
+      response.close();
     }
   }
 }