DRILL-8437: Add Header Index Pagination (#2806)

diff --git a/contrib/storage-http/.gitignore b/contrib/storage-http/.gitignore
index 710368b..05c3e7a 100644
--- a/contrib/storage-http/.gitignore
+++ b/contrib/storage-http/.gitignore
@@ -1 +1 @@
-./src/test/resources/logback-test.xml
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
index f555060..0647dbc 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -76,3 +76,15 @@
 
 
 ** Note: Index / Keyset Pagination is only implemented for APIs that return JSON **
+
+## Header Index Pagination
+Header index pagination is used when the API in question returns a link to the next page in the response header.  Shopify is one such example of an API that does this. 
+
+The only configuration option is the `nextPageParam` which is the parameter that Drill should look for in the response header.
+
+```json
+ "paginator": {
+        "nextPageParam": "page",
+        "method": "HEADER_INDEX"
+      }
+```
\ No newline at end of file
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index e6661ad..85d8073 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -51,6 +51,7 @@
 
 import java.io.File;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -118,6 +119,7 @@
       .scanDefn(subScan)
       .url(url)
       .tempDir(new File(tempDirPath))
+      .paginator(paginator)
       .proxyConfig(proxySettings(negotiator.drillConfig(), url))
       .errorContext(errorContext)
       .build();
@@ -225,7 +227,7 @@
 
   protected Map<String, Object> generatePaginationFieldMap() {
     if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
-      return null;
+      return Collections.emptyMap();
     }
 
     Map<String, Object> fieldMap = new HashMap<>();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index df55433..ab710e9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -99,6 +99,7 @@
       .scanDefn(subScan)
       .url(url)
       .tempDir(new File(tempDirPath))
+      .paginator(paginator)
       .proxyConfig(proxySettings(negotiator.drillConfig(), url))
       .errorContext(errorContext)
       .build();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
index e6b226f..9eb2f3d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -133,10 +133,18 @@
             .build(logger);
         }
         break;
+      case HEADER_INDEX:
+        if (StringUtils.isEmpty(this.nextPageParam)) {
+          throw UserException
+              .validationError()
+              .message("Invalid paginator configuration.  For HEADER_INDEX pagination, the nextPageParam must be defined.")
+              .build(logger);
+        }
+        break;
       default:
         throw UserException
           .validationError()
-          .message("Invalid paginator method: %s.  Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method)
+          .message("Invalid paginator method: %s.  Drill supports 'OFFSET', 'INDEX', 'HEADER_INDEX' and 'PAGE'", method)
           .build(logger);
     }
   }
@@ -230,7 +238,8 @@
   public enum PaginatorMethod {
     OFFSET,
     PAGE,
-    INDEX
+    INDEX,
+    HEADER_INDEX
   }
 
   @JsonIgnore
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 0da06a4..d5f277f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -34,6 +34,7 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
 import org.apache.drill.exec.store.http.paginator.IndexPaginator;
 import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
 import org.apache.drill.exec.store.http.paginator.PagePaginator;
@@ -92,7 +93,7 @@
   private static class HttpReaderFactory implements ReaderFactory {
     private final HttpSubScan subScan;
     private final HttpPaginatorConfig paginatorConfig;
-    private Paginator paginator;
+    private final Paginator paginator;
 
     private int count;
 
@@ -105,6 +106,8 @@
 
         // Initialize the paginator and generate the base URLs
         this.paginator = getPaginator();
+      } else {
+        this.paginator = null;
       }
     }
 
@@ -119,8 +122,6 @@
         rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url());
       }
 
-
-
       // If the URL is not parsable or otherwise invalid
       if (rawUrl == null) {
         throw UserException.validationError()
@@ -130,28 +131,33 @@
 
       urlBuilder = rawUrl.newBuilder();
 
-      Paginator paginator = null;
       if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) {
-        paginator = new OffsetPaginator(urlBuilder,
+        return new OffsetPaginator(urlBuilder,
           subScan.maxRecords(),
           paginatorConfig.pageSize(),
           paginatorConfig.limitParam(),
           paginatorConfig.offsetParam());
       } else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) {
-        paginator = new PagePaginator(urlBuilder,
+        return new PagePaginator(urlBuilder,
           subScan.maxRecords(),
           paginatorConfig.pageSize(),
           paginatorConfig.pageParam(),
           paginatorConfig.pageSizeParam());
       } else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
-        paginator = new IndexPaginator(urlBuilder,
+        return new IndexPaginator(urlBuilder,
           0,  // Page size not used for Index/Keyset pagination
           subScan.maxRecords(),
           paginatorConfig.hasMoreParam(),
           paginatorConfig.indexParam(),
           paginatorConfig.nextPageParam());
+      } else if (paginatorConfig.getMethodType() == PaginatorMethod.HEADER_INDEX) {
+        return new HeaderIndexPaginator(urlBuilder,
+            subScan.maxRecords(),
+            paginatorConfig.pageSize(),
+            paginatorConfig.nextPageParam(),
+            subScan.tableSpec().connectionConfig().url());
       }
-      return paginator;
+      return null;
     }
 
     @Override
@@ -181,6 +187,7 @@
         * the group scan such that the calls could be sent to different drillbits.
         */
         if (!paginator.hasNext()) {
+          logger.debug("Ending Batch Generation.");
           return null;
         }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
new file mode 100644
index 0000000..8bf388f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.Headers;
+import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The Header Index Paginator is used when the API in question send a link in the HTTP header
+ * containing the URL for the next page.
+ */
+public class HeaderIndexPaginator extends Paginator {
+
+  private static final Logger logger = LoggerFactory.getLogger(HeaderIndexPaginator.class);
+  private static final Pattern URL_REGEX = Pattern.compile("(https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*))");
+
+  private final String nextPageParam;
+  private final String firstPageURL;
+  private Headers headers;
+  private boolean firstPage;
+  private int pageCount;
+
+  public HeaderIndexPaginator(Builder builder, int pageSize, int limit, String nextPageParam, String firstPageURL) {
+    super(builder, PaginatorMethod.HEADER_INDEX, pageSize, limit);
+    this.nextPageParam = nextPageParam;
+    this.firstPageURL = firstPageURL;
+    this.firstPage = true;
+    this.pageCount = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    // If the headers are null and it isn't the first page, end pagination
+    if ( !firstPage &&
+        (headers == null || StringUtils.isEmpty(headers.get(nextPageParam)))
+    ) {
+      notifyPartialPage();
+      logger.debug("Ending pagination.  No additional info in headers.");
+      return false;
+    }
+
+    return !partialPageReceived;
+  }
+
+  /**
+   * This method sets the headers for the Header Index Paginator.  This must be called with updated headers
+   * before the {@link #next()} method is called.
+   * @param headers A {@link Headers} object containing the response headers from the previous API call.
+   */
+  public void setResponseHeaders(Headers headers) {
+    logger.debug("Setting response headers. ");
+    this.headers = headers;
+
+    // If the next page URL is empty or otherwise undefined, halt pagination.
+    if (StringUtils.isEmpty(headers.get(nextPageParam))) {
+      notifyPartialPage();
+    }
+  }
+
+  @Override
+  public String next() {
+    pageCount++;
+    if (firstPage) {
+      firstPage = false;
+      return firstPageURL;
+    }
+
+    if (headers == null) {
+      throw UserException.dataReadError()
+          .message("Headers are empty.  HeaderIndex Pagination requires parameters that are passed in the HTTP header." + pageCount)
+          .build(logger);
+    }
+    // Now attempt to retrieve the field from the response headers.
+    String nextPage = headers.get(nextPageParam);
+
+    // If the next page value is null or empty, halt pagination
+    if (StringUtils.isEmpty(nextPage)) {
+      super.notifyPartialPage();
+      return null;
+    }
+
+    logger.debug("Found next page URL: {}", nextPage);
+
+    // Clean up any extraneous garbage from the header field.
+    Matcher urlMatcher = URL_REGEX.matcher(nextPage);
+    if (urlMatcher.find()) {
+      return urlMatcher.group(1);
+    }
+
+    return nextPage;
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
index b640d3a..a7bd7e5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -28,7 +28,7 @@
 
 public class PagePaginator extends Paginator {
 
-  private static final Logger logger = LoggerFactory.getLogger(OffsetPaginator.class);
+  private static final Logger logger = LoggerFactory.getLogger(PagePaginator.class);
 
   private final int limit;
   private final String pageParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 3568fe9..6ba9918 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -50,9 +50,11 @@
 import org.apache.drill.exec.store.http.HttpApiConfig;
 import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
 import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.apache.drill.exec.store.http.HttpStoragePlugin;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
 import org.apache.drill.exec.store.http.HttpSubScan;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
 import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
 import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
@@ -110,8 +112,6 @@
     .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
     .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
     .build();
-
-
   private final OkHttpClient client;
   private final File tempDir;
   private final HttpProxyConfig proxyConfig;
@@ -436,6 +436,11 @@
       logger.debug("HTTP Request for {} successful.", url());
       logger.debug("Response Headers: {} ", response.headers());
 
+      // In the case of Header Index Pagination, send the header(s) to the paginator
+      if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) {
+        ((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
+      }
+
       // Return the InputStream of the response. Note that it is necessary and
       // and sufficient that the caller invokes close() on the returned stream.
       return Objects.requireNonNull(response.body()).byteStream();
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index afa5fe3..e5294a9 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -164,6 +164,20 @@
     headers.put("header1", "value1");
     headers.put("header2", "value2");
 
+    HttpPaginatorConfig headerIndexPaginator = HttpPaginatorConfig.builder()
+        .nextPageParam("link")
+        .pageSize(10)
+        .method("header_index")
+        .build();
+
+    HttpApiConfig mockJsonConfigWithHeaderIndex = HttpApiConfig.builder()
+        .url("http://localhost:8092/json")
+        .method("get")
+        .requireTail(false)
+        .paginator(headerIndexPaginator)
+        .inputType("json")
+        .build();
+
 
     HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
       .limitParam("limit")
@@ -329,6 +343,7 @@
     configs.put("json_tail", mockJsonConfigWithPaginatorAndTail);
     configs.put("xml_paginator", mockXmlConfigWithPaginator);
     configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
+    configs.put("customers", mockJsonConfigWithHeaderIndex);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
       new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null,
@@ -339,6 +354,28 @@
 
 
   @Test
+  public void testPagePaginationWithHeaderIndex() throws Exception {
+    String sql = "SELECT col1, _response_url FROM `local`.`customers`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1).setHeader("link", "http://localhost:8092/json?page=2"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2).setHeader("link", "http://localhost:8092/json?page=3"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+          .sql(sql)
+          .results();
+
+      int count = 0;
+      for(QueryDataBatch b : results){
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+
+      assertEquals(3, results.size());
+    }
+  }
+
+  @Test
   @Ignore("Requires Live Connection to Github")
   public void testPagePaginationWithURLParameters() throws Exception {
     String sql = "SELECT * FROM live.github WHERE org='apache' LIMIT 15";